# fischer-agentkit/tests/unit/test_pitfall_injection.py
# (649 lines, 25 KiB, Python)

"""Tests for U7: pitfall retrieval/injection at planning phase (R12).
Covers:
- PitfallDetector.check_pitfalls with goal param (semantic similarity retrieval)
- build_pitfall_warning_section helper (HIGH gate)
- ReActEngine pitfall_warnings param injection into system prompt
- PlanExecEngine pitfall_detector integration at planning phase
- Backward compatibility with existing callers (evolution_dashboard)
- Error/failure paths: None store, search raises, detector None on engine
"""
from __future__ import annotations
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.core.plan_exec_engine import PlanExecEngine
from agentkit.core.plan_schema import PlanStep, PlanStepStatus
from agentkit.core.react import ReActEngine
from agentkit.evolution.experience_schema import TaskExperience
from agentkit.evolution.experience_store import InMemoryExperienceStore
from agentkit.evolution.pitfall_detector import (
PitfallDetector,
PitfallWarning,
WarningLevel,
build_pitfall_warning_section,
)
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse, TokenUsage
# ── Helpers ──────────────────────────────────────────────
def _make_experience(
    task_type: str = "deployment",
    goal: str = "Deploy the service",
    outcome: str = "success",
    steps_summary: str | list[dict] = "",
    failure_reasons: list[str] | None = None,
    optimization_tips: list[str] | None = None,
    success_rate: float = 1.0,
) -> TaskExperience:
    """Create a ``TaskExperience`` populated with test-friendly defaults.

    ``failure_reasons`` and ``optimization_tips`` fall back to a fresh empty
    list when omitted (or otherwise falsy). The experience id is left empty,
    the duration is fixed at 10 seconds and ``created_at`` is the current
    UTC time.
    """
    fields = {
        "experience_id": "",
        "task_type": task_type,
        "goal": goal,
        "steps_summary": steps_summary,
        "outcome": outcome,
        "duration_seconds": 10.0,
        "success_rate": success_rate,
        "failure_reasons": failure_reasons or [],
        "optimization_tips": optimization_tips or [],
        "created_at": datetime.now(timezone.utc),
    }
    return TaskExperience(**fields)
def _make_step(
    name: str = "step",
    description: str = "do something",
    step_id: str = "s1",
) -> PlanStep:
    """Create a ``PlanStep`` in the PENDING state with the given id, name and description."""
    fields = {
        "step_id": step_id,
        "name": name,
        "description": description,
        "status": PlanStepStatus.PENDING,
    }
    return PlanStep(**fields)
def _make_warning(
    step_name: str = "Deploy Service",
    level: WarningLevel = WarningLevel.HIGH,
    failure_rate: float = 0.8,
) -> PitfallWarning:
    """Create a ``PitfallWarning`` with canned failures, suggestion and actor.

    Only the step name, warning level and failure rate vary between callers;
    the historical failures, suggestion text, confidence and actor are fixed.
    """
    past_failures = ["Timeout", "Connection refused"]
    advice = "Increase timeout and add retry"
    return PitfallWarning(
        step_name=step_name,
        warning_level=level,
        failure_rate=failure_rate,
        historical_failures=past_failures,
        suggestion=advice,
        confidence=0.9,
        actor="test_agent",
    )
def _make_response(content: str = "Done") -> LLMResponse:
    """Create an ``LLMResponse`` carrying ``content`` and no tool calls."""
    token_usage = TokenUsage(prompt_tokens=10, completion_tokens=20)
    return LLMResponse(
        content=content,
        model="test-model",
        usage=token_usage,
        tool_calls=[],
    )
def _make_mock_gateway(responses: list[LLMResponse] | None = None) -> MagicMock:
    """Create a ``MagicMock`` specced on ``LLMGateway`` with an async ``chat``.

    When ``responses`` is given, it is used as the ``side_effect`` of ``chat``;
    otherwise every ``chat`` call returns the default ``_make_response()``.
    """
    if responses is None:
        chat_mock = AsyncMock(return_value=_make_response())
    else:
        chat_mock = AsyncMock(side_effect=responses)

    gateway = MagicMock(spec=LLMGateway)
    gateway.chat = chat_mock
    return gateway
@pytest.fixture
def store():
    """Provide a fresh ``InMemoryExperienceStore`` (decay_rate=0.01, alpha=0.7)."""
    experience_store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
    return experience_store
@pytest.fixture
def detector(store):
    """Provide a ``PitfallDetector`` bound to the ``store`` fixture (threshold 0.3)."""
    pitfall_detector = PitfallDetector(experience_store=store, similarity_threshold=0.3)
    return pitfall_detector
# ── build_pitfall_warning_section (HIGH gate) ──────────────────────
class TestBuildPitfallWarningSection:
    """``build_pitfall_warning_section`` and its HIGH-only gate."""

    def test_high_warnings_produce_section(self):
        """A HIGH warning yields a section with the header and the step name."""
        high_warning = _make_warning(step_name="Deploy Service")
        rendered = build_pitfall_warning_section([high_warning])
        assert "## 历史避坑提示" in rendered
        assert "Deploy Service" in rendered

    def test_only_high_warnings_injected(self):
        """Gate by HIGH: MEDIUM/LOW filtered out."""
        high = _make_warning(step_name="High Step", level=WarningLevel.HIGH)
        medium = _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM)
        low = _make_warning(step_name="Low Step", level=WarningLevel.LOW)

        rendered = build_pitfall_warning_section([high, medium, low])

        assert "High Step" in rendered
        assert "Medium Step" not in rendered
        assert "Low Step" not in rendered

    def test_empty_list_returns_empty(self):
        """No warnings at all renders as the empty string."""
        rendered = build_pitfall_warning_section([])
        assert rendered == ""

    def test_no_high_returns_empty(self):
        """A list without any HIGH warning renders as the empty string."""
        medium_only = [_make_warning(level=WarningLevel.MEDIUM)]
        rendered = build_pitfall_warning_section(medium_only)
        assert rendered == ""

    def test_includes_failure_reasons_and_suggestion(self):
        """The section carries a historical failure and the suggestion text."""
        rendered = build_pitfall_warning_section([_make_warning()])
        assert "Timeout" in rendered
        assert "Increase timeout" in rendered
# ── PitfallDetector.check_pitfalls with goal param ─────────────────
class TestCheckPitfallsGoalRetrieval:
    """``PitfallDetector.check_pitfalls`` called with a ``goal`` (and ``top_k``)."""

    async def test_goal_retrieves_similar_pitfalls(self, detector, store):
        """Happy path: goal text retrieves similar historical failures."""
        for _ in range(6):
            failed_run = _make_experience(
                outcome="failure",
                success_rate=0.0,
                steps_summary=[
                    {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"},
                ],
                failure_reasons=["Deploy timeout"],
            )
            await store.record_experience(failed_run)

        planned = [_make_step(name="Deploy Service", description="Deploy the service")]
        found = await detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy the service to production",
            top_k=3,
        )

        assert len(found) == 1
        first = found[0]
        assert first.warning_level == WarningLevel.HIGH
        assert first.step_name == "Deploy Service"

    async def test_goal_without_task_type_retrieves(self, store):
        """Goal text provided but no task_type → still retrieves by goal similarity."""
        failed_run = _make_experience(
            task_type="ops",
            outcome="failure",
            success_rate=0.0,
            steps_summary=[
                {"step_name": "Call API Gateway", "outcome": "failure", "error": "Timeout"},
            ],
        )
        await store.record_experience(failed_run)

        # This test builds its own detector with a lower threshold than the fixture's.
        lenient_detector = PitfallDetector(experience_store=store, similarity_threshold=0.1)
        planned = [_make_step(name="Call API Gateway")]
        found = await lenient_detector.check_pitfalls(
            task_type="",
            planned_steps=planned,
            goal="call api gateway endpoint",
        )

        assert len(found) >= 1

    async def test_empty_planned_steps_returns_empty(self, detector):
        """With no planned steps there is nothing to warn about."""
        found = await detector.check_pitfalls(
            task_type="deployment",
            planned_steps=[],
            goal="deploy",
        )
        assert found == []

    async def test_no_pitfalls_in_store_returns_empty(self, detector, store):
        """A store holding only a successful run yields no warnings."""
        successful_run = _make_experience(outcome="success", steps_summary=[])
        await store.record_experience(successful_run)

        planned = [_make_step(name="Deploy Service")]
        found = await detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy",
        )

        assert found == []

    async def test_all_low_severity_returns_warnings_but_no_high(self, detector, store):
        """All pitfalls low severity → warnings returned but HIGH gate filters injection."""
        # Nine successes followed by a single failure for the same step name:
        # the resulting warning is expected to be below HIGH.
        for _ in range(9):
            successful_run = _make_experience(
                outcome="success",
                steps_summary=[
                    {"step_name": "Deploy Service", "outcome": "success"},
                ],
            )
            await store.record_experience(successful_run)
        flaky_run = _make_experience(
            outcome="failure",
            success_rate=0.0,
            steps_summary=[
                {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"},
            ],
        )
        await store.record_experience(flaky_run)

        planned = [_make_step(name="Deploy Service")]
        found = await detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy",
        )

        # Warnings are produced, yet none of them is HIGH ...
        assert len(found) >= 1
        assert all(w.warning_level != WarningLevel.HIGH for w in found)
        # ... so the section builder has nothing to render.
        assert build_pitfall_warning_section(found) == ""

    async def test_top_k_limits_results(self):
        """100+ entries → only top-3 by similarity retrieved; search called once."""
        # 120 failed experiences, each with its own distinctly named failing
        # step (Step_0 .. Step_119).
        history = [
            _make_experience(
                outcome="failure",
                success_rate=0.0,
                steps_summary=[
                    {"step_name": f"Step_{i}", "outcome": "failure", "error": f"err_{i}"},
                ],
            )
            for i in range(120)
        ]
        fake_store = MagicMock()
        fake_store.search = AsyncMock(return_value=history)
        lenient_detector = PitfallDetector(experience_store=fake_store, similarity_threshold=0.01)

        # Five planned steps whose names match five of the historical steps.
        planned = [_make_step(name=f"Step_{i}", step_id=f"s{i}") for i in range(5)]
        found = await lenient_detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy",
            top_k=3,
        )

        # The store is queried exactly once (no N+1 lookup per planned step).
        assert fake_store.search.call_count == 1
        # top_k caps the number of returned warnings at 3.
        assert len(found) <= 3
# ── Error and failure paths (PitfallDetector) ──────────────────────
class TestPitfallDetectorErrorPaths:
    """``check_pitfalls`` degrades to an empty list when the store is unusable."""

    async def test_store_none_skips_search(self):
        """experience_store unavailable (None) → skip, no exception."""
        storeless_detector = PitfallDetector(experience_store=None)
        planned = [_make_step(name="Deploy")]

        found = await storeless_detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy",
        )

        assert found == []

    async def test_store_search_raises_returns_empty(self):
        """experience_store.search() raises → skip injection, continue."""
        broken_store = MagicMock()
        broken_store.search = AsyncMock(side_effect=RuntimeError("DB connection lost"))
        failing_detector = PitfallDetector(experience_store=broken_store)
        planned = [_make_step(name="Deploy")]

        found = await failing_detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            goal="deploy",
        )

        assert found == []

    async def test_store_search_value_error_returns_empty(self):
        """A ``ValueError`` from ``search`` (call made without a goal) also yields ``[]``."""
        broken_store = MagicMock()
        broken_store.search = AsyncMock(side_effect=ValueError("bad query"))
        failing_detector = PitfallDetector(experience_store=broken_store)
        planned = [_make_step(name="Deploy")]

        found = await failing_detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
        )

        assert found == []
# ── ReActEngine pitfall_warnings injection ─────────────────────────
class TestReactEnginePitfallInjection:
    """``ReActEngine.execute`` with the ``pitfall_warnings`` parameter."""

    @staticmethod
    async def _system_prompt_sent(system_prompt, pitfall_warnings, user_text="hi") -> str:
        """Run one engine turn and return the first message content sent to the gateway.

        A mock gateway answering a single "Done" response backs a
        ``ReActEngine`` with ``max_steps=3``. The returned value is the
        stringified content of the first message in the keyword arguments of
        the last ``gateway.chat`` call.
        """
        gateway = _make_mock_gateway([_make_response(content="Done")])
        engine = ReActEngine(llm_gateway=gateway, max_steps=3)
        await engine.execute(
            messages=[{"role": "user", "content": user_text}],
            system_prompt=system_prompt,
            pitfall_warnings=pitfall_warnings,
        )
        return str(gateway.chat.call_args.kwargs["messages"][0]["content"])

    async def test_high_warnings_injected_into_system_prompt(self):
        """pitfall_warnings param injects HIGH section into system prompt."""
        warning = _make_warning(step_name="Deploy Service", failure_rate=0.9)

        system_content = await self._system_prompt_sent(
            system_prompt="You are a helpful assistant.",
            pitfall_warnings=[warning],
            user_text="deploy the service",
        )

        assert "## 历史避坑提示" in system_content
        assert "Deploy Service" in system_content

    async def test_no_warnings_no_injection(self):
        """``pitfall_warnings=None`` is a no-op: no pitfall section is added."""
        system_content = await self._system_prompt_sent(
            system_prompt="You are a helpful assistant.",
            pitfall_warnings=None,
        )

        assert "## 历史避坑提示" not in system_content

    async def test_low_severity_not_injected(self):
        """Only HIGH severity injected; MEDIUM/LOW filtered out."""
        below_high = [
            _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM),
            _make_warning(step_name="Low Step", level=WarningLevel.LOW),
        ]

        system_content = await self._system_prompt_sent(
            system_prompt="base prompt",
            pitfall_warnings=below_high,
        )

        assert "## 历史避坑提示" not in system_content
        # Both filtered warnings are checked, not just the MEDIUM one.
        assert "Medium Step" not in system_content
        assert "Low Step" not in system_content

    async def test_empty_list_no_injection(self):
        """An empty warning list adds no pitfall section."""
        system_content = await self._system_prompt_sent(
            system_prompt="base prompt",
            pitfall_warnings=[],
        )

        assert "## 历史避坑提示" not in system_content
# ── PlanExecEngine pitfall_detector integration ────────────────────
def _make_plan(
    goal: str = "deploy the service",
    steps: list[PlanStep] | None = None,
):
    """Create an ``ExecutionPlan`` whose steps run one after another.

    Args:
        goal: Goal text stored on the plan.
        steps: Steps of the plan. When ``None``, a two-step plan is used
            ("Deploy Service" as ``s0`` followed by "Verify Deployment" as
            ``s1``).

    Returns:
        An ``ExecutionPlan`` with one single-step parallel group per step, in
        the order the steps were given.
    """
    from agentkit.core.plan_schema import ExecutionPlan

    if steps is None:
        steps = [
            PlanStep(step_id="s0", name="Deploy Service", description="Deploy the service"),
            PlanStep(step_id="s1", name="Verify Deployment", description="Check health"),
        ]
    # Derive the groups from the actual step ids so that caller-supplied steps
    # are not paired with groups naming "s0"/"s1". For the default steps this
    # is exactly [["s0"], ["s1"]].
    parallel_groups = [[step.step_id] for step in steps]
    return ExecutionPlan(goal=goal, steps=steps, parallel_groups=parallel_groups)
def _make_plan_result():
    """Create a COMPLETED ``PlanExecutionResult`` for steps ``s0`` and ``s1``.

    Each step result is COMPLETED with the payload ``{"ok": True}``; the plan
    id is "test-plan" and the total duration is 100 ms.
    """
    from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult
    from agentkit.core.protocol import TaskStatus

    completed_steps = {
        step_id: StepExecutionResult(
            step_id=step_id, status=PlanStepStatus.COMPLETED, result={"ok": True}
        )
        for step_id in ("s0", "s1")
    }
    return PlanExecutionResult(
        plan_id="test-plan",
        step_results=completed_steps,
        status=TaskStatus.COMPLETED,
        total_duration_ms=100.0,
    )
class TestPlanExecEnginePitfallInjection:
    """``PlanExecEngine`` consulting its ``pitfall_detector`` at planning time."""

    @staticmethod
    async def _run_engine(engine, user_text: str, system_prompt: str):
        """Run ``engine.execute`` with the planner and both executors patched.

        The planner's ``generate_plan`` returns ``_make_plan()``, and the
        patched ``PlanExecutor`` instance returns ``_make_plan_result()`` from
        ``execute``.

        Returns:
            ``(result, step_executor_cls)``: the value returned by
            ``engine.execute`` and the mock that replaced
            ``ReActStepExecutor``. The mock keeps its recorded calls after the
            patches are undone.
        """
        plan = _make_plan()
        plan_result = _make_plan_result()
        with (
            patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)),
            patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as step_executor_cls,
            patch("agentkit.core.plan_exec_engine.PlanExecutor") as plan_executor_cls,
        ):
            plan_executor = MagicMock()
            plan_executor.execute = AsyncMock(return_value=plan_result)
            plan_executor_cls.return_value = plan_executor
            result = await engine.execute(
                messages=[{"role": "user", "content": user_text}],
                system_prompt=system_prompt,
            )
        return result, step_executor_cls

    @staticmethod
    def _first_system_prompt(step_executor_cls) -> str:
        """Return the ``system_prompt`` keyword of the first step-executor construction.

        Asserts that the executor class was called at least once, so a missing
        call shows up as an assertion failure rather than an ``IndexError``.
        A missing or falsy ``system_prompt`` keyword is reported as ``""``.
        """
        assert step_executor_cls.call_count >= 1
        return step_executor_cls.call_args_list[0].kwargs.get("system_prompt") or ""

    async def test_pitfalls_injected_into_system_prompt(self, store):
        """Happy path: top-3 HIGH pitfalls injected into system prompt at planning."""
        # Seed six failed runs of the step the default plan also contains.
        for _ in range(6):
            failed_run = _make_experience(
                outcome="failure",
                success_rate=0.0,
                steps_summary=[
                    {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"},
                ],
                failure_reasons=["Deploy timeout"],
            )
            await store.record_experience(failed_run)
        detector = PitfallDetector(experience_store=store, similarity_threshold=0.1)
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector)

        _, step_executor_cls = await self._run_engine(
            engine,
            user_text="deploy the service",
            system_prompt="You are a deployment agent.",
        )

        # The system_prompt handed to ReActStepExecutor must carry the pitfall section.
        sp = self._first_system_prompt(step_executor_cls)
        assert "## 历史避坑提示" in sp
        assert "Deploy Service" in sp

    async def test_pitfall_detector_none_skips_injection(self):
        """pitfall_detector is None → skip injection, no error."""
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=None)

        _, step_executor_cls = await self._run_engine(
            engine, user_text="deploy", system_prompt="base prompt"
        )

        sp = self._first_system_prompt(step_executor_cls)
        assert "## 历史避坑提示" not in sp

    async def test_check_pitfalls_raises_skips_injection(self):
        """PitfallDetector.check_pitfalls raises → skip injection, continue task."""
        failing_detector = MagicMock()
        failing_detector.check_pitfalls = AsyncMock(side_effect=RuntimeError("store down"))
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=failing_detector)

        # Must not raise even though the detector does.
        result, step_executor_cls = await self._run_engine(
            engine, user_text="deploy", system_prompt="base prompt"
        )

        assert result is not None
        sp = self._first_system_prompt(step_executor_cls)
        assert "## 历史避坑提示" not in sp

    async def test_no_pitfalls_in_store_no_injection(self, store):
        """No pitfalls in store → no injection (system_prompt unchanged)."""
        # The store only ever sees a successful run.
        successful_run = _make_experience(outcome="success", steps_summary=[])
        await store.record_experience(successful_run)
        detector = PitfallDetector(experience_store=store)
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector)

        _, step_executor_cls = await self._run_engine(
            engine, user_text="deploy", system_prompt="base prompt"
        )

        sp = self._first_system_prompt(step_executor_cls)
        assert "## 历史避坑提示" not in sp

    async def test_all_low_severity_no_injection(self, store):
        """All pitfalls low severity → none injected (HIGH gate)."""
        # Nine successes and one failure for the same step name; the test
        # expects this history to stay below the HIGH gate.
        for _ in range(9):
            successful_run = _make_experience(
                outcome="success",
                steps_summary=[{"step_name": "Deploy Service", "outcome": "success"}],
            )
            await store.record_experience(successful_run)
        flaky_run = _make_experience(
            outcome="failure",
            success_rate=0.0,
            steps_summary=[
                {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"},
            ],
        )
        await store.record_experience(flaky_run)
        detector = PitfallDetector(experience_store=store, similarity_threshold=0.1)
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector)

        _, step_executor_cls = await self._run_engine(
            engine, user_text="deploy", system_prompt="base prompt"
        )

        sp = self._first_system_prompt(step_executor_cls)
        assert "## 历史避坑提示" not in sp

    def test_constructor_injection_verified(self):
        """KTD-5: PitfallDetector app-state singleton via constructor injection."""
        detector = PitfallDetector(experience_store=InMemoryExperienceStore())
        engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector)
        # The engine must hold the very same detector object it was given.
        assert engine._pitfall_detector is detector

    def test_constructor_default_none(self):
        """Default pitfall_detector is None (no injection)."""
        engine = PlanExecEngine(llm_gateway=None)
        assert engine._pitfall_detector is None
# ── Backward compatibility ─────────────────────────────────────────
class TestBackwardCompatibility:
    """Call forms of ``check_pitfalls`` that omit ``goal``/``top_k`` keep working."""

    async def test_old_call_form_still_works(self, detector, store):
        """Old call form check_pitfalls(task_type=..., planned_steps=..., actor=...) works."""
        for _ in range(6):
            failed_run = _make_experience(
                outcome="failure",
                success_rate=0.0,
                steps_summary=[
                    {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"},
                ],
            )
            await store.record_experience(failed_run)

        # Old form: neither goal nor top_k is passed.
        planned = [_make_step(name="Deploy Service")]
        found = await detector.check_pitfalls(
            task_type="deployment",
            planned_steps=planned,
            actor="test_agent",
        )

        assert len(found) == 1
        assert found[0].actor == "test_agent"

    async def test_evolution_dashboard_importable(self):
        """evolution_dashboard.py caller still works (module imports without error)."""
        # Smoke check only: this proves the module loads. It does not execute
        # the module's check_pitfalls call site.
        import agentkit.server.routes.evolution_dashboard  # noqa: F401

    async def test_existing_pitfall_detector_tests_compat(self, detector, store):
        """Existing test pattern (from test_evolution_auto_trigger) still works."""
        failed_run = _make_experience(
            task_type="testing",
            goal="Run tests",
            outcome="failure",
            success_rate=0.0,
            steps_summary=[
                {"step_name": "Test Step", "outcome": "failure", "error": "assertion"},
            ],
        )
        await store.record_experience(failed_run)

        # The step is built directly with PlanStep, not through _make_step.
        planned_step = PlanStep(
            step_id="s1",
            name="Test Step",
            description="Run tests",
            status=PlanStepStatus.PENDING,
        )
        found = await detector.check_pitfalls(
            task_type="testing", planned_steps=[planned_step], actor="test_agent"
        )

        assert len(found) == 1