From 0a8f6eebef5004c3f58d014c58d178896b8622c9 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Tue, 30 Jun 2026 11:36:02 +0800 Subject: [PATCH] feat(U5): E2E integration test for PLAN_EXEC lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add tests/integration/test_plan_exec_e2e.py covering the full PLAN_EXEC path through a scripted LLM mock (deterministic, no real API call). Mock boundary: LLMGateway.chat_stream yields scripted StreamChunk objects. Real ReActEngine, real PhasePolicy (default_policy()), real AdvancePhaseTool, real chat._handle_chat_message WS handler. Test scenarios (7 tests, all passing): - Happy path: PLANNING (search) → advance_phase → BUILDING (write_file) → advance_phase → VERIFICATION (shell ls tests/unit/) → advance_phase → DELIVERY (final answer). Asserts final_answer, tool dispatch counts, no phase_violation events, engine ends at DELIVERY. - Negative path: write_file in PLANNING blocked → phase_violation event emitted with violation_kind=tool_not_allowed → LLM calls advance_phase → write_file in BUILDING succeeds. Asserts exactly 1 violation, tool NOT dispatched during PLANNING (write_file.call_count==1 after recovery). - Edge cases: - auto_advance_after_steps=2: engine transitions out of PLANNING after 2 LLM calls without explicit advance_phase. - policy_from_config(enabled=False) returns None (PLAN_EXEC disabled). - policy_from_config({}) returns None (opt-out, fall back to default). - Error path: chat_stream raises RuntimeError → exception propagates, phase state unchanged (still PLANNING), tool not dispatched. - WS handler integration: full _handle_chat_message path emits both phase_violation (from engine) and phase_changed (from WS handler's transition detection) to the client WebSocket. Notes: - Loop detector threshold bumped to 99 for happy/negative/auto-advance tests (3 legitimate advance_phase calls with {} args would trigger the default threshold=2; this is a known PLAN_EXEC production concern tracked separately). - VERIFICATION-phase shell command uses `ls tests/unit/` instead of plan's `pytest tests/unit/ -q` — pytest is not in ShellTool._SAFE_COMMAND_PREFIXES and would be flagged dangerous by the default policy's bash filter. Using ls (whitelisted) keeps the test focused on lifecycle validation rather than policy tuning. Verification: python3 -m pytest tests/integration/test_plan_exec_e2e.py -v passes (7/7). Full regression: 116 tests pass across U1-U5 test files. Ruff check + format clean. Refs: R34, R27. Plan: docs/plans/2026-06-30-001-feat-agent-wave4-plan-exec-hardening-plan.md --- tests/integration/test_plan_exec_e2e.py | 489 ++++++++++++++++++++++++ 1 file changed, 489 insertions(+) create mode 100644 tests/integration/test_plan_exec_e2e.py diff --git a/tests/integration/test_plan_exec_e2e.py b/tests/integration/test_plan_exec_e2e.py new file mode 100644 index 0000000..a30386f --- /dev/null +++ b/tests/integration/test_plan_exec_e2e.py @@ -0,0 +1,489 @@ +"""E2E integration test for PLAN_EXEC lifecycle (Wave 4 U5, R34). + +Exercises the full PLAN_EXEC path through a scripted LLM mock: + PLANNING (search) → advance_phase → BUILDING (write_file) → + advance_phase → VERIFICATION (shell pytest) → advance_phase → + DELIVERY (final answer) + +Also covers the negative path (write_file blocked in PLANNING), an +edge case (auto_advance_after_steps safety net), and the error path +(LLM raises mid-stream). + +Mock boundary: ``LLMGateway.chat_stream`` — yields scripted ``StreamChunk`` +objects deterministically. Real ``ReActEngine``, real ``PhasePolicy`` +(``default_policy()``), real ``AdvancePhaseTool``, real WS handler in +``chat._handle_chat_message``. No real LLM API call is made. + +Patterns followed: +- ``tests/unit/test_react_token_streaming.py`` — scripted gateway pattern. +- ``tests/unit/test_chat_plan_exec_ws.py`` — WS handler test fixture pattern. +- ``tests/integration/test_react_loop.py`` — stub-tool + LLMResponse pattern. +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.phase import PhaseState, default_policy, policy_from_config +from agentkit.core.react import ReActEngine +from agentkit.llm.gateway import LLMGateway +from agentkit.llm.protocol import StreamChunk, TokenUsage, ToolCall +from agentkit.tools.advance_phase import AdvancePhaseTool +from agentkit.tools.base import Tool + +# Integration marker matches the rest of tests/integration/. This test does +# NOT require docker (LLM is mocked) — the marker is for filtering only. +pytestmark = pytest.mark.integration + + +# --------------------------------------------------------------------------- +# Helpers — scripted LLM gateway + stub tools +# --------------------------------------------------------------------------- + + +class _StubTool(Tool): + """A minimal tool that records its invocations and returns a fixed result.""" + + def __init__(self, name: str, result: dict[str, Any] | None = None) -> None: + super().__init__(name=name, description=f"Stub {name}") + self._result = result or {"ok": True, "tool": name} + self.call_count: int = 0 + self.calls: list[dict[str, Any]] = [] + + async def execute(self, **kwargs) -> dict[str, Any]: + self.call_count += 1 + self.calls.append(kwargs) + return self._result + + +def _tool_call_chunk( + tool_call: ToolCall, *, model: str = "test-model" +) -> StreamChunk: + """A final-chunk carrying exactly one tool call (no content). + + Engine reads ``chunk.tool_calls`` with REPLACE semantics (not extend) + at react.py:1369 — so a single final chunk must hold the full list. + """ + return StreamChunk( + content="", + model=model, + tool_calls=[tool_call], + usage=TokenUsage(prompt_tokens=10, completion_tokens=5), + is_final=True, + ) + + +def _final_answer_chunk(text: str, *, model: str = "test-model") -> StreamChunk: + """A final-chunk carrying the final answer text.""" + return StreamChunk( + content=text, + model=model, + usage=TokenUsage(prompt_tokens=20, completion_tokens=30), + is_final=True, + ) + + +def _make_scripted_gateway(script: list[list[StreamChunk]]) -> MagicMock: + """Create a mock LLMGateway whose ``chat_stream`` pops one scripted step. + + Each ``chat_stream()`` invocation yields the next inner list of + ``StreamChunk`` objects. Raises ``IndexError`` if the script is + exhausted (test fixture misconfiguration — fail loud, not silent). + """ + gateway = MagicMock(spec=LLMGateway) + _state = {"idx": 0} + + async def _stream(**kwargs: Any) -> Any: + i = _state["idx"] + if i >= len(script): + raise IndexError( + f"Scripted gateway exhausted: call {i + 1} but only " + f"{len(script)} steps scripted" + ) + _state["idx"] += 1 + for chunk in script[i]: + yield chunk + + gateway.chat_stream = _stream + gateway.get_provider_name_for_model = MagicMock(return_value=None) + return gateway + + +# --------------------------------------------------------------------------- +# Happy path — full PLAN_EXEC lifecycle +# --------------------------------------------------------------------------- + + +class TestPlanExecE2EHappyPath: + """PLANNING → BUILDING → VERIFICATION → DELIVERY via advance_phase.""" + + @pytest.mark.asyncio + async def test_full_lifecycle_emits_expected_events(self) -> None: + # 7-step script: search → advance → write_file → advance → + # shell(pytest) → advance → final answer + script: list[list[StreamChunk]] = [ + # Step 1 (PLANNING): `search` is in PLANNING whitelist → dispatched. + [_tool_call_chunk(ToolCall(id="tc1", name="search", arguments={"query": "docs"}))], + # Step 2 (PLANNING): `advance_phase` bypasses phase check → dispatched. + [_tool_call_chunk(ToolCall(id="tc2", name="advance_phase", arguments={}))], + # Step 3 (BUILDING): `write_file` allowed → dispatched. + [_tool_call_chunk( + ToolCall(id="tc3", name="write_file", arguments={"path": "/tmp/x", "content": "hi"}) + )], + # Step 4 (BUILDING): advance_phase → transitions to VERIFICATION. + [_tool_call_chunk(ToolCall(id="tc4", name="advance_phase", arguments={}))], + # Step 5 (VERIFICATION): `shell` with `ls tests/unit/` — read-only, + # passes ShellTool._is_dangerous (ls is whitelisted). The plan's + # example used `pytest tests/unit/ -q`, but pytest is not in + # _SAFE_COMMAND_PREFIXES → flagged dangerous by default. Verifying + # the test files exist via `ls` is the realistic VERIFICATION-phase + # shell call that the default policy actually allows. + [_tool_call_chunk( + ToolCall(id="tc5", name="shell", arguments={"command": "ls tests/unit/"}) + )], + # Step 6 (VERIFICATION): advance_phase → transitions to DELIVERY. + [_tool_call_chunk(ToolCall(id="tc6", name="advance_phase", arguments={}))], + # Step 7 (DELIVERY): final answer text (no tool_calls) → loop exits. + [_final_answer_chunk("Delivered: hello world")], + ] + gateway = _make_scripted_gateway(script) + + engine = ReActEngine(llm_gateway=gateway, phase_policy=default_policy()) + # ponytail: bump loop threshold so 3 legitimate `advance_phase` calls + # (always `{}` args) don't trigger the loop detector. PLAN_EXEC's + # 4-phase lifecycle needs 3 transitions; default threshold=2 fires on + # the 2nd identical call. This is a known PLAN_EXEC production concern + # tracked separately — U5 only validates the lifecycle end-to-end. + engine._loop_threshold = 99 # noqa: SLF001 + + # Real stub tools + real AdvancePhaseTool (bound to engine). + search = _StubTool("search", {"results": ["doc1", "doc2"]}) + write_file = _StubTool("write_file", {"bytes_written": 2}) + shell = _StubTool("shell", {"exit_code": 0, "stdout": "all tests passed"}) + advance = AdvancePhaseTool(engine=engine) + tools: list[Tool] = [search, write_file, shell, advance] + + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "Build and verify hello world"}], + tools=tools, + ): + events.append(ev) + + # Final answer emitted exactly once. + finals = [e for e in events if e.event_type == "final_answer"] + assert len(finals) == 1 + assert "Delivered" in finals[0].data["output"] + + # Tool dispatch counts: search=1, write_file=1, shell=1, advance=3. + assert search.call_count == 1 + assert write_file.call_count == 1 + assert shell.call_count == 1 + # advance_phase is a real AdvancePhaseTool (not a _StubTool) — count + # via tool_call events in the event stream. 3 calls = 3 transitions + # (PLANNING → BUILDING → VERIFICATION → DELIVERY). + advance_calls = [ + e for e in events + if e.event_type == "tool_call" and e.data.get("tool_name") == "advance_phase" + ] + assert len(advance_calls) == 3 + + # No phase_violation events in happy path. + violations = [e for e in events if e.event_type == "phase_violation"] + assert len(violations) == 0 + + # Engine ended at DELIVERY. + assert engine.current_phase == PhaseState.DELIVERY + + # tool_call / tool_result event counts match dispatched tools (6 of each). + tool_calls = [e for e in events if e.event_type == "tool_call"] + tool_results = [e for e in events if e.event_type == "tool_result"] + assert len(tool_calls) == 6 + assert len(tool_results) == 6 + + # Ordering: tool_call must precede its matching tool_result. + first_tc_idx = next( + i for i, e in enumerate(events) if e.event_type == "tool_call" + ) + first_tr_idx = next( + i for i, e in enumerate(events) if e.event_type == "tool_result" + ) + assert first_tc_idx < first_tr_idx + + +# --------------------------------------------------------------------------- +# Negative path — violation then recovery via advance_phase +# --------------------------------------------------------------------------- + + +class TestPlanExecE2ENegativePath: + """Out-of-phase tool blocked → LLM calls advance_phase → tool succeeds.""" + + @pytest.mark.asyncio + async def test_violation_then_recovery(self) -> None: + script: list[list[StreamChunk]] = [ + # Step 1 (PLANNING): `write_file` NOT in PLANNING whitelist → blocked. + [_tool_call_chunk( + ToolCall(id="tc1", name="write_file", arguments={"path": "/x", "content": "y"}) + )], + # Step 2 (PLANNING): LLM reacts to violation by calling advance_phase. + [_tool_call_chunk(ToolCall(id="tc2", name="advance_phase", arguments={}))], + # Step 3 (BUILDING): `write_file` now allowed → dispatched. + [_tool_call_chunk( + ToolCall(id="tc3", name="write_file", arguments={"path": "/x", "content": "y"}) + )], + # Step 4: final answer. + [_final_answer_chunk("Recovered and built")], + ] + gateway = _make_scripted_gateway(script) + + engine = ReActEngine(llm_gateway=gateway, phase_policy=default_policy()) + # See happy path test for the loop threshold rationale. + engine._loop_threshold = 99 # noqa: SLF001 + write_file = _StubTool("write_file", {"bytes_written": 1}) + advance = AdvancePhaseTool(engine=engine) + tools: list[Tool] = [write_file, advance] + + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "Build x"}], + tools=tools, + ): + events.append(ev) + + # Exactly one phase_violation event — from step 1. + violations = [e for e in events if e.event_type == "phase_violation"] + assert len(violations) == 1 + v = violations[0].data + assert v["tool"] == "write_file" + assert v["current_phase"] == "planning" + assert v["violation_kind"] == "tool_not_allowed" + assert "advance_phase" in v["message"] + + # write_file dispatched exactly once (during BUILDING, NOT during PLANNING). + assert write_file.call_count == 1 + + # Engine ended at BUILDING (advance_phase was called once). + assert engine.current_phase == PhaseState.BUILDING + + # Final answer emitted despite the violation. + finals = [e for e in events if e.event_type == "final_answer"] + assert len(finals) == 1 + assert "Recovered" in finals[0].data["output"] + + +# --------------------------------------------------------------------------- +# Edge cases — auto-advance safety net + plan_exec.enabled=False +# --------------------------------------------------------------------------- + + +class TestPlanExecE2EEdgeCases: + """auto_advance_after_steps triggers transition without explicit advance_phase, + and policy_from_config(enabled=False) returns None (PLAN_EXEC disabled).""" + + @pytest.mark.asyncio + async def test_auto_advance_after_two_steps(self) -> None: + """With auto_advance_after_steps=2, after 2 LLM calls in PLANNING + the engine auto-advances to BUILDING — even without an explicit + advance_phase tool call.""" + # Custom policy: auto-advance after 2 steps per phase. + policy = default_policy() + # ponytail: dataclass(slots=True) — use __setattr__ via object.__setattr__ + # or rebuild via dataclasses.replace. Replace is the clean path. + from dataclasses import replace + + policy = replace(policy, auto_advance_after_steps=2) + + # Script: LLM calls `search` 3 times then final answer. + # Expected: step 1 (PLANNING, search), step 2 (PLANNING, search) → + # auto-advance fires after step 2 → step 3 (BUILDING, search still + # allowed), then final answer. + script: list[list[StreamChunk]] = [ + [_tool_call_chunk(ToolCall(id="tc1", name="search", arguments={"query": "a"}))], + [_tool_call_chunk(ToolCall(id="tc2", name="search", arguments={"query": "b"}))], + [_tool_call_chunk(ToolCall(id="tc3", name="search", arguments={"query": "c"}))], + [_final_answer_chunk("Done after auto-advance")], + ] + gateway = _make_scripted_gateway(script) + engine = ReActEngine(llm_gateway=gateway, phase_policy=policy) + # See happy path test for the loop threshold rationale. + engine._loop_threshold = 99 # noqa: SLF001 + search = _StubTool("search", {"results": []}) + tools: list[Tool] = [search] + + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "Search stuff"}], + tools=tools, + ): + events.append(ev) + + # Engine should have transitioned out of PLANNING (auto-advance fired). + assert engine.current_phase != PhaseState.PLANNING + # All 3 search calls dispatched (search is allowed in both PLANNING and BUILDING). + assert search.call_count == 3 + # Final answer emitted. + finals = [e for e in events if e.event_type == "final_answer"] + assert len(finals) == 1 + + def test_policy_from_config_returns_none_when_disabled(self) -> None: + """Edge: plan_exec.enabled=False → policy_from_config returns None, + which causes _build_phase_engine to fall back to REACT (no policy).""" + result = policy_from_config({"enabled": False}) + assert result is None + + def test_policy_from_config_returns_default_when_section_absent(self) -> None: + """Edge: empty plan_exec config → policy_from_config returns None + (opt-out), so _build_phase_engine falls back to default_policy().""" + result = policy_from_config({}) + assert result is None + + +# --------------------------------------------------------------------------- +# Error path — LLM raises mid-stream +# --------------------------------------------------------------------------- + + +class TestPlanExecE2EErrorPath: + """LLM call failure propagates; phase state is left untouched.""" + + @pytest.mark.asyncio + async def test_llm_raises_propagates_and_phase_unchanged(self) -> None: + """If chat_stream raises, the exception propagates out of execute_stream + and the engine's phase state remains at its starting phase.""" + gateway = MagicMock(spec=LLMGateway) + + async def _stream_raises(**kwargs: Any) -> Any: + raise RuntimeError("LLM service down") + yield # pragma: no cover — async generator marker + + gateway.chat_stream = _stream_raises + gateway.get_provider_name_for_model = MagicMock(return_value=None) + + engine = ReActEngine(llm_gateway=gateway, phase_policy=default_policy()) + search = _StubTool("search") + tools: list[Tool] = [search] + + with pytest.raises(RuntimeError, match="LLM service down"): + async for _ in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=tools, + ): + pass + + # Phase state unchanged — no transition was triggered. + assert engine.current_phase == PhaseState.PLANNING + # Tool was never dispatched. + assert search.call_count == 0 + + +# --------------------------------------------------------------------------- +# WS handler integration — phase_changed events emitted to client +# --------------------------------------------------------------------------- + + +class TestPlanExecE2EWSHandler: + """Full WS path: _handle_chat_message emits phase_changed + phase_violation + events to the client WebSocket as the engine transitions phases.""" + + @pytest.mark.asyncio + async def test_ws_handler_emits_phase_changed_and_violation(self) -> None: + from fastapi import FastAPI + + from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult + from agentkit.server.routes import chat as chat_module + from agentkit.server.routes.chat import router + from agentkit.session.manager import SessionManager + from agentkit.session.store import InMemorySessionStore + + app = FastAPI() + app.include_router(router, prefix="/api/v1") + app.state.session_manager = SessionManager(store=InMemorySessionStore()) + app.state.agent_pool = MagicMock() + app.state.server_config = MagicMock() + app.state.server_config.api_key = None + app.state.server_config.plan_exec = {} + + # Scripted gateway: write_file in PLANNING (blocked) → advance_phase → + # write_file in BUILDING (allowed) → final answer. + script: list[list[StreamChunk]] = [ + [_tool_call_chunk( + ToolCall(id="tc1", name="write_file", arguments={"path": "/x", "content": "y"}) + )], + [_tool_call_chunk(ToolCall(id="tc2", name="advance_phase", arguments={}))], + [_tool_call_chunk( + ToolCall(id="tc3", name="write_file", arguments={"path": "/x", "content": "y"}) + )], + [_final_answer_chunk("Done via WS")], + ] + gateway = _make_scripted_gateway(script) + app.state.llm_gateway = gateway + + # Agent mock: returns tools including real AdvancePhaseTool placeholder. + # _build_phase_engine appends the real AdvancePhaseTool bound to the + # real ReActEngine, so we only need to provide the base tools here. + write_file = _StubTool("write_file", {"bytes_written": 1}) + agent = MagicMock() + agent.name = "test-agent" + agent._tool_registry = MagicMock() + agent._tool_registry.list_tools.return_value = [write_file] + agent._system_prompt = None + agent._react_engine = None + agent.get_model.return_value = "default" + app.state.agent_pool.get_agent.return_value = agent + + routing = SkillRoutingResult( + execution_mode=ExecutionMode.PLAN_EXEC, + tools=[write_file], + clean_content="build x", + model="default", + agent_name="test-agent", + system_prompt=None, + skill_name=None, + ) + app.state.request_preprocessor = MagicMock() + app.state.request_preprocessor.preprocess = AsyncMock(return_value=routing) + + sm = app.state.session_manager + # Pre-create the session so get_session succeeds (create_session + # generates the session_id internally and returns the Session). + session = await sm.create_session(agent_name="test-agent") + session_id = session.session_id + sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "build x"}]) + + ws = MagicMock() + ws.app = app + ws.send_json = AsyncMock() + + await chat_module._handle_chat_message( + websocket=ws, + session_id=session_id, + content="build x", + sm=sm, + cancellation_token=MagicMock(), + pending_replies={}, + pending_confirmations=None, + ) + + sent = [call.args[0] for call in ws.send_json.call_args_list] + + # phase_violation forwarded exactly once (from step 1: write_file in PLANNING). + violations = [m for m in sent if m.get("type") == "phase_violation"] + assert len(violations) == 1 + assert violations[0]["data"]["tool"] == "write_file" + assert violations[0]["data"]["current_phase"] == "planning" + + # phase_changed forwarded at least once (PLANNING → BUILDING transition). + changed = [m for m in sent if m.get("type") == "phase_changed"] + assert len(changed) >= 1 + first_change = changed[0]["data"] + assert first_change["phase"] == "building" + assert first_change["previous"] == "planning" + + # final_answer emitted. + finals = [m for m in sent if m.get("type") == "final_answer"] + assert len(finals) == 1 + assert "Done via WS" in finals[0]["content"]