"""Tests for U7: pitfall retrieval/injection at planning phase (R12). Covers: - PitfallDetector.check_pitfalls with goal param (semantic similarity retrieval) - build_pitfall_warning_section helper (HIGH gate) - ReActEngine pitfall_warnings param injection into system prompt - PlanExecEngine pitfall_detector integration at planning phase - Backward compatibility with existing callers (evolution_dashboard) - Error/failure paths: None store, search raises, detector None on engine """ from __future__ import annotations from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock, patch import pytest from agentkit.core.plan_exec_engine import PlanExecEngine from agentkit.core.plan_schema import PlanStep, PlanStepStatus from agentkit.core.react import ReActEngine from agentkit.evolution.experience_schema import TaskExperience from agentkit.evolution.experience_store import InMemoryExperienceStore from agentkit.evolution.pitfall_detector import ( PitfallDetector, PitfallWarning, WarningLevel, build_pitfall_warning_section, ) from agentkit.llm.gateway import LLMGateway from agentkit.llm.protocol import LLMResponse, TokenUsage # ── Helpers ────────────────────────────────────────────── def _make_experience( task_type: str = "deployment", goal: str = "Deploy the service", outcome: str = "success", steps_summary: str | list[dict] = "", failure_reasons: list[str] | None = None, optimization_tips: list[str] | None = None, success_rate: float = 1.0, ) -> TaskExperience: return TaskExperience( experience_id="", task_type=task_type, goal=goal, steps_summary=steps_summary, outcome=outcome, duration_seconds=10.0, success_rate=success_rate, failure_reasons=failure_reasons or [], optimization_tips=optimization_tips or [], created_at=datetime.now(timezone.utc), ) def _make_step( name: str = "step", description: str = "do something", step_id: str = "s1", ) -> PlanStep: return PlanStep( step_id=step_id, name=name, description=description, status=PlanStepStatus.PENDING, ) def _make_warning( step_name: str = "Deploy Service", level: WarningLevel = WarningLevel.HIGH, failure_rate: float = 0.8, ) -> PitfallWarning: return PitfallWarning( step_name=step_name, warning_level=level, failure_rate=failure_rate, historical_failures=["Timeout", "Connection refused"], suggestion="Increase timeout and add retry", confidence=0.9, actor="test_agent", ) def _make_response(content: str = "Done") -> LLMResponse: return LLMResponse( content=content, model="test-model", usage=TokenUsage(prompt_tokens=10, completion_tokens=20), tool_calls=[], ) def _make_mock_gateway(responses: list[LLMResponse] | None = None) -> MagicMock: gateway = MagicMock(spec=LLMGateway) if responses is not None: gateway.chat = AsyncMock(side_effect=responses) else: gateway.chat = AsyncMock(return_value=_make_response()) return gateway @pytest.fixture def store(): return InMemoryExperienceStore(decay_rate=0.01, alpha=0.7) @pytest.fixture def detector(store): return PitfallDetector(experience_store=store, similarity_threshold=0.3) # ── build_pitfall_warning_section (HIGH gate) ────────────────────── class TestBuildPitfallWarningSection: def test_high_warnings_produce_section(self): section = build_pitfall_warning_section([_make_warning(step_name="Deploy Service")]) assert "## 历史避坑提示" in section assert "Deploy Service" in section def test_only_high_warnings_injected(self): """Gate by HIGH: MEDIUM/LOW filtered out.""" warnings = [ _make_warning(step_name="High Step", level=WarningLevel.HIGH), _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM), _make_warning(step_name="Low Step", level=WarningLevel.LOW), ] section = build_pitfall_warning_section(warnings) assert "High Step" in section assert "Medium Step" not in section assert "Low Step" not in section def test_empty_list_returns_empty(self): assert build_pitfall_warning_section([]) == "" def test_no_high_returns_empty(self): warnings = [_make_warning(level=WarningLevel.MEDIUM)] assert build_pitfall_warning_section(warnings) == "" def test_includes_failure_reasons_and_suggestion(self): section = build_pitfall_warning_section([_make_warning()]) assert "Timeout" in section assert "Increase timeout" in section # ── PitfallDetector.check_pitfalls with goal param ───────────────── class TestCheckPitfallsGoalRetrieval: async def test_goal_retrieves_similar_pitfalls(self, detector, store): """Happy path: goal text retrieves similar historical failures.""" for _ in range(6): await store.record_experience( _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, ], failure_reasons=["Deploy timeout"], ) ) steps = [_make_step(name="Deploy Service", description="Deploy the service")] warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=steps, goal="deploy the service to production", top_k=3, ) assert len(warnings) == 1 assert warnings[0].warning_level == WarningLevel.HIGH assert warnings[0].step_name == "Deploy Service" async def test_goal_without_task_type_retrieves(self, store): """Goal text provided but no task_type → still retrieves by goal similarity.""" await store.record_experience( _make_experience( task_type="ops", outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Call API Gateway", "outcome": "failure", "error": "Timeout"}, ], ) ) detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) steps = [_make_step(name="Call API Gateway")] warnings = await detector.check_pitfalls( task_type="", planned_steps=steps, goal="call api gateway endpoint", ) assert len(warnings) >= 1 async def test_empty_planned_steps_returns_empty(self, detector): warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[], goal="deploy", ) assert warnings == [] async def test_no_pitfalls_in_store_returns_empty(self, detector, store): await store.record_experience(_make_experience(outcome="success", steps_summary=[])) warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[_make_step(name="Deploy Service")], goal="deploy", ) assert warnings == [] async def test_all_low_severity_returns_warnings_but_no_high(self, detector, store): """All pitfalls low severity → warnings returned but HIGH gate filters injection.""" # Only 1 failure out of 10 → low failure rate → LOW warning for _ in range(9): await store.record_experience( _make_experience( outcome="success", steps_summary=[ {"step_name": "Deploy Service", "outcome": "success"}, ], ) ) await store.record_experience( _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"}, ], ) ) steps = [_make_step(name="Deploy Service")] warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=steps, goal="deploy", ) # Warnings exist but none are HIGH assert len(warnings) >= 1 assert all(w.warning_level != WarningLevel.HIGH for w in warnings) # Section builder should return empty (HIGH gate) assert build_pitfall_warning_section(warnings) == "" async def test_top_k_limits_results(self): """100+ entries → only top-3 by similarity retrieved; search called once.""" mock_store = MagicMock() # 120 experiences all with the same failing step experiences = [ _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": f"Step_{i}", "outcome": "failure", "error": f"err_{i}"}, ], ) for i in range(120) ] mock_store.search = AsyncMock(return_value=experiences) detector = PitfallDetector(experience_store=mock_store, similarity_threshold=0.01) # 5 planned steps matching different historical steps steps = [_make_step(name=f"Step_{i}", step_id=f"s{i}") for i in range(5)] warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=steps, goal="deploy", top_k=3, ) # search called exactly once (no N+1 per step) assert mock_store.search.call_count == 1 # top_k limits final warnings to 3 assert len(warnings) <= 3 # ── Error and failure paths (PitfallDetector) ────────────────────── class TestPitfallDetectorErrorPaths: async def test_store_none_skips_search(self): """experience_store unavailable (None) → skip, no exception.""" detector = PitfallDetector(experience_store=None) warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[_make_step(name="Deploy")], goal="deploy", ) assert warnings == [] async def test_store_search_raises_returns_empty(self): """experience_store.search() raises → skip injection, continue.""" mock_store = MagicMock() mock_store.search = AsyncMock(side_effect=RuntimeError("DB connection lost")) detector = PitfallDetector(experience_store=mock_store) warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[_make_step(name="Deploy")], goal="deploy", ) assert warnings == [] async def test_store_search_value_error_returns_empty(self): mock_store = MagicMock() mock_store.search = AsyncMock(side_effect=ValueError("bad query")) detector = PitfallDetector(experience_store=mock_store) warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[_make_step(name="Deploy")], ) assert warnings == [] # ── ReActEngine pitfall_warnings injection ───────────────────────── class TestReactEnginePitfallInjection: async def test_high_warnings_injected_into_system_prompt(self): """pitfall_warnings param injects HIGH section into system prompt.""" gateway = _make_mock_gateway([_make_response(content="Done")]) engine = ReActEngine(llm_gateway=gateway, max_steps=3) warning = _make_warning(step_name="Deploy Service", failure_rate=0.9) await engine.execute( messages=[{"role": "user", "content": "deploy the service"}], system_prompt="You are a helpful assistant.", pitfall_warnings=[warning], ) call_kwargs = gateway.chat.call_args.kwargs system_content = str(call_kwargs["messages"][0]["content"]) assert "## 历史避坑提示" in system_content assert "Deploy Service" in system_content async def test_no_warnings_no_injection(self): """Empty list or None = no-op (system_prompt unchanged).""" gateway = _make_mock_gateway([_make_response(content="Done")]) engine = ReActEngine(llm_gateway=gateway, max_steps=3) base_prompt = "You are a helpful assistant." await engine.execute( messages=[{"role": "user", "content": "hi"}], system_prompt=base_prompt, pitfall_warnings=None, ) system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) assert "## 历史避坑提示" not in system_content async def test_low_severity_not_injected(self): """Only HIGH severity injected; MEDIUM/LOW filtered out.""" gateway = _make_mock_gateway([_make_response(content="Done")]) engine = ReActEngine(llm_gateway=gateway, max_steps=3) warnings = [ _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM), _make_warning(step_name="Low Step", level=WarningLevel.LOW), ] await engine.execute( messages=[{"role": "user", "content": "hi"}], system_prompt="base prompt", pitfall_warnings=warnings, ) system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) assert "## 历史避坑提示" not in system_content assert "Medium Step" not in system_content async def test_empty_list_no_injection(self): gateway = _make_mock_gateway([_make_response(content="Done")]) engine = ReActEngine(llm_gateway=gateway, max_steps=3) await engine.execute( messages=[{"role": "user", "content": "hi"}], system_prompt="base prompt", pitfall_warnings=[], ) system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) assert "## 历史避坑提示" not in system_content # ── PlanExecEngine pitfall_detector integration ──────────────────── def _make_plan( goal: str = "deploy the service", steps: list[PlanStep] | None = None, ): if steps is None: steps = [ PlanStep(step_id="s0", name="Deploy Service", description="Deploy the service"), PlanStep(step_id="s1", name="Verify Deployment", description="Check health"), ] from agentkit.core.plan_schema import ExecutionPlan return ExecutionPlan(goal=goal, steps=steps, parallel_groups=[["s0"], ["s1"]]) def _make_plan_result(): from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult from agentkit.core.protocol import TaskStatus return PlanExecutionResult( plan_id="test-plan", step_results={ "s0": StepExecutionResult( step_id="s0", status=PlanStepStatus.COMPLETED, result={"ok": True} ), "s1": StepExecutionResult( step_id="s1", status=PlanStepStatus.COMPLETED, result={"ok": True} ), }, status=TaskStatus.COMPLETED, total_duration_ms=100.0, ) class TestPlanExecEnginePitfallInjection: async def test_pitfalls_injected_into_system_prompt(self, store): """Happy path: top-3 HIGH pitfalls injected into system prompt at planning.""" # Seed failure data for _ in range(6): await store.record_experience( _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, ], failure_reasons=["Deploy timeout"], ) ) detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) plan = _make_plan() plan_result = _make_plan_result() with ( patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, ): mock_exec = MagicMock() mock_exec.execute = AsyncMock(return_value=plan_result) MockExecutor.return_value = mock_exec await engine.execute( messages=[{"role": "user", "content": "deploy the service"}], system_prompt="You are a deployment agent.", ) # system_prompt passed to ReActStepExecutor must contain pitfall section assert MockStepExec.call_count >= 1 sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" assert "## 历史避坑提示" in sp assert "Deploy Service" in sp async def test_pitfall_detector_none_skips_injection(self): """pitfall_detector is None → skip injection, no error.""" engine = PlanExecEngine(llm_gateway=None, pitfall_detector=None) plan = _make_plan() plan_result = _make_plan_result() with ( patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, ): mock_exec = MagicMock() mock_exec.execute = AsyncMock(return_value=plan_result) MockExecutor.return_value = mock_exec await engine.execute( messages=[{"role": "user", "content": "deploy"}], system_prompt="base prompt", ) sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" assert "## 历史避坑提示" not in sp async def test_check_pitfalls_raises_skips_injection(self): """PitfallDetector.check_pitfalls raises → skip injection, continue task.""" mock_detector = MagicMock() mock_detector.check_pitfalls = AsyncMock(side_effect=RuntimeError("store down")) engine = PlanExecEngine(llm_gateway=None, pitfall_detector=mock_detector) plan = _make_plan() plan_result = _make_plan_result() with ( patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, ): mock_exec = MagicMock() mock_exec.execute = AsyncMock(return_value=plan_result) MockExecutor.return_value = mock_exec # Should not raise result = await engine.execute( messages=[{"role": "user", "content": "deploy"}], system_prompt="base prompt", ) assert result is not None sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" assert "## 历史避坑提示" not in sp async def test_no_pitfalls_in_store_no_injection(self, store): """No pitfalls in store → no injection (system_prompt unchanged).""" # Only success experiences await store.record_experience(_make_experience(outcome="success", steps_summary=[])) detector = PitfallDetector(experience_store=store) engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) plan = _make_plan() plan_result = _make_plan_result() with ( patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, ): mock_exec = MagicMock() mock_exec.execute = AsyncMock(return_value=plan_result) MockExecutor.return_value = mock_exec await engine.execute( messages=[{"role": "user", "content": "deploy"}], system_prompt="base prompt", ) sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" assert "## 历史避坑提示" not in sp async def test_all_low_severity_no_injection(self, store): """All pitfalls low severity → none injected (HIGH gate).""" # 9 successes + 1 failure → 10% failure rate → LOW for _ in range(9): await store.record_experience( _make_experience( outcome="success", steps_summary=[{"step_name": "Deploy Service", "outcome": "success"}], ) ) await store.record_experience( _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"}, ], ) ) detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) plan = _make_plan() plan_result = _make_plan_result() with ( patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, ): mock_exec = MagicMock() mock_exec.execute = AsyncMock(return_value=plan_result) MockExecutor.return_value = mock_exec await engine.execute( messages=[{"role": "user", "content": "deploy"}], system_prompt="base prompt", ) sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" assert "## 历史避坑提示" not in sp def test_constructor_injection_verified(self): """KTD-5: PitfallDetector app-state singleton via constructor injection.""" detector = PitfallDetector(experience_store=InMemoryExperienceStore()) engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) assert engine._pitfall_detector is detector def test_constructor_default_none(self): """Default pitfall_detector is None (no injection).""" engine = PlanExecEngine(llm_gateway=None) assert engine._pitfall_detector is None # ── Backward compatibility ───────────────────────────────────────── class TestBackwardCompatibility: async def test_old_call_form_still_works(self, detector, store): """Old call form check_pitfalls(task_type=..., planned_steps=..., actor=...) works.""" for _ in range(6): await store.record_experience( _make_experience( outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, ], ) ) # Old form: no goal, no top_k warnings = await detector.check_pitfalls( task_type="deployment", planned_steps=[_make_step(name="Deploy Service")], actor="test_agent", ) assert len(warnings) == 1 assert warnings[0].actor == "test_agent" async def test_evolution_dashboard_importable(self): """evolution_dashboard.py caller still works (module imports without error).""" # Importing the module verifies the call site signature is still valid # (check_pitfalls is called with task_type + planned_steps kwargs). import agentkit.server.routes.evolution_dashboard # noqa: F401 async def test_existing_pitfall_detector_tests_compat(self, detector, store): """Existing test pattern (from test_evolution_auto_trigger) still works.""" await store.record_experience( _make_experience( task_type="testing", goal="Run tests", outcome="failure", success_rate=0.0, steps_summary=[ {"step_name": "Test Step", "outcome": "failure", "error": "assertion"}, ], ) ) steps = [ PlanStep( step_id="s1", name="Test Step", description="Run tests", status=PlanStepStatus.PENDING, ) ] warnings = await detector.check_pitfalls( task_type="testing", planned_steps=steps, actor="test_agent" ) assert len(warnings) == 1