From a7633960114940c03e3ed0784ca33e21bc48ef4c Mon Sep 17 00:00:00 2001 From: chiguyong Date: Fri, 3 Jul 2026 14:27:48 +0800 Subject: [PATCH] feat(evolution): pitfall retrieval/injection at planning phase (U7, R12) --- src/agentkit/core/plan_exec_engine.py | 92 +++ src/agentkit/core/react.py | 21 + src/agentkit/evolution/pitfall_detector.py | 69 ++- tests/unit/test_pitfall_injection.py | 648 +++++++++++++++++++++ 4 files changed, 824 insertions(+), 6 deletions(-) create mode 100644 tests/unit/test_pitfall_injection.py diff --git a/src/agentkit/core/plan_exec_engine.py b/src/agentkit/core/plan_exec_engine.py index a3c37b1..d15aff6 100644 --- a/src/agentkit/core/plan_exec_engine.py +++ b/src/agentkit/core/plan_exec_engine.py @@ -32,6 +32,7 @@ from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, Stag if TYPE_CHECKING: from agentkit.core.compressor import CompressionStrategy from agentkit.core.trace import TraceRecorder + from agentkit.evolution.pitfall_detector import PitfallDetector from agentkit.memory.retriever import MemoryRetriever from agentkit.llm.gateway import LLMGateway from agentkit.tools.base import Tool @@ -90,6 +91,11 @@ class PlanExecEngine: # (think=7, verify=2, reflect=1). Threaded through to each step's # ReActEngine. phase_budgets: dict[str, int] | None = None, + # U7/R12: PitfallDetector app-state singleton (KTD-5). Threaded + # through from app startup; used at planning phase to retrieve + # historical pitfalls by goal/skill similarity and inject into + # system prompt. None = skip injection (no error). + pitfall_detector: "PitfallDetector | None" = None, ): """ Args: @@ -107,6 +113,9 @@ class PlanExecEngine: own defaults (pytest -x -q, ruff check src/). phase_budgets: U4/R11 — per-phase step quotas. None = use _DEFAULT_PHASE_BUDGETS (think=7, verify=2, reflect=1). + pitfall_detector: U7/R12 — PitfallDetector 单例(KTD-5)。 + 规划阶段按 goal/skill 相似度检索历史 pitfall 并注入 system + prompt。None 表示跳过注入(不报错)。 """ self._llm_gateway = llm_gateway self._max_replans = max_replans @@ -117,6 +126,8 @@ class PlanExecEngine: self._confirmation_handler: Any | None = None self._verification_enabled = verification_enabled self._verification_commands = verification_commands + # U7/R12: app-state singleton (KTD-5) — constructor injection. + self._pitfall_detector = pitfall_detector # U4/R11: copy the default to avoid mutating the module-level dict. self._phase_budgets = ( dict(phase_budgets) if phase_budgets is not None else dict(_DEFAULT_PHASE_BUDGETS) @@ -305,6 +316,17 @@ class PlanExecEngine: ) ) + # U7/R12: retrieve historical pitfalls by goal/skill similarity + # and inject into system prompt before step execution. Only HIGH + # severity warnings are injected (gate by HIGH); top-3 by severity. + system_prompt = await self._inject_pitfall_warnings( + goal=goal, + plan_steps=plan.steps, + task_type=task_type, + actor=agent_name, + system_prompt=system_prompt, + ) + # Persist plan as Spec if spec_manager is provided if self._spec_manager is not None: spec = self._plan_to_spec(plan) @@ -503,6 +525,66 @@ class PlanExecEngine: # 内部实现 # ------------------------------------------------------------------ + async def _inject_pitfall_warnings( + self, + goal: str, + plan_steps: list[Any], + task_type: str, + actor: str, + system_prompt: str | None, + ) -> str | None: + """U7/R12: 检索历史 pitfall 并注入 system prompt + + 在规划阶段按 goal/skill 相似度检索历史 pitfall,将 HIGH 级别预警 + 注入 system prompt 的 "## 历史避坑提示" section。 + + KTD-5: pitfall_detector 通过构造函数注入(app-state singleton)。 + 若 pitfall_detector 为 None 则跳过注入(不报错)。 + + Args: + goal: 任务目标文本(用于语义相似度检索) + plan_steps: 计划步骤列表(PlanStep 对象) + task_type: 任务类型 + actor: agent/expert 标识(R6 actor marking) + system_prompt: 当前 system prompt + + Returns: + 注入避坑提示后的 system prompt(或原值,若无需注入) + """ + if self._pitfall_detector is None: + return system_prompt + + try: + warnings = await self._pitfall_detector.check_pitfalls( + task_type=task_type, + planned_steps=plan_steps, + actor=actor, + goal=goal, + top_k=3, + ) + except (RuntimeError, ValueError, KeyError, AttributeError) as e: + # ponytail: broad catch mirrors existing _search_experiences + # error handling — pitfall retrieval must never fail the task. + logger.warning(f"Pitfall retrieval failed, skipping injection: {e}") + return system_prompt + + if not warnings: + return system_prompt + + from agentkit.evolution.pitfall_detector import build_pitfall_warning_section + + section = build_pitfall_warning_section(warnings) + if not section: + return system_prompt + + logger.info( + f"U7/R12: injecting {sum(1 for w in warnings if w.warning_level.value == 'high')} " + f"HIGH pitfall warnings into system prompt for goal={goal[:50]}" + ) + if system_prompt: + return f"{system_prompt}\n\n{section}" + return section + async def _execute_loop( self, messages: list[dict[str, str]], @@ -595,6 +677,16 @@ class PlanExecEngine: ) ) + # U7/R12: retrieve historical pitfalls by goal/skill similarity + # and inject into system prompt before step execution. + system_prompt = await self._inject_pitfall_warnings( + goal=goal, + plan_steps=plan.steps, + task_type=task_type, + actor=agent_name, + system_prompt=system_prompt, + ) + # Persist plan as Spec if spec_manager is provided if self._spec_manager is not None: spec = self._plan_to_spec(plan) diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py index 47ff13e..9b03aca 100644 --- a/src/agentkit/core/react.py +++ b/src/agentkit/core/react.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from agentkit.core.phase import PhasePolicy, PhaseState from agentkit.core.sandbox import WorkspaceSandbox from agentkit.core.trace import TraceRecorder + from agentkit.evolution.pitfall_detector import PitfallWarning from agentkit.memory.retriever import MemoryRetriever logger = logging.getLogger(__name__) @@ -579,6 +580,7 @@ class ReActEngine: cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, confirmation_handler: Callable[..., Awaitable[object]] | None = None, + pitfall_warnings: "list[PitfallWarning] | None" = None, ) -> ReActResult: """执行 ReAct 循环 @@ -633,6 +635,7 @@ class ReActEngine: confirmation_handler=confirmation_handler, stream=False, effective_timeout=effective_timeout, + pitfall_warnings=pitfall_warnings, ) try: @@ -672,6 +675,7 @@ class ReActEngine: confirmation_handler=confirmation_handler, stream=False, effective_timeout=effective_timeout, + pitfall_warnings=pitfall_warnings, ), timeout=effective_timeout, ) @@ -692,6 +696,7 @@ class ReActEngine: confirmation_handler=confirmation_handler, stream=False, effective_timeout=effective_timeout, + pitfall_warnings=pitfall_warnings, ) except asyncio.TimeoutError: raise TaskTimeoutError( @@ -738,6 +743,7 @@ class ReActEngine: confirmation_handler: Callable[..., Awaitable[object]] | None = None, stream: bool = False, effective_timeout: float = 0.0, + pitfall_warnings: "list[PitfallWarning] | None" = None, ) -> AsyncGenerator[ReActEvent, None]: """Unified ReAct loop — async generator yielding ReActEvent objects. @@ -779,6 +785,18 @@ class ReActEngine: elif tools and system_prompt is None: system_prompt = self._build_tool_use_prompt(tools) + # U7/R12: inject HIGH-severity pitfall warnings into system prompt. + # Only HIGH warnings are injected (gate by HIGH) to avoid noise; + # empty list or None is a no-op. + if pitfall_warnings: + from agentkit.evolution.pitfall_detector import build_pitfall_warning_section + + pitfall_section = build_pitfall_warning_section(pitfall_warnings) + if pitfall_section: + system_prompt = ( + f"{system_prompt}\n\n{pitfall_section}" if system_prompt else pitfall_section + ) + # Telemetry: record agent request agent_request_counter().add( 1, {"agent.name": agent_name, "agent.type": task_type or "react"} @@ -1773,6 +1791,7 @@ class ReActEngine: cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, confirmation_handler: Callable[..., Awaitable[object]] | None = None, + pitfall_warnings: "list[PitfallWarning] | None" = None, ) -> AsyncGenerator[ReActEvent, None]: """Execute ReAct loop, yielding ReActEvent objects. @@ -1784,6 +1803,7 @@ class ReActEngine: Args: compressor: 压缩策略,None 时使用实例默认压缩器 timeout_seconds: 超时秒数,0 表示无超时,None 使用 default_timeout + pitfall_warnings: U7/R12 — HIGH 级别避坑预警,注入 system prompt """ effective_compressor = compressor if compressor is not None else self._compressor effective_timeout = ( @@ -1807,6 +1827,7 @@ class ReActEngine: confirmation_handler=confirmation_handler, stream=True, effective_timeout=effective_timeout, + pitfall_warnings=pitfall_warnings, ): yield event diff --git a/src/agentkit/evolution/pitfall_detector.py b/src/agentkit/evolution/pitfall_detector.py index e95610e..a6f56ea 100644 --- a/src/agentkit/evolution/pitfall_detector.py +++ b/src/agentkit/evolution/pitfall_detector.py @@ -117,6 +117,9 @@ class PitfallDetector: task_type: str, planned_steps: list[Any], actor: str = "", + *, + goal: str = "", + top_k: int | None = None, ) -> list[PitfallWarning]: """检查计划步骤中的潜在陷阱 @@ -124,6 +127,11 @@ class PitfallDetector: task_type: 任务类型 planned_steps: 计划步骤列表(PlanStep 对象或具有 name/description 属性的对象) actor: 产生此检测请求的 agent/expert 标识(R6 actor marking) + goal: U7/R12 — 任务目标文本,用于语义相似度检索历史 pitfall。 + 提供时以 goal 作为检索 query(仍按 task_type 过滤); + 为空时回退到 task_type 作为 query(向后兼容)。 + top_k: U7/R12 — 限制返回的预警数量(按严重程度排序后取前 top_k)。 + None 表示不限制(向后兼容)。 Returns: 按严重程度排序的预警列表(HIGH → MEDIUM → LOW) @@ -131,10 +139,16 @@ class PitfallDetector: if not planned_steps: return [] + # U7/R12: 当提供 goal 时,使用 goal 作为语义检索 query(更精准的 + # goal 相似度匹配);否则回退到 task_type(向后兼容)。 + # ponytail: Jaccard similarity on tokenized goal — upgrade path: + # embedding-based retrieval if precision matters. + query = goal if goal else task_type + # 1. 检索同类任务的所有经验(包含成功和失败,用于计算步骤级失败率) - all_experiences = await self._search_experiences(task_type) + all_experiences = await self._search_experiences(query, task_type) if not all_experiences: - logger.debug(f"No experiences found for task_type={task_type}") + logger.debug(f"No experiences found for task_type={task_type} goal={goal[:50]}") return [] # 2. 从经验中提取步骤级别的失败统计 @@ -146,6 +160,10 @@ class PitfallDetector: # 4. 按严重程度排序(HIGH → MEDIUM → LOW),同级别按失败率降序 warnings.sort(key=lambda w: (_warning_level_order(w.warning_level), -w.failure_rate)) + # U7/R12: 限制返回数量(top_k),仅保留最高严重度的 top_k 条 + if top_k is not None and top_k > 0: + warnings = warnings[:top_k] + if warnings: logger.info( f"PitfallDetector found {len(warnings)} warnings for task_type={task_type}: " @@ -156,13 +174,21 @@ class PitfallDetector: return warnings - async def _search_experiences(self, task_type: str) -> list[Any]: - """检索指定任务类型的所有经验(包含成功和失败)""" + async def _search_experiences(self, query: str, task_type: str = "") -> list[Any]: + """检索指定任务类型的所有经验(包含成功和失败) + + Args: + query: 语义检索 query(U7: 优先使用 goal 文本) + task_type: 任务类型过滤;空字符串表示不过滤 + """ + if self._store is None: + logger.warning("PitfallDetector experience_store is None, skipping search") + return [] try: results = await self._store.search( - query=task_type, + query=query, top_k=self._max_search_results, - task_type=task_type, + task_type=task_type or None, ) return results except (RuntimeError, ValueError, KeyError) as e: @@ -472,3 +498,34 @@ def _build_suggestion(stats: _StepFailureStats, failure_rate: float) -> str: parts.append(f"建议:{tips_str}") return "。".join(parts) + + +# U7/R12: 历史避坑提示 section 构建(仅 HIGH 级别注入 prompt 上下文) +_PITFALL_SECTION_HEADER = "## 历史避坑提示" + + +def build_pitfall_warning_section(warnings: list[PitfallWarning]) -> str: + """构建历史避坑提示 section,仅包含 HIGH 级别预警(U7/R12) + + 根据 plan "gate by HIGH" 要求,只有 HIGH 级别预警注入 prompt 上下文, + MEDIUM/LOW 不注入(避免噪声)。 + + Args: + warnings: 预警列表(将过滤仅保留 HIGH 级别) + + Returns: + 格式化的 "## 历史避坑提示" section 字符串;无 HIGH 预警时返回空字符串 + """ + high_warnings = [w for w in warnings if w.warning_level == WarningLevel.HIGH] + if not high_warnings: + return "" + + lines = [_PITFALL_SECTION_HEADER] + for w in high_warnings: + lines.append(f"- 步骤「{w.step_name}」: 历史失败率 {w.failure_rate:.0%}") + if w.historical_failures: + reasons = "、".join(w.historical_failures[:3]) + lines.append(f" 常见失败原因: {reasons}") + if w.suggestion: + lines.append(f" 建议: {w.suggestion}") + return "\n".join(lines) diff --git a/tests/unit/test_pitfall_injection.py b/tests/unit/test_pitfall_injection.py new file mode 100644 index 0000000..b677993 --- /dev/null +++ b/tests/unit/test_pitfall_injection.py @@ -0,0 +1,648 @@ +"""Tests for U7: pitfall retrieval/injection at planning phase (R12). + +Covers: +- PitfallDetector.check_pitfalls with goal param (semantic similarity retrieval) +- build_pitfall_warning_section helper (HIGH gate) +- ReActEngine pitfall_warnings param injection into system prompt +- PlanExecEngine pitfall_detector integration at planning phase +- Backward compatibility with existing callers (evolution_dashboard) +- Error/failure paths: None store, search raises, detector None on engine +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from agentkit.core.plan_exec_engine import PlanExecEngine +from agentkit.core.plan_schema import PlanStep, PlanStepStatus +from agentkit.core.react import ReActEngine +from agentkit.evolution.experience_schema import TaskExperience +from agentkit.evolution.experience_store import InMemoryExperienceStore +from agentkit.evolution.pitfall_detector import ( + PitfallDetector, + PitfallWarning, + WarningLevel, + build_pitfall_warning_section, +) +from agentkit.llm.gateway import LLMGateway +from agentkit.llm.protocol import LLMResponse, TokenUsage + + +# ── Helpers ────────────────────────────────────────────── + + +def _make_experience( + task_type: str = "deployment", + goal: str = "Deploy the service", + outcome: str = "success", + steps_summary: str | list[dict] = "", + failure_reasons: list[str] | None = None, + optimization_tips: list[str] | None = None, + success_rate: float = 1.0, +) -> TaskExperience: + return TaskExperience( + experience_id="", + task_type=task_type, + goal=goal, + steps_summary=steps_summary, + outcome=outcome, + duration_seconds=10.0, + success_rate=success_rate, + failure_reasons=failure_reasons or [], + optimization_tips=optimization_tips or [], + created_at=datetime.now(timezone.utc), + ) + + +def _make_step( + name: str = "step", + description: str = "do something", + step_id: str = "s1", +) -> PlanStep: + return PlanStep( + step_id=step_id, + name=name, + description=description, + status=PlanStepStatus.PENDING, + ) + + +def _make_warning( + step_name: str = "Deploy Service", + level: WarningLevel = WarningLevel.HIGH, + failure_rate: float = 0.8, +) -> PitfallWarning: + return PitfallWarning( + step_name=step_name, + warning_level=level, + failure_rate=failure_rate, + historical_failures=["Timeout", "Connection refused"], + suggestion="Increase timeout and add retry", + confidence=0.9, + actor="test_agent", + ) + + +def _make_response(content: str = "Done") -> LLMResponse: + return LLMResponse( + content=content, + model="test-model", + usage=TokenUsage(prompt_tokens=10, completion_tokens=20), + tool_calls=[], + ) + + +def _make_mock_gateway(responses: list[LLMResponse] | None = None) -> MagicMock: + gateway = MagicMock(spec=LLMGateway) + if responses is not None: + gateway.chat = AsyncMock(side_effect=responses) + else: + gateway.chat = AsyncMock(return_value=_make_response()) + return gateway + + +@pytest.fixture +def store(): + return InMemoryExperienceStore(decay_rate=0.01, alpha=0.7) + + +@pytest.fixture +def detector(store): + return PitfallDetector(experience_store=store, similarity_threshold=0.3) + + +# ── build_pitfall_warning_section (HIGH gate) ────────────────────── + + +class TestBuildPitfallWarningSection: + def test_high_warnings_produce_section(self): + section = build_pitfall_warning_section([_make_warning(step_name="Deploy Service")]) + assert "## 历史避坑提示" in section + assert "Deploy Service" in section + + def test_only_high_warnings_injected(self): + """Gate by HIGH: MEDIUM/LOW filtered out.""" + warnings = [ + _make_warning(step_name="High Step", level=WarningLevel.HIGH), + _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM), + _make_warning(step_name="Low Step", level=WarningLevel.LOW), + ] + section = build_pitfall_warning_section(warnings) + assert "High Step" in section + assert "Medium Step" not in section + assert "Low Step" not in section + + def test_empty_list_returns_empty(self): + assert build_pitfall_warning_section([]) == "" + + def test_no_high_returns_empty(self): + warnings = [_make_warning(level=WarningLevel.MEDIUM)] + assert build_pitfall_warning_section(warnings) == "" + + def test_includes_failure_reasons_and_suggestion(self): + section = build_pitfall_warning_section([_make_warning()]) + assert "Timeout" in section + assert "Increase timeout" in section + + +# ── PitfallDetector.check_pitfalls with goal param ───────────────── + + +class TestCheckPitfallsGoalRetrieval: + async def test_goal_retrieves_similar_pitfalls(self, detector, store): + """Happy path: goal text retrieves similar historical failures.""" + for _ in range(6): + await store.record_experience( + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, + ], + failure_reasons=["Deploy timeout"], + ) + ) + steps = [_make_step(name="Deploy Service", description="Deploy the service")] + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=steps, + goal="deploy the service to production", + top_k=3, + ) + assert len(warnings) == 1 + assert warnings[0].warning_level == WarningLevel.HIGH + assert warnings[0].step_name == "Deploy Service" + + async def test_goal_without_task_type_retrieves(self, store): + """Goal text provided but no task_type → still retrieves by goal similarity.""" + await store.record_experience( + _make_experience( + task_type="ops", + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Call API Gateway", "outcome": "failure", "error": "Timeout"}, + ], + ) + ) + detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) + steps = [_make_step(name="Call API Gateway")] + warnings = await detector.check_pitfalls( + task_type="", + planned_steps=steps, + goal="call api gateway endpoint", + ) + assert len(warnings) >= 1 + + async def test_empty_planned_steps_returns_empty(self, detector): + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[], + goal="deploy", + ) + assert warnings == [] + + async def test_no_pitfalls_in_store_returns_empty(self, detector, store): + await store.record_experience(_make_experience(outcome="success", steps_summary=[])) + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[_make_step(name="Deploy Service")], + goal="deploy", + ) + assert warnings == [] + + async def test_all_low_severity_returns_warnings_but_no_high(self, detector, store): + """All pitfalls low severity → warnings returned but HIGH gate filters injection.""" + # Only 1 failure out of 10 → low failure rate → LOW warning + for _ in range(9): + await store.record_experience( + _make_experience( + outcome="success", + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "success"}, + ], + ) + ) + await store.record_experience( + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"}, + ], + ) + ) + steps = [_make_step(name="Deploy Service")] + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=steps, + goal="deploy", + ) + # Warnings exist but none are HIGH + assert len(warnings) >= 1 + assert all(w.warning_level != WarningLevel.HIGH for w in warnings) + # Section builder should return empty (HIGH gate) + assert build_pitfall_warning_section(warnings) == "" + + async def test_top_k_limits_results(self): + """100+ entries → only top-3 by similarity retrieved; search called once.""" + mock_store = MagicMock() + # 120 experiences all with the same failing step + experiences = [ + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": f"Step_{i}", "outcome": "failure", "error": f"err_{i}"}, + ], + ) + for i in range(120) + ] + mock_store.search = AsyncMock(return_value=experiences) + detector = PitfallDetector(experience_store=mock_store, similarity_threshold=0.01) + + # 5 planned steps matching different historical steps + steps = [_make_step(name=f"Step_{i}", step_id=f"s{i}") for i in range(5)] + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=steps, + goal="deploy", + top_k=3, + ) + # search called exactly once (no N+1 per step) + assert mock_store.search.call_count == 1 + # top_k limits final warnings to 3 + assert len(warnings) <= 3 + + +# ── Error and failure paths (PitfallDetector) ────────────────────── + + +class TestPitfallDetectorErrorPaths: + async def test_store_none_skips_search(self): + """experience_store unavailable (None) → skip, no exception.""" + detector = PitfallDetector(experience_store=None) + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[_make_step(name="Deploy")], + goal="deploy", + ) + assert warnings == [] + + async def test_store_search_raises_returns_empty(self): + """experience_store.search() raises → skip injection, continue.""" + mock_store = MagicMock() + mock_store.search = AsyncMock(side_effect=RuntimeError("DB connection lost")) + detector = PitfallDetector(experience_store=mock_store) + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[_make_step(name="Deploy")], + goal="deploy", + ) + assert warnings == [] + + async def test_store_search_value_error_returns_empty(self): + mock_store = MagicMock() + mock_store.search = AsyncMock(side_effect=ValueError("bad query")) + detector = PitfallDetector(experience_store=mock_store) + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[_make_step(name="Deploy")], + ) + assert warnings == [] + + +# ── ReActEngine pitfall_warnings injection ───────────────────────── + + +class TestReactEnginePitfallInjection: + async def test_high_warnings_injected_into_system_prompt(self): + """pitfall_warnings param injects HIGH section into system prompt.""" + gateway = _make_mock_gateway([_make_response(content="Done")]) + engine = ReActEngine(llm_gateway=gateway, max_steps=3) + + warning = _make_warning(step_name="Deploy Service", failure_rate=0.9) + await engine.execute( + messages=[{"role": "user", "content": "deploy the service"}], + system_prompt="You are a helpful assistant.", + pitfall_warnings=[warning], + ) + + call_kwargs = gateway.chat.call_args.kwargs + system_content = str(call_kwargs["messages"][0]["content"]) + assert "## 历史避坑提示" in system_content + assert "Deploy Service" in system_content + + async def test_no_warnings_no_injection(self): + """Empty list or None = no-op (system_prompt unchanged).""" + gateway = _make_mock_gateway([_make_response(content="Done")]) + engine = ReActEngine(llm_gateway=gateway, max_steps=3) + + base_prompt = "You are a helpful assistant." + await engine.execute( + messages=[{"role": "user", "content": "hi"}], + system_prompt=base_prompt, + pitfall_warnings=None, + ) + system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) + assert "## 历史避坑提示" not in system_content + + async def test_low_severity_not_injected(self): + """Only HIGH severity injected; MEDIUM/LOW filtered out.""" + gateway = _make_mock_gateway([_make_response(content="Done")]) + engine = ReActEngine(llm_gateway=gateway, max_steps=3) + + warnings = [ + _make_warning(step_name="Medium Step", level=WarningLevel.MEDIUM), + _make_warning(step_name="Low Step", level=WarningLevel.LOW), + ] + await engine.execute( + messages=[{"role": "user", "content": "hi"}], + system_prompt="base prompt", + pitfall_warnings=warnings, + ) + system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) + assert "## 历史避坑提示" not in system_content + assert "Medium Step" not in system_content + + async def test_empty_list_no_injection(self): + gateway = _make_mock_gateway([_make_response(content="Done")]) + engine = ReActEngine(llm_gateway=gateway, max_steps=3) + + await engine.execute( + messages=[{"role": "user", "content": "hi"}], + system_prompt="base prompt", + pitfall_warnings=[], + ) + system_content = str(gateway.chat.call_args.kwargs["messages"][0]["content"]) + assert "## 历史避坑提示" not in system_content + + +# ── PlanExecEngine pitfall_detector integration ──────────────────── + + +def _make_plan( + goal: str = "deploy the service", + steps: list[PlanStep] | None = None, +): + if steps is None: + steps = [ + PlanStep(step_id="s0", name="Deploy Service", description="Deploy the service"), + PlanStep(step_id="s1", name="Verify Deployment", description="Check health"), + ] + from agentkit.core.plan_schema import ExecutionPlan + + return ExecutionPlan(goal=goal, steps=steps, parallel_groups=[["s0"], ["s1"]]) + + +def _make_plan_result(): + from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult + from agentkit.core.protocol import TaskStatus + + return PlanExecutionResult( + plan_id="test-plan", + step_results={ + "s0": StepExecutionResult( + step_id="s0", status=PlanStepStatus.COMPLETED, result={"ok": True} + ), + "s1": StepExecutionResult( + step_id="s1", status=PlanStepStatus.COMPLETED, result={"ok": True} + ), + }, + status=TaskStatus.COMPLETED, + total_duration_ms=100.0, + ) + + +class TestPlanExecEnginePitfallInjection: + async def test_pitfalls_injected_into_system_prompt(self, store): + """Happy path: top-3 HIGH pitfalls injected into system prompt at planning.""" + # Seed failure data + for _ in range(6): + await store.record_experience( + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, + ], + failure_reasons=["Deploy timeout"], + ) + ) + detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) + + plan = _make_plan() + plan_result = _make_plan_result() + + with ( + patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), + patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, + patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, + ): + mock_exec = MagicMock() + mock_exec.execute = AsyncMock(return_value=plan_result) + MockExecutor.return_value = mock_exec + + await engine.execute( + messages=[{"role": "user", "content": "deploy the service"}], + system_prompt="You are a deployment agent.", + ) + + # system_prompt passed to ReActStepExecutor must contain pitfall section + assert MockStepExec.call_count >= 1 + sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" + assert "## 历史避坑提示" in sp + assert "Deploy Service" in sp + + async def test_pitfall_detector_none_skips_injection(self): + """pitfall_detector is None → skip injection, no error.""" + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=None) + plan = _make_plan() + plan_result = _make_plan_result() + + with ( + patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), + patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, + patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, + ): + mock_exec = MagicMock() + mock_exec.execute = AsyncMock(return_value=plan_result) + MockExecutor.return_value = mock_exec + + await engine.execute( + messages=[{"role": "user", "content": "deploy"}], + system_prompt="base prompt", + ) + + sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" + assert "## 历史避坑提示" not in sp + + async def test_check_pitfalls_raises_skips_injection(self): + """PitfallDetector.check_pitfalls raises → skip injection, continue task.""" + mock_detector = MagicMock() + mock_detector.check_pitfalls = AsyncMock(side_effect=RuntimeError("store down")) + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=mock_detector) + + plan = _make_plan() + plan_result = _make_plan_result() + + with ( + patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), + patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, + patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, + ): + mock_exec = MagicMock() + mock_exec.execute = AsyncMock(return_value=plan_result) + MockExecutor.return_value = mock_exec + + # Should not raise + result = await engine.execute( + messages=[{"role": "user", "content": "deploy"}], + system_prompt="base prompt", + ) + assert result is not None + + sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" + assert "## 历史避坑提示" not in sp + + async def test_no_pitfalls_in_store_no_injection(self, store): + """No pitfalls in store → no injection (system_prompt unchanged).""" + # Only success experiences + await store.record_experience(_make_experience(outcome="success", steps_summary=[])) + detector = PitfallDetector(experience_store=store) + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) + + plan = _make_plan() + plan_result = _make_plan_result() + + with ( + patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), + patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, + patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, + ): + mock_exec = MagicMock() + mock_exec.execute = AsyncMock(return_value=plan_result) + MockExecutor.return_value = mock_exec + + await engine.execute( + messages=[{"role": "user", "content": "deploy"}], + system_prompt="base prompt", + ) + + sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" + assert "## 历史避坑提示" not in sp + + async def test_all_low_severity_no_injection(self, store): + """All pitfalls low severity → none injected (HIGH gate).""" + # 9 successes + 1 failure → 10% failure rate → LOW + for _ in range(9): + await store.record_experience( + _make_experience( + outcome="success", + steps_summary=[{"step_name": "Deploy Service", "outcome": "success"}], + ) + ) + await store.record_experience( + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "failure", "error": "flake"}, + ], + ) + ) + detector = PitfallDetector(experience_store=store, similarity_threshold=0.1) + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) + + plan = _make_plan() + plan_result = _make_plan_result() + + with ( + patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)), + patch("agentkit.core.plan_exec_engine.ReActStepExecutor") as MockStepExec, + patch("agentkit.core.plan_exec_engine.PlanExecutor") as MockExecutor, + ): + mock_exec = MagicMock() + mock_exec.execute = AsyncMock(return_value=plan_result) + MockExecutor.return_value = mock_exec + + await engine.execute( + messages=[{"role": "user", "content": "deploy"}], + system_prompt="base prompt", + ) + + sp = MockStepExec.call_args_list[0].kwargs.get("system_prompt") or "" + assert "## 历史避坑提示" not in sp + + def test_constructor_injection_verified(self): + """KTD-5: PitfallDetector app-state singleton via constructor injection.""" + detector = PitfallDetector(experience_store=InMemoryExperienceStore()) + engine = PlanExecEngine(llm_gateway=None, pitfall_detector=detector) + assert engine._pitfall_detector is detector + + def test_constructor_default_none(self): + """Default pitfall_detector is None (no injection).""" + engine = PlanExecEngine(llm_gateway=None) + assert engine._pitfall_detector is None + + +# ── Backward compatibility ───────────────────────────────────────── + + +class TestBackwardCompatibility: + async def test_old_call_form_still_works(self, detector, store): + """Old call form check_pitfalls(task_type=..., planned_steps=..., actor=...) works.""" + for _ in range(6): + await store.record_experience( + _make_experience( + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Deploy Service", "outcome": "failure", "error": "Timeout"}, + ], + ) + ) + # Old form: no goal, no top_k + warnings = await detector.check_pitfalls( + task_type="deployment", + planned_steps=[_make_step(name="Deploy Service")], + actor="test_agent", + ) + assert len(warnings) == 1 + assert warnings[0].actor == "test_agent" + + async def test_evolution_dashboard_importable(self): + """evolution_dashboard.py caller still works (module imports without error).""" + # Importing the module verifies the call site signature is still valid + # (check_pitfalls is called with task_type + planned_steps kwargs). + import agentkit.server.routes.evolution_dashboard # noqa: F401 + + async def test_existing_pitfall_detector_tests_compat(self, detector, store): + """Existing test pattern (from test_evolution_auto_trigger) still works.""" + await store.record_experience( + _make_experience( + task_type="testing", + goal="Run tests", + outcome="failure", + success_rate=0.0, + steps_summary=[ + {"step_name": "Test Step", "outcome": "failure", "error": "assertion"}, + ], + ) + ) + steps = [ + PlanStep( + step_id="s1", + name="Test Step", + description="Run tests", + status=PlanStepStatus.PENDING, + ) + ] + warnings = await detector.check_pitfalls( + task_type="testing", planned_steps=steps, actor="test_agent" + ) + assert len(warnings) == 1