From b032e08866334e1c4017bec8951dd166682819c5 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Tue, 30 Jun 2026 10:59:43 +0800 Subject: [PATCH] feat(U3): extract _build_phase_engine helper + wire REST PLAN_EXEC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the WS path's inline phase_policy construction into a shared _build_phase_engine helper so the REST send_message endpoint can reuse it. Replace the former 501 stub with actual PLAN_EXEC execution: - REST POST /chat/sessions/{id}/messages with execution_mode=plan_exec now builds a phase-policy-backed ReActEngine, calls execute() (non-streaming), and returns a MessageResponse. - KTD5: PLAN_EXEC bypasses execute_with_fallback_chain — phase policy and fallback chain are mutually exclusive. - When plan_exec.enabled=False, REST falls through to the REACT path (matching WS behavior). - WS path refactored to call the same helper; behavior unchanged. Tests: - Replace TestRestPlanExec501 with TestRestPlanExec (happy path, bad config → 500, disabled → falls through to REACT, REACT mode unchanged). - Add TestBuildPhaseEngineHelper covering all return branches: not-PLAN_EXEC, disabled, empty-config, invalid-config, tool append, default-policy fallback. - All 109 tests pass across the three PLAN_EXEC test files. --- src/agentkit/server/routes/chat.py | 183 ++++++++++++++------ tests/unit/test_chat_plan_exec_ws.py | 244 +++++++++++++++++++++++++-- 2 files changed, 368 insertions(+), 59 deletions(-) diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index 41422a2..54cfb12 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -25,7 +25,7 @@ from fastapi.responses import FileResponse from pydantic import BaseModel from agentkit.chat.skill_routing import ExecutionMode -from agentkit.core.phase import PhasePolicy, default_policy, policy_from_config +from agentkit.core.phase import default_policy, policy_from_config from agentkit.core.protocol import CancellationToken from agentkit.core.react import ReActEngine from agentkit.server._fallback_chain import execute_with_fallback_chain @@ -534,6 +534,69 @@ def _message_to_response(msg) -> MessageResponse: ) +def _build_phase_engine( + *, + server_config: Any, + llm_gateway: Any, + execution_mode: ExecutionMode, + base_tools: list, + session_id: str = "", +) -> tuple[ReActEngine | None, list | None, str | None]: + """Build a PLAN_EXEC engine with PhasePolicy + AdvancePhaseTool. + + Encapsulates the WS path's phase_policy construction so the REST path + can reuse it without duplicating config-lookup + policy_from_config + + AdvancePhaseTool registration. KTD5: PLAN_EXEC bypasses the fallback + chain — callers must NOT route the returned engine through + ``execute_with_fallback_chain``. + + Args: + server_config: ``app.state.server_config`` (or None for tests). + llm_gateway: ``app.state.llm_gateway``. + execution_mode: routing.execution_mode (WS) or PLAN_EXEC (REST). + base_tools: routing.tools (WS) or agent tool list (REST). + session_id: included in log lines for traceability only. + + Returns ``(engine, tools_with_advance_phase, error_message)``: + - execution_mode != PLAN_EXEC → ``(None, None, None)`` (fall back to REACT). + - plan_exec.enabled=False → ``(None, None, None)`` (fall back to REACT). + - phase policy construction failed → ``(None, None, error_message)``. + - PLAN_EXEC engaged → ``(engine, tools_with_advance_phase, None)``. + """ + if execution_mode != ExecutionMode.PLAN_EXEC: + return (None, None, None) + + plan_exec_cfg = getattr(server_config, "plan_exec", None) or {} + if plan_exec_cfg.get("enabled", True) is False: + logger.info( + "PLAN_EXEC disabled by config (plan_exec.enabled=False), " + "falling back to REACT for session %s", + session_id, + ) + return (None, None, None) + + try: + phase_policy = policy_from_config(plan_exec_cfg) + if phase_policy is None: + # Empty config (no `plan_exec:` section) → use KTD5 defaults. + phase_policy = default_policy() + except Exception as e: + logger.error( + "PLAN_EXEC phase policy construction failed for session %s: %s", + session_id, + e, + ) + return (None, None, f"phase policy error: {str(e)[:200]}") + + engine = ReActEngine( + llm_gateway=llm_gateway, + phase_policy=phase_policy, + ) + advance_phase_tool = AdvancePhaseTool(engine=engine) + tools_with_advance_phase = list(base_tools) + [advance_phase_tool] + return (engine, tools_with_advance_phase, None) + + # ── REST endpoints ──────────────────────────────────────────────────── @@ -587,12 +650,58 @@ async def send_message(session_id: str, request: SendMessageRequest, req: Reques if session.status == SessionStatus.CLOSED: raise HTTPException(status_code=400, detail=f"Session '{session_id}' is closed") - # KTD4: PLAN_EXEC is wired only at the WebSocket path. REST raises 501. + # U3: PLAN_EXEC via REST — non-streaming, bypasses the fallback chain + # (KTD5: PLAN_EXEC and execute_with_fallback_chain are mutually exclusive). + # When plan_exec is disabled by config, falls through to the REACT path below. if request.execution_mode == "plan_exec": - raise HTTPException( - status_code=501, - detail="PLAN_EXEC via REST not yet supported; use WebSocket", + # Resolve the Agent early — PLAN_EXEC needs its tool list + system prompt. + pool = req.app.state.agent_pool + agent = pool.get_agent(session.agent_name) + if agent is None: + raise HTTPException(status_code=404, detail=f"Agent '{session.agent_name}' not found") + + plan_exec_engine, plan_exec_tools, plan_exec_error = _build_phase_engine( + server_config=getattr(req.app.state, "server_config", None), + llm_gateway=req.app.state.llm_gateway, + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=agent._tool_registry.list_tools() if agent._tool_registry else [], + session_id=session_id, ) + if plan_exec_error is not None: + raise HTTPException(status_code=500, detail=plan_exec_error) + if plan_exec_engine is not None: + # PLAN_EXEC engaged — append user msg, execute non-streaming, return. + await sm.append_message( + session_id=session_id, + role=MessageRole.USER, + content=request.content, + ) + chat_messages = await sm.get_chat_messages(session_id) + system_prompt = getattr(agent, "_system_prompt", None) or ( + agent.get_system_prompt() if hasattr(agent, "get_system_prompt") else None + ) + try: + plan_exec_result = await plan_exec_engine.execute( + messages=chat_messages, + tools=plan_exec_tools, + model=agent.get_model() + if hasattr(agent, "get_model") + else getattr(agent, "_llm_model", "default"), + agent_name=agent.name, + system_prompt=system_prompt, + ) + except Exception as e: + logger.error(f"PLAN_EXEC execution error for session {session_id}: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + assistant_msg = await sm.append_message( + session_id=session_id, + role=MessageRole.ASSISTANT, + content=plan_exec_result.output, + agent_name=agent.name, + ) + return _message_to_response(assistant_msg) + # else: plan_exec.enabled=False → fall through to REACT path below. # Append user message await sm.append_message( @@ -1090,42 +1199,27 @@ async def _handle_chat_message( await websocket.send_json({"type": "error", "data": {"message": str(e)[:200]}}) return - # U4/G6: PLAN_EXEC — build PhasePolicy from server config (KTD4: WebSocket only). + # U4/G6: PLAN_EXEC — build PhasePolicy from server config. # KTD5 (Wave 2): fallback chain NOT applied to PLAN_EXEC — phase policy and # fallback chain are mutually exclusive. PLAN_EXEC uses its own engine. - phase_policy: PhasePolicy | None = None - if routing.execution_mode == ExecutionMode.PLAN_EXEC: - server_config = getattr(websocket.app.state, "server_config", None) - plan_exec_cfg = getattr(server_config, "plan_exec", None) or {} - - if plan_exec_cfg.get("enabled", True) is False: - # Explicit opt-out → fall back to REACT. - logger.info( - "PLAN_EXEC disabled by config (plan_exec.enabled=False), " - "falling back to REACT for session %s", - session_id, - ) - else: - try: - phase_policy = policy_from_config(plan_exec_cfg) - if phase_policy is None: - # Empty config (no `plan_exec:` section) → use KTD5 defaults. - phase_policy = default_policy() - except Exception as e: - logger.error( - "PLAN_EXEC phase policy construction failed for session %s: %s", - session_id, - e, - ) - await websocket.send_json( - { - "type": "error", - # Truncate to 200 chars to match nearby error paths and - # avoid leaking config internals (see chat.py:1090, 1320). - "data": {"message": f"phase policy error: {str(e)[:200]}"}, - } - ) - return + # U3: logic extracted into _build_phase_engine so REST can reuse it. + plan_exec_engine, plan_exec_tools, plan_exec_error = _build_phase_engine( + server_config=getattr(websocket.app.state, "server_config", None), + llm_gateway=websocket.app.state.llm_gateway, + execution_mode=routing.execution_mode, + base_tools=routing.tools, + session_id=session_id, + ) + if plan_exec_error is not None: + await websocket.send_json( + { + "type": "error", + # Truncate to 200 chars to match nearby error paths and + # avoid leaking config internals (see chat.py:1090, 1320). + "data": {"message": plan_exec_error}, + } + ) + return # Handle advanced execution modes: REWOO/REFLEXION/TEAM_COLLAB # still fall back to REACT with a warning. PLAN_EXEC is handled above. @@ -1143,14 +1237,9 @@ async def _handle_chat_message( # Reuse Agent's ReActEngine if available (U2: Chat pipeline optimization). # PLAN_EXEC creates a fresh engine with phase_policy set (cannot reuse the # agent's _react_engine — it has no policy). - if phase_policy is not None: - react_engine = ReActEngine( - llm_gateway=websocket.app.state.llm_gateway, - phase_policy=phase_policy, - ) - # Register AdvancePhaseTool bound to this engine (LLM's escape hatch). - advance_phase_tool = AdvancePhaseTool(engine=react_engine) - routing.tools = list(routing.tools) + [advance_phase_tool] + if plan_exec_engine is not None: + react_engine = plan_exec_engine + routing.tools = plan_exec_tools else: react_engine = getattr(agent, "_react_engine", None) if react_engine is None: diff --git a/tests/unit/test_chat_plan_exec_ws.py b/tests/unit/test_chat_plan_exec_ws.py index aec66f4..09c8750 100644 --- a/tests/unit/test_chat_plan_exec_ws.py +++ b/tests/unit/test_chat_plan_exec_ws.py @@ -1,10 +1,12 @@ -"""Unit tests for PLAN_EXEC wiring at chat.py WebSocket path (G6, U4). +"""Unit tests for PLAN_EXEC wiring at chat.py REST + WebSocket paths (G6, U3, U4). Per plan U4 Execution note: characterization-first — verify that existing REWOO/REFLEXION/TEAM_COLLAB modes still fall back to REACT with the warning (no regression). Then add PLAN_EXEC wiring tests. -KTD4: PLAN_EXEC is wired only at the WebSocket path; REST raises HTTP 501. +U3: PLAN_EXEC is now wired at both REST and WebSocket paths. REST returns +a non-streaming MessageResponse; WS streams phase_violation events alongside +the LLM reinjection. KTD5: PLAN_EXEC bypasses the fallback chain. """ from __future__ import annotations @@ -109,13 +111,60 @@ def _setup_routing(app, routing: SkillRoutingResult, agent: MagicMock) -> None: # --------------------------------------------------------------------------- -# REST — PLAN_EXEC raises 501 (KTD4) +# REST — PLAN_EXEC wired (U3, replaces former 501 path) # --------------------------------------------------------------------------- -class TestRestPlanExec501: - def test_rest_plan_exec_returns_501(self, client): - """REST send_message with execution_mode=plan_exec → 501.""" +class TestRestPlanExec: + """U3: REST send_message with execution_mode=plan_exec now executes + PLAN_EXEC (non-streaming) instead of raising 501.""" + + def test_rest_plan_exec_returns_assistant_message(self, app_with_chat, monkeypatch): + """REST PLAN_EXEC happy path → 200 with assistant message.""" + from agentkit.server.routes import chat as chat_module + + # Patch ReActEngine with a stub whose execute() returns a ReActResult-like. + class _StubResult: + output = "PLAN_EXEC completed" + status = "success" + + class _StubEngine: + def __init__(self, **kwargs): + self._phase_policy = kwargs.get("phase_policy") + self._current_phase = ( + kwargs.get("phase_policy").start_phase if kwargs.get("phase_policy") else None + ) + + async def execute(self, **kwargs): + return _StubResult() + + monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine) + + # Wire agent_pool with a mock agent that has _tool_registry. + agent = _make_agent_mock() + app_with_chat.state.agent_pool.get_agent.return_value = agent + + client = TestClient(app_with_chat) + create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"}) + session_id = create_resp.json()["session_id"] + + msg_resp = client.post( + f"/api/v1/chat/sessions/{session_id}/messages", + json={"content": "Build me a hello world", "execution_mode": "plan_exec"}, + ) + assert msg_resp.status_code == 200 + body = msg_resp.json() + assert body["content"] == "PLAN_EXEC completed" + assert body["role"] == "assistant" + + def test_rest_plan_exec_bad_config_returns_500(self, app_with_chat): + """REST PLAN_EXEC with invalid phase config → 500 with error detail.""" + app_with_chat.state.server_config.plan_exec = {"start_phase": "invalid_phase_name"} + + agent = _make_agent_mock() + app_with_chat.state.agent_pool.get_agent.return_value = agent + + client = TestClient(app_with_chat) create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"}) session_id = create_resp.json()["session_id"] @@ -123,20 +172,71 @@ class TestRestPlanExec501: f"/api/v1/chat/sessions/{session_id}/messages", json={"content": "Hello", "execution_mode": "plan_exec"}, ) - assert msg_resp.status_code == 501 - assert "PLAN_EXEC via REST not yet supported" in msg_resp.json()["detail"] + assert msg_resp.status_code == 500 + assert "phase policy error" in msg_resp.json()["detail"] - def test_rest_react_mode_still_works(self, client): - """REST send_message without execution_mode doesn't 501.""" + def test_rest_plan_exec_disabled_falls_through_to_react(self, app_with_chat, monkeypatch): + """REST PLAN_EXEC with enabled=False → falls through to REACT path.""" + from agentkit.server.routes import chat as chat_module + + app_with_chat.state.server_config.plan_exec = {"enabled": False} + + # Track which engine constructor fires. + constructed: list = [] + + class _StubResult: + output = "REACT fallback ok" + status = "success" + + class _StubEngine: + def __init__(self, **kwargs): + constructed.append(kwargs) + self._phase_policy = kwargs.get("phase_policy") + + async def execute(self, **kwargs): + return _StubResult() + + monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine) + # execute_with_fallback_chain also constructs ReflexionEngine internally; + # patch it to return a ChatExecutionResult-like directly. + from agentkit.server._fallback_chain import ChatExecutionResult + + async def _stub_chain(**kwargs): + return ChatExecutionResult(output="REACT fallback ok", status="success") + + monkeypatch.setattr(chat_module, "execute_with_fallback_chain", _stub_chain) + + agent = _make_agent_mock() + app_with_chat.state.agent_pool.get_agent.return_value = agent + + client = TestClient(app_with_chat) create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"}) session_id = create_resp.json()["session_id"] - # No execution_mode field → should NOT trigger 501. + msg_resp = client.post( + f"/api/v1/chat/sessions/{session_id}/messages", + json={"content": "Hello", "execution_mode": "plan_exec"}, + ) + assert msg_resp.status_code == 200 + assert msg_resp.json()["content"] == "REACT fallback ok" + # No engine should have been constructed with phase_policy — PLAN_EXEC + # was disabled and the REACT path doesn't set phase_policy. + assert all(kw.get("phase_policy") is None for kw in constructed) + + def test_rest_react_mode_still_works(self, client): + """REST send_message without execution_mode doesn't 500.""" + create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"}) + session_id = create_resp.json()["session_id"] + + # No execution_mode field → should NOT trigger PLAN_EXEC path. + # Will likely 500 due to mock llm_gateway, but must NOT be a PLAN_EXEC error. msg_resp = client.post( f"/api/v1/chat/sessions/{session_id}/messages", json={"content": "Hello"}, ) - assert msg_resp.status_code != 501 + # 500 is acceptable (mock gateway), but it must NOT be the PLAN_EXEC error. + if msg_resp.status_code == 500: + assert "phase policy error" not in msg_resp.json().get("detail", "") # --------------------------------------------------------------------------- @@ -671,3 +771,123 @@ async def test_no_phase_violation_event_when_not_plan_exec(app_with_chat): sent_messages = [call.args[0] for call in ws.send_json.call_args_list] violation_messages = [m for m in sent_messages if m.get("type") == "phase_violation"] assert len(violation_messages) == 0 + + +# --------------------------------------------------------------------------- +# _build_phase_engine helper (U3) +# --------------------------------------------------------------------------- + + +class TestBuildPhaseEngineHelper: + """Direct unit tests for the _build_phase_engine helper extracted in U3.""" + + def test_returns_none_when_not_plan_exec(self): + from agentkit.server.routes.chat import _build_phase_engine + + engine, tools, err = _build_phase_engine( + server_config=None, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.REACT, + base_tools=[], + ) + assert engine is None + assert tools is None + assert err is None + + def test_returns_none_when_plan_exec_disabled_by_config(self): + from agentkit.server.routes.chat import _build_phase_engine + + server_config = MagicMock() + server_config.plan_exec = {"enabled": False} + + engine, tools, err = _build_phase_engine( + server_config=server_config, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=[], + ) + assert engine is None + assert tools is None + assert err is None + + def test_returns_none_when_plan_exec_section_absent(self): + """Empty plan_exec config → default_policy() used, engine built.""" + from agentkit.server.routes.chat import _build_phase_engine + + server_config = MagicMock() + server_config.plan_exec = {} + + engine, tools, err = _build_phase_engine( + server_config=server_config, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=[], + ) + assert engine is not None + assert tools is not None + assert err is None + # Default policy: PLANNING allows search, blocks write_file + assert "search" in engine._phase_policy.whitelist[PhaseState.PLANNING] + assert "write_file" not in engine._phase_policy.whitelist[PhaseState.PLANNING] + + def test_returns_error_when_phase_policy_invalid(self): + from agentkit.server.routes.chat import _build_phase_engine + + server_config = MagicMock() + server_config.plan_exec = {"start_phase": "invalid_phase_name"} + + engine, tools, err = _build_phase_engine( + server_config=server_config, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=[], + ) + assert engine is None + assert tools is None + assert err is not None + assert "phase policy error" in err + + def test_appends_advance_phase_tool_to_tools(self): + from agentkit.server.routes.chat import _build_phase_engine + + server_config = MagicMock() + server_config.plan_exec = {} + + base_tool = MagicMock() + engine, tools, err = _build_phase_engine( + server_config=server_config, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=[base_tool], + ) + assert err is None + assert engine is not None + assert tools is not None + # base_tool preserved + AdvancePhaseTool appended + assert len(tools) == 2 + assert tools[0] is base_tool + assert isinstance(tools[1], AdvancePhaseTool) + + def test_engine_uses_default_policy_when_config_returns_none(self, monkeypatch): + """policy_from_config returning None → default_policy() used.""" + from agentkit.server.routes import chat as chat_module + + def _stub_policy_from_config(cfg): + return None + + monkeypatch.setattr(chat_module, "policy_from_config", _stub_policy_from_config) + + server_config = MagicMock() + server_config.plan_exec = {"enabled": True} + + engine, tools, err = chat_module._build_phase_engine( + server_config=server_config, + llm_gateway=MagicMock(), + execution_mode=ExecutionMode.PLAN_EXEC, + base_tools=[], + ) + assert err is None + assert engine is not None + assert engine._phase_policy is not None + # Default policy's start phase is PLANNING + assert engine._current_phase == PhaseState.PLANNING