533 lines
20 KiB
Python
533 lines
20 KiB
Python
"""Tests for ExperienceStore - 任务经验记录、检索和指标追踪"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime, timedelta, timezone
|
||
|
||
import pytest
|
||
|
||
from agentkit.evolution.experience_schema import EvolutionMetrics, TaskExperience
|
||
from agentkit.evolution.experience_store import (
|
||
InMemoryExperienceStore,
|
||
_parse_time_window,
|
||
)
|
||
from agentkit.memory.embedder import MockEmbedder
|
||
from agentkit.utils.vector_math import compute_cosine_similarity
|
||
|
||
|
||
# ── Fixtures ──────────────────────────────────────────────
|
||
|
||
|
||
@pytest.fixture
def mock_embedder():
    """Provide a MockEmbedder built with ``dimension=64``.

    The original note describes it as generating deterministic pseudo-vectors.
    """
    embedder = MockEmbedder(dimension=64)
    return embedder
|
||
|
||
|
||
@pytest.fixture
def store(mock_embedder):
    """Build an InMemoryExperienceStore that uses the ``mock_embedder`` fixture."""
    store_kwargs = {"embedder": mock_embedder, "decay_rate": 0.01, "alpha": 0.7}
    return InMemoryExperienceStore(**store_kwargs)
|
||
|
||
|
||
@pytest.fixture
def store_no_embedder():
    """Build an InMemoryExperienceStore with no embedder argument supplied."""
    embedderless_store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
    return embedderless_store
|
||
|
||
|
||
def _make_experience(
    task_type: str = "code_review",
    goal: str = "Review the PR",
    outcome: str = "success",
    duration_seconds: float = 10.0,
    success_rate: float = 1.0,
    failure_reasons: list[str] | None = None,
    optimization_tips: list[str] | None = None,
    created_at: datetime | None = None,
) -> TaskExperience:
    """Create a TaskExperience for use in tests.

    Every argument has a default, so callers only spell out the fields a
    given test cares about. The experience is created with an empty
    ``experience_id`` and a ``steps_summary`` derived from ``task_type``.

    Args:
        task_type: Task category; also embedded in ``steps_summary``.
        goal: Goal text of the task.
        outcome: Outcome label, e.g. ``"success"`` or ``"failure"``.
        duration_seconds: Task duration.
        success_rate: Success ratio recorded on the experience.
        failure_reasons: Failure reasons; a falsy value becomes a new list.
        optimization_tips: Optimization tips; a falsy value becomes a new list.
        created_at: Creation time; a falsy value becomes the current UTC time.

    Returns:
        The constructed TaskExperience.
    """
    # Falsy list arguments (None or empty) are swapped for a fresh empty list;
    # non-empty lists are passed through as the very same object.
    reasons = failure_reasons if failure_reasons else []
    tips = optimization_tips if optimization_tips else []
    # Without an explicit timestamp, stamp the experience with "now" in UTC.
    timestamp = created_at if created_at else datetime.now(timezone.utc)
    summary = f"Executed {task_type} task"
    return TaskExperience(
        experience_id="",
        task_type=task_type,
        goal=goal,
        steps_summary=summary,
        outcome=outcome,
        duration_seconds=duration_seconds,
        success_rate=success_rate,
        failure_reasons=reasons,
        optimization_tips=tips,
        created_at=timestamp,
    )
|
||
|
||
|
||
# ── TaskExperience data model tests ───────────────────────
|
||
|
||
|
||
class TestTaskExperience:
    """Checks for the TaskExperience data model."""

    def test_to_dict(self):
        """to_dict() exposes the recorded fields and leaves out the embedding."""
        field_values = {
            "experience_id": "exp-1",
            "task_type": "code_review",
            "goal": "Review PR",
            "steps_summary": "Checked code",
            "outcome": "success",
            "duration_seconds": 5.0,
            "success_rate": 1.0,
            "failure_reasons": [],
            "optimization_tips": ["Use faster linter"],
        }
        as_dict = TaskExperience(**field_values).to_dict()
        assert as_dict["experience_id"] == "exp-1"
        assert as_dict["task_type"] == "code_review"
        assert as_dict["outcome"] == "success"
        assert as_dict["duration_seconds"] == 5.0
        assert "embedding" not in as_dict  # the embedding must not appear in the dict
        assert as_dict["optimization_tips"] == ["Use faster linter"]

    def test_text_for_embedding(self):
        """The embedding text mentions task type, goal, failure reasons and tips."""
        experience = TaskExperience(
            task_type="code_review",
            goal="Review the PR",
            steps_summary="Checked code style",
            failure_reasons=["timeout"],
            optimization_tips=["Increase timeout"],
        )
        rendered = experience.text_for_embedding()
        expected_fragments = (
            "code_review",
            "Review the PR",
            "timeout",
            "Increase timeout",
        )
        for fragment in expected_fragments:
            assert fragment in rendered

    def test_text_for_embedding_minimal(self):
        """With only task_type and goal set, both still appear in the text."""
        rendered = TaskExperience(task_type="test", goal="Run tests").text_for_embedding()
        for fragment in ("test", "Run tests"):
            assert fragment in rendered
|
||
|
||
|
||
# ── EvolutionMetrics data model tests ─────────────────────
|
||
|
||
|
||
class TestEvolutionMetrics:
    """Checks for the EvolutionMetrics data model."""

    def test_to_dict(self):
        """to_dict() returns the values the metrics object was built with."""
        timestamp = datetime.now(timezone.utc)
        metrics = EvolutionMetrics(
            task_type="code_review",
            time_window="24h",
            completion_rate=0.9,
            avg_duration=12.5,
            retry_rate=0.1,
            sample_count=100,
            window_start=timestamp,
            window_end=timestamp,
        )
        serialized = metrics.to_dict()
        # Checked in the same order as listed here (dicts keep insertion order).
        expected = {
            "task_type": "code_review",
            "completion_rate": 0.9,
            "avg_duration": 12.5,
            "retry_rate": 0.1,
            "sample_count": 100,
        }
        for key, value in expected.items():
            assert serialized[key] == value
|
||
|
||
|
||
# ── Helper function tests ─────────────────────────────────
|
||
|
||
|
||
class TestHelperFunctions:
    """Checks for compute_cosine_similarity and _parse_time_window."""

    def test_cosine_similarity_identical(self):
        """A vector compared with itself scores approximately 1."""
        unit_x = [1.0, 0.0, 0.0]
        similarity = compute_cosine_similarity(unit_x, unit_x)
        assert similarity == pytest.approx(1.0)

    def test_cosine_similarity_orthogonal(self):
        """Perpendicular vectors score approximately 0."""
        x_axis, y_axis = [1.0, 0.0], [0.0, 1.0]
        similarity = compute_cosine_similarity(x_axis, y_axis)
        assert similarity == pytest.approx(0.0)

    def test_cosine_similarity_opposite(self):
        """Vectors pointing in opposite directions score approximately -1."""
        forward, backward = [1.0, 0.0], [-1.0, 0.0]
        similarity = compute_cosine_similarity(forward, backward)
        assert similarity == pytest.approx(-1.0)

    def test_cosine_similarity_empty(self):
        """Two empty vectors are expected to score exactly 0.0."""
        similarity = compute_cosine_similarity([], [])
        assert similarity == 0.0

    def test_cosine_similarity_mismatched_dims(self):
        """Vectors of different lengths are expected to score exactly 0.0."""
        similarity = compute_cosine_similarity([1.0], [1.0, 2.0])
        assert similarity == 0.0

    def test_parse_time_window_hours(self):
        """An ``h`` suffix is read as hours."""
        assert _parse_time_window("24h") == timedelta(hours=24)

    def test_parse_time_window_days(self):
        """A ``d`` suffix is read as days."""
        assert _parse_time_window("7d") == timedelta(days=7)

    def test_parse_time_window_unknown_unit(self):
        """An unrecognised unit is expected to fall back to 24 hours."""
        parsed = _parse_time_window("30m")
        assert parsed == timedelta(hours=24)  # fallback
|
||
|
||
|
||
# ── InMemoryExperienceStore.record_experience tests ───────
|
||
|
||
|
||
class TestRecordExperience:
    """Behaviour of InMemoryExperienceStore.record_experience.

    NOTE(review): these coroutine tests carry no explicit asyncio marker;
    presumably the project enables an automatic asyncio mode -- confirm in
    the pytest configuration.
    """

    async def test_record_returns_experience_id(self, store):
        """Recording returns a non-empty identifier."""
        returned_id = await store.record_experience(_make_experience())
        assert returned_id is not None
        assert len(returned_id) > 0

    async def test_record_auto_generates_id(self, store):
        """An empty experience_id on the passed object is set to the returned id."""
        experience = _make_experience()
        assert experience.experience_id == ""
        returned_id = await store.record_experience(experience)
        assert experience.experience_id == returned_id

    async def test_record_auto_generates_embedding(self, store):
        """With an embedder, a missing embedding is filled with a 64-dim vector."""
        experience = _make_experience()
        assert experience.embedding is None
        await store.record_experience(experience)
        assert experience.embedding is not None
        assert len(experience.embedding) == 64

    async def test_record_preserves_existing_embedding(self, store):
        """A caller-supplied embedding is kept rather than regenerated."""
        preset_vector = [0.1] * 64
        experience = _make_experience()
        experience.embedding = preset_vector
        await store.record_experience(experience)
        # The copy held inside the store should keep the original embedding.
        kept = store._experiences[experience.experience_id]
        assert kept.embedding == preset_vector

    async def test_record_without_embedder(self, store_no_embedder):
        """Without an embedder, the embedding stays None."""
        experience = _make_experience()
        await store_no_embedder.record_experience(experience)
        assert experience.embedding is None

    async def test_record_success_experience(self, store):
        """A successful experience is stored with its outcome and success rate."""
        succeeded = _make_experience(outcome="success", success_rate=1.0)
        returned_id = await store.record_experience(succeeded)
        kept = store._experiences[returned_id]
        assert kept.outcome == "success"
        assert kept.success_rate == 1.0

    async def test_record_failure_experience(self, store):
        """A failed experience is stored with its outcome and failure reasons."""
        failed = _make_experience(
            outcome="failure",
            success_rate=0.0,
            failure_reasons=["timeout", "connection refused"],
        )
        returned_id = await store.record_experience(failed)
        kept = store._experiences[returned_id]
        assert kept.outcome == "failure"
        assert kept.failure_reasons == ["timeout", "connection refused"]

    async def test_record_stores_independent_copy(self, store):
        """The store keeps a copy, so later external mutation does not leak in."""
        experience = _make_experience(failure_reasons=["original"])
        returned_id = await store.record_experience(experience)
        # Mutate the caller's object after it has been recorded.
        experience.failure_reasons.append("modified")
        kept = store._experiences[returned_id]
        assert kept.failure_reasons == ["original"]
|
||
|
||
|
||
# ── InMemoryExperienceStore.search tests ──────────────────
|
||
|
||
|
||
class TestSearchExperience:
    """Behaviour of InMemoryExperienceStore.search."""

    async def test_search_returns_results(self, store):
        """Both recorded experiences come back when top_k covers them."""
        await store.record_experience(
            _make_experience(task_type="code_review", goal="Review Python code")
        )
        await store.record_experience(
            _make_experience(task_type="data_analysis", goal="Analyze sales data")
        )

        results = await store.search("Review Python code", top_k=2)
        assert len(results) == 2
        # The returned experiences include the recorded task_type.
        task_types = {r.task_type for r in results}
        assert "code_review" in task_types

    async def test_search_with_task_type_filter(self, store):
        """The task_type filter returns only matching experiences."""
        await store.record_experience(
            _make_experience(task_type="code_review", goal="Review code")
        )
        await store.record_experience(
            _make_experience(task_type="data_analysis", goal="Analyze data")
        )

        results = await store.search("code", top_k=5, task_type="code_review")
        # all() is vacuously true for an empty list, so pin the count first:
        # exactly one code_review experience was recorded.
        assert len(results) == 1
        assert all(r.task_type == "code_review" for r in results)

    async def test_search_empty_store(self, store):
        """Searching an empty store yields an empty list."""
        results = await store.search("anything", top_k=5)
        assert results == []

    async def test_search_top_k_limit(self, store):
        """No more than top_k results are returned."""
        for i in range(10):
            await store.record_experience(
                _make_experience(task_type="code_review", goal=f"Task {i}")
            )
        results = await store.search("code review", top_k=3)
        assert len(results) == 3

    async def test_search_without_embedder(self, store_no_embedder):
        """Without an embedder, the higher success_rate ranks first."""
        await store_no_embedder.record_experience(
            _make_experience(task_type="code_review", goal="Review code", success_rate=0.9)
        )
        await store_no_embedder.record_experience(
            _make_experience(task_type="code_review", goal="Check code", success_rate=0.5)
        )
        # Without an embedder, ordering is by time decay (success_rate * decay).
        results = await store_no_embedder.search("code", top_k=2)
        assert len(results) == 2
        # The success_rate=0.9 experience should come first.
        assert results[0].success_rate == 0.9
|
||
|
||
|
||
# ── Time decay tests ──────────────────────────────────────
|
||
|
||
|
||
class TestTimeDecay:
    """Ranking effects of time decay in InMemoryExperienceStore.search."""

    async def test_recent_experiences_ranked_higher(self, store):
        """With everything else equal, the newer experience ranks first.

        Both experiences share the same task_type, goal and success_rate, so
        their embedding text is identical and semantic similarity cannot
        influence the ordering; only ``created_at`` differs.
        """
        now = datetime.now(timezone.utc)
        old_exp = _make_experience(
            task_type="code_review",
            goal="Review code",
            success_rate=1.0,
            created_at=now - timedelta(hours=100),
        )
        recent_exp = _make_experience(
            task_type="code_review",
            goal="Review code",
            success_rate=1.0,
            created_at=now,
        )
        await store.record_experience(old_exp)
        await store.record_experience(recent_exp)

        results = await store.search("Review code", top_k=2)
        assert len(results) == 2
        # Same success_rate and text, but the recent one has the higher time decay.
        assert results[0].created_at > results[1].created_at

    async def test_high_success_rate_compensates_age(self, store_no_embedder):
        """An older high-success experience can outrank a newer low-success one."""
        now = datetime.now(timezone.utc)
        old_good = _make_experience(
            task_type="code_review",
            goal="Review code",
            success_rate=1.0,
            created_at=now - timedelta(hours=1),
        )
        new_bad = _make_experience(
            task_type="code_review",
            goal="Review code",
            success_rate=0.1,
            created_at=now,
        )
        await store_no_embedder.record_experience(old_good)
        await store_no_embedder.record_experience(new_bad)

        results = await store_no_embedder.search("code", top_k=2)
        # old_good: 1.0 * exp(-0.01*1) ≈ 0.99
        # new_bad: 0.1 * exp(0) = 0.1
        # old_good should come first.
        assert results[0].success_rate == 1.0
|
||
|
||
|
||
# ── InMemoryExperienceStore.get_metrics tests ─────────────
|
||
|
||
|
||
class TestGetMetrics:
    """Behaviour of InMemoryExperienceStore.get_metrics.

    Computed float metrics are compared with ``pytest.approx`` throughout so
    the assertions do not depend on how the store sums and divides.
    """

    async def test_metrics_single_task_type(self, store):
        """One success plus one failure aggregates into a single metrics row."""
        await store.record_experience(
            _make_experience(task_type="code_review", outcome="success", duration_seconds=10.0)
        )
        await store.record_experience(
            _make_experience(
                task_type="code_review",
                outcome="failure",
                duration_seconds=20.0,
                success_rate=0.0,
            )
        )

        metrics = await store.get_metrics(task_type="code_review", time_window="24h")
        assert len(metrics) == 1
        m = metrics[0]
        assert m.task_type == "code_review"
        assert m.completion_rate == pytest.approx(0.5)  # 1 success / 2 total
        assert m.avg_duration == pytest.approx(15.0)  # (10 + 20) / 2
        assert m.retry_rate == pytest.approx(0.5)  # 1 with success_rate < 1.0
        assert m.sample_count == 2

    async def test_metrics_multiple_task_types(self, store):
        """Without a task_type filter, each task type gets its own metrics row."""
        await store.record_experience(
            _make_experience(task_type="code_review", outcome="success", duration_seconds=10.0)
        )
        await store.record_experience(
            _make_experience(task_type="data_analysis", outcome="success", duration_seconds=30.0)
        )

        metrics = await store.get_metrics(time_window="24h")
        assert len(metrics) == 2
        task_types = {m.task_type for m in metrics}
        assert task_types == {"code_review", "data_analysis"}

    async def test_metrics_empty_store(self, store):
        """An empty store yields no metrics."""
        metrics = await store.get_metrics(time_window="24h")
        assert metrics == []

    async def test_metrics_respects_time_window(self, store):
        """Experiences older than the window are left out of the aggregation."""
        now = datetime.now(timezone.utc)
        # Old experience (outside the 1h window).
        await store.record_experience(
            _make_experience(
                task_type="code_review",
                outcome="success",
                created_at=now - timedelta(hours=2),
            )
        )
        # New experience (inside the 1h window); success_rate matches the
        # failure outcome, as in the other failure samples in this class.
        await store.record_experience(
            _make_experience(
                task_type="code_review",
                outcome="failure",
                success_rate=0.0,
                created_at=now,
            )
        )

        metrics = await store.get_metrics(task_type="code_review", time_window="1h")
        assert len(metrics) == 1
        assert metrics[0].sample_count == 1
        assert metrics[0].completion_rate == pytest.approx(0.0)  # only the failure

    async def test_metrics_completion_rate(self, store):
        """Eight successes out of ten give a completion rate of 0.8."""
        for _ in range(8):
            await store.record_experience(
                _make_experience(task_type="test", outcome="success")
            )
        for _ in range(2):
            await store.record_experience(
                _make_experience(task_type="test", outcome="failure", success_rate=0.0)
            )

        metrics = await store.get_metrics(task_type="test", time_window="24h")
        assert len(metrics) == 1
        assert metrics[0].completion_rate == pytest.approx(0.8)

    async def test_metrics_retry_rate(self, store):
        """retry_rate is the share of samples with success_rate below 1.0."""
        await store.record_experience(
            _make_experience(task_type="test", outcome="success", success_rate=1.0)
        )
        await store.record_experience(
            _make_experience(task_type="test", outcome="success", success_rate=0.5)
        )
        await store.record_experience(
            _make_experience(task_type="test", outcome="failure", success_rate=0.0)
        )

        metrics = await store.get_metrics(task_type="test", time_window="24h")
        assert len(metrics) == 1
        # 2 out of 3 have success_rate < 1.0
        assert metrics[0].retry_rate == pytest.approx(2.0 / 3.0)

    async def test_metrics_time_window_values(self, store):
        """The requested time_window string is echoed on the metrics row."""
        await store.record_experience(
            _make_experience(task_type="test", outcome="success")
        )

        metrics = await store.get_metrics(task_type="test", time_window="7d")
        assert len(metrics) == 1
        assert metrics[0].time_window == "7d"
|
||
|
||
|
||
# ── Semantic search integration tests ─────────────────────
|
||
|
||
|
||
class TestSemanticSearchIntegration:
    """Search on an embedder-backed store, with and without a task_type filter."""

    async def test_semantic_search_returns_all_relevant(self, store):
        """Semantic search should return every recorded experience."""
        await store.record_experience(
            _make_experience(task_type="code_review", goal="Review Python code for bugs")
        )
        await store.record_experience(
            _make_experience(task_type="data_analysis", goal="Analyze quarterly sales report")
        )
        await store.record_experience(
            _make_experience(task_type="code_review", goal="Check Java code style")
        )

        results = await store.search("Find bugs in Python code", top_k=3)
        assert len(results) == 3
        # Every recorded experience was retrieved (three distinct goals).
        goals = {r.goal for r in results}
        assert len(goals) == 3

    async def test_semantic_search_with_filter(self, store):
        """Semantic search combined with a task_type filter."""
        await store.record_experience(
            _make_experience(task_type="code_review", goal="Review Python code")
        )
        await store.record_experience(
            _make_experience(task_type="data_analysis", goal="Review data quality")
        )

        results = await store.search("Review", top_k=5, task_type="code_review")
        # all() is vacuously true for an empty list, so pin the count first:
        # exactly one code_review experience was recorded.
        assert len(results) == 1
        assert all(r.task_type == "code_review" for r in results)
|
||
|
||
|
||
# ── End-to-end flow tests ─────────────────────────────────
|
||
|
||
|
||
class TestEndToEnd:
    """Record-then-query flows across the store's public methods."""

    async def test_record_and_retrieve(self, store):
        """A recorded experience can be found again via search."""
        recorded = _make_experience(
            task_type="code_review",
            goal="Review PR #123",
            outcome="success",
            duration_seconds=15.0,
            optimization_tips=["Use faster linter"],
        )
        recorded_id = await store.record_experience(recorded)

        hits = await store.search("Review PR", top_k=5)
        assert len(hits) >= 1
        # Narrow the hits down to the experience recorded above.
        matches = [hit for hit in hits if hit.experience_id == recorded_id]
        assert len(matches) == 1
        match = matches[0]
        assert match.goal == "Review PR #123"
        assert match.optimization_tips == ["Use faster linter"]

    async def test_failure_experience_retrievable(self, store):
        """A failed experience can be found again via search."""
        recorded = _make_experience(
            task_type="deployment",
            goal="Deploy to production",
            outcome="failure",
            failure_reasons=["Health check failed", "Timeout"],
        )
        recorded_id = await store.record_experience(recorded)

        hits = await store.search("Deploy to production", top_k=5)
        assert len(hits) >= 1
        matches = [hit for hit in hits if hit.experience_id == recorded_id]
        assert len(matches) == 1
        assert matches[0].failure_reasons == ["Health check failed", "Timeout"]

    async def test_metrics_after_multiple_records(self, store):
        """Metrics aggregate correctly after several records."""
        for index in range(5):
            # The first four runs succeed; the fifth fails.
            succeeded = index < 4
            await store.record_experience(
                _make_experience(
                    task_type="code_review",
                    outcome="success" if succeeded else "failure",
                    duration_seconds=10.0 + index,
                    success_rate=1.0 if succeeded else 0.0,
                )
            )

        rows = await store.get_metrics(task_type="code_review", time_window="24h")
        assert len(rows) == 1
        summary = rows[0]
        assert summary.sample_count == 5
        assert summary.completion_rate == pytest.approx(0.8)  # 4/5
        assert summary.avg_duration == pytest.approx(12.0)  # (10+11+12+13+14)/5
        assert summary.retry_rate == pytest.approx(0.2)  # 1/5
|