"""Unit tests for PLAN_EXEC wiring at chat.py REST + WebSocket paths (G6, U3, U4).

Per plan U4 Execution note: characterization-first — verify that existing
REWOO/REFLEXION/TEAM_COLLAB modes still fall back to REACT with the warning
(no regression). Then add PLAN_EXEC wiring tests.

U3: PLAN_EXEC is now wired at both REST and WebSocket paths. REST returns a
non-streaming MessageResponse; WS streams phase_violation events alongside
the LLM reinjection.

KTD5: PLAN_EXEC bypasses the fallback chain.
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import pytest
from fastapi.testclient import TestClient

from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult
from agentkit.core.phase import PhaseState
from agentkit.tools.advance_phase import AdvancePhaseTool


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def app_with_chat():
    """Return a FastAPI app exposing the chat router with mocked app.state.

    The session manager is a real ``SessionManager`` backed by an in-memory
    store; the LLM gateway, agent pool and server config are ``MagicMock``s.
    ``server_config.api_key`` is ``None`` and ``server_config.plan_exec`` is an
    empty dict unless a test overrides it.
    """
    from fastapi import FastAPI

    from agentkit.server.routes.chat import router

    app = FastAPI()
    app.include_router(router, prefix="/api/v1")

    from agentkit.session.manager import SessionManager
    from agentkit.session.store import InMemorySessionStore

    server_config = MagicMock()
    server_config.api_key = None
    server_config.plan_exec = {}

    app.state.session_manager = SessionManager(store=InMemorySessionStore())
    app.state.llm_gateway = MagicMock()
    app.state.agent_pool = MagicMock()
    app.state.server_config = server_config
    return app


@pytest.fixture
def client(app_with_chat):
    """Return a ``TestClient`` bound to the mocked chat app."""
    return TestClient(app_with_chat)


def _make_routing(
    execution_mode: ExecutionMode = ExecutionMode.REACT,
    tools: list | None = None,
) -> SkillRoutingResult:
    """Build a minimal SkillRoutingResult for testing.

    ``tools`` defaults to a fresh empty list when omitted (or falsy).
    """
    routed_tools = tools or []
    return SkillRoutingResult(
        execution_mode=execution_mode,
        tools=routed_tools,
        clean_content="test message",
        model="default",
        agent_name="test-agent",
        system_prompt=None,
        skill_name=None,
    )


def _make_websocket_mock(app) -> MagicMock:
    """Build a mock WebSocket carrying ``app`` and an awaitable ``send_json``."""
    websocket = MagicMock()
    websocket.app = app
    websocket.send_json = AsyncMock()
    return websocket


def _make_agent_mock() -> MagicMock:
    """Build a mock Agent with ``_tool_registry`` and ``_react_engine``."""
    registry = MagicMock()
    registry.list_tools.return_value = []

    agent = MagicMock()
    agent.name = "test-agent"
    agent._tool_registry = registry
    agent._system_prompt = None
    # _react_engine is None to force the code path that creates a new engine.
    agent._react_engine = None
    agent.get_model.return_value = "default"
    return agent


def _make_session_manager_mock() -> MagicMock:
    """Build a mock SessionManager whose methods are awaitable."""
    # The session returned by get_session is active and owned by "test-agent".
    session = MagicMock()
    session.agent_name = "test-agent"
    session.status = "active"

    manager = MagicMock()
    manager.get_session = AsyncMock(return_value=session)
    manager.get_chat_messages = AsyncMock(return_value=[])
    manager.append_message = AsyncMock()
    return manager


def _setup_routing(app, routing: SkillRoutingResult, agent: MagicMock) -> None:
    """Wire up app.state so _handle_chat_message finds the right routing."""
    preprocessor = MagicMock()
    preprocessor.preprocess = AsyncMock(return_value=routing)

    app.state.agent_pool.get_agent.return_value = agent
    app.state.request_preprocessor = preprocessor


# ---------------------------------------------------------------------------
# REST — PLAN_EXEC wired (U3, replaces former 501 path)
# ---------------------------------------------------------------------------


class TestRestPlanExec:
    """U3: REST send_message with execution_mode=plan_exec now executes
    PLAN_EXEC (non-streaming) instead of raising 501."""

    @staticmethod
    def _post_to_new_session(http: TestClient, payload: dict):
        """Create a session for ``test-agent`` and POST ``payload`` to it.

        Returns the response of the message POST.
        """
        opened = http.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"})
        session_id = opened.json()["session_id"]
        return http.post(
            f"/api/v1/chat/sessions/{session_id}/messages",
            json=payload,
        )

    def test_rest_plan_exec_returns_assistant_message(self, app_with_chat, monkeypatch):
        """REST PLAN_EXEC happy path → 200 with assistant message."""
        from agentkit.server.routes import chat as chat_module

        # Replace ReActEngine with a stub whose execute() returns a
        # ReActResult-like object.
        class _StubResult:
            output = "PLAN_EXEC completed"
            status = "success"

        class _StubEngine:
            def __init__(self, **kwargs):
                policy = kwargs.get("phase_policy")
                self._phase_policy = policy
                self._current_phase = policy.start_phase if policy else None

            async def execute(self, **kwargs):
                return _StubResult()

        monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine)

        # The agent pool hands out a mock agent that has a _tool_registry.
        app_with_chat.state.agent_pool.get_agent.return_value = _make_agent_mock()

        reply = self._post_to_new_session(
            TestClient(app_with_chat),
            {"content": "Build me a hello world", "execution_mode": "plan_exec"},
        )

        assert reply.status_code == 200
        body = reply.json()
        assert body["content"] == "PLAN_EXEC completed"
        assert body["role"] == "assistant"

    def test_rest_plan_exec_bad_config_returns_500(self, app_with_chat):
        """REST PLAN_EXEC with invalid phase config → 500 with error detail."""
        app_with_chat.state.server_config.plan_exec = {"start_phase": "invalid_phase_name"}
        app_with_chat.state.agent_pool.get_agent.return_value = _make_agent_mock()

        reply = self._post_to_new_session(
            TestClient(app_with_chat),
            {"content": "Hello", "execution_mode": "plan_exec"},
        )

        assert reply.status_code == 500
        assert "phase policy error" in reply.json()["detail"]

    def test_rest_plan_exec_disabled_falls_through_to_react(self, app_with_chat, monkeypatch):
        """REST PLAN_EXEC with enabled=False → falls through to REACT path."""
        from agentkit.server.routes import chat as chat_module

        app_with_chat.state.server_config.plan_exec = {"enabled": False}

        # Records the kwargs of every engine construction.
        constructed: list = []

        class _StubResult:
            output = "REACT fallback ok"
            status = "success"

        class _StubEngine:
            def __init__(self, **kwargs):
                constructed.append(kwargs)
                self._phase_policy = kwargs.get("phase_policy")

            async def execute(self, **kwargs):
                return _StubResult()

        monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine)

        # execute_with_fallback_chain also constructs ReflexionEngine
        # internally; patch it to return a ChatExecutionResult directly.
        from agentkit.server._fallback_chain import ChatExecutionResult

        async def _stub_chain(**kwargs):
            return ChatExecutionResult(output="REACT fallback ok", status="success")

        monkeypatch.setattr(chat_module, "execute_with_fallback_chain", _stub_chain)

        app_with_chat.state.agent_pool.get_agent.return_value = _make_agent_mock()

        reply = self._post_to_new_session(
            TestClient(app_with_chat),
            {"content": "Hello", "execution_mode": "plan_exec"},
        )

        assert reply.status_code == 200
        assert reply.json()["content"] == "REACT fallback ok"
        # No engine should have been constructed with phase_policy — PLAN_EXEC
        # was disabled and the REACT path doesn't set phase_policy.
        assert not any(kw.get("phase_policy") is not None for kw in constructed)

    def test_rest_react_mode_still_works(self, client):
        """REST send_message without execution_mode never hits the PLAN_EXEC error path.

        A 500 from the mocked gateway is tolerated; a 501 or a
        "phase policy error" detail is not.
        """
        # No execution_mode field → should NOT trigger the PLAN_EXEC path.
        reply = self._post_to_new_session(client, {"content": "Hello"})

        # 500 is acceptable (mock gateway), but it must NOT be the PLAN_EXEC error.
        assert reply.status_code != 501, "REACT fallback should not return 501"
        if reply.status_code == 500:
            assert "phase policy error" not in reply.json().get("detail", "")


# ---------------------------------------------------------------------------
# Characterization — REWOO still falls back to REACT (no regression)
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_rewoo_still_falls_back_to_react_without_phase_policy(app_with_chat):
    """Characterization: REWOO via WebSocket → no phase_policy (falls back to REACT)."""
    from agentkit.server.routes import chat as chat_module

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.REWOO)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    ws = _make_websocket_mock(app_with_chat)

    # Receives the kwargs of whatever engine the handler constructs.
    captured_engine_kwargs: dict = {}

    class _StubEngine:
        def __init__(self, **kwargs):
            captured_engine_kwargs.update(kwargs)
            self._phase_policy = kwargs.get("phase_policy")
            self._current_phase = None

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            return
            yield  # async generator marker

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    # REWOO should NOT build a phase_policy.
    assert captured_engine_kwargs.get("phase_policy") is None


# ---------------------------------------------------------------------------
# Happy path — PLAN_EXEC builds phase policy + registers AdvancePhaseTool
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_plan_exec_builds_phase_policy_and_registers_advance_phase_tool(
    app_with_chat,
):
    """PLAN_EXEC via WebSocket → engine with phase_policy, AdvancePhaseTool registered."""
    from agentkit.server.routes import chat as chat_module

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "test"}])
    ws = _make_websocket_mock(app_with_chat)

    # Filled in by the stub when the handler starts streaming.
    captured_engine: list = []
    captured_tools: list = []

    class _StubEngine:
        def __init__(self, **kwargs):
            policy = kwargs.get("phase_policy")
            self._phase_policy = policy
            self._current_phase = policy.start_phase if policy else None

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            captured_tools.extend(kwargs.get("tools", []))
            captured_engine.append(self)
            return
            yield

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    assert len(captured_engine) == 1
    streamed_engine = captured_engine[0]
    assert streamed_engine._phase_policy is not None
    assert streamed_engine._current_phase == PhaseState.PLANNING
    # AdvancePhaseTool was registered in the tools list.
    assert any(isinstance(tool, AdvancePhaseTool) for tool in captured_tools)


# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_plan_exec_empty_config_uses_default_policy(app_with_chat):
    """Edge: plan_exec config absent (empty dict) → default_policy() used."""
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {}

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    ws = _make_websocket_mock(app_with_chat)

    # One entry per engine construction: the phase_policy it was given.
    captured_policy: list = []

    class _StubEngine:
        def __init__(self, **kwargs):
            policy = kwargs.get("phase_policy")
            captured_policy.append(policy)
            self._phase_policy = policy
            self._current_phase = policy.start_phase if policy else None

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            return
            yield

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    assert len(captured_policy) == 1
    policy = captured_policy[0]
    assert policy is not None
    # Default policy: PLANNING allows search but not write_file.
    assert "search" in policy.whitelist[PhaseState.PLANNING]
    assert "write_file" not in policy.whitelist[PhaseState.PLANNING]


@pytest.mark.asyncio
async def test_plan_exec_disabled_falls_back_to_react(app_with_chat):
    """Edge: plan_exec.enabled=False → falls back to REACT (no phase_policy)."""
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {"enabled": False}

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    ws = _make_websocket_mock(app_with_chat)

    # Receives the kwargs of whatever engine the handler constructs.
    captured_engine_kwargs: dict = {}

    class _StubEngine:
        def __init__(self, **kwargs):
            captured_engine_kwargs.update(kwargs)
            self._phase_policy = kwargs.get("phase_policy")
            self._current_phase = None

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            return
            yield

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    # enabled=False → no phase_policy (falls back to REACT).
    assert captured_engine_kwargs.get("phase_policy") is None


# ---------------------------------------------------------------------------
# Error path
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_plan_exec_bad_config_sends_error_and_returns(app_with_chat):
    """Error: phase policy construction fails → error event sent, returns early."""
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {"start_phase": "invalid_phase_name"}

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    ws = _make_websocket_mock(app_with_chat)

    # Stays empty as long as the handler never instantiates the engine.
    engine_constructor_called = []

    class _StubEngine:
        def __init__(self, **kwargs):
            engine_constructor_called.append(kwargs)

        async def execute_stream(self, **kwargs):
            yield

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    payloads = [sent.args[0] for sent in ws.send_json.call_args_list]
    error_messages = [msg for msg in payloads if msg.get("type") == "error"]
    assert len(error_messages) == 1
    assert "phase policy error" in error_messages[0]["data"]["message"]
    # Engine constructor was NOT called (returned early).
    assert len(engine_constructor_called) == 0
# ---------------------------------------------------------------------------
# phase_changed event emission
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_phase_changed_event_emitted_on_transition(app_with_chat):
    """phase_changed event sent when current_phase changes during execute_stream."""
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {}

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "go"}])
    ws = _make_websocket_mock(app_with_chat)

    class _StubEngine:
        def __init__(self, **kwargs):
            self._phase_policy = kwargs.get("phase_policy")
            self._current_phase = PhaseState.PLANNING

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            from agentkit.core.react import ReActEvent

            yield ReActEvent(
                event_type="tool_call",
                step=1,
                data={"tool": "search", "output": "ok"},
            )
            # Simulate phase transition (as if AdvancePhaseTool was called).
            self._current_phase = PhaseState.BUILDING
            yield ReActEvent(
                event_type="final_answer",
                step=2,
                data={"output": "done"},
            )

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="go",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    payloads = [sent.args[0] for sent in ws.send_json.call_args_list]
    phase_events = [msg for msg in payloads if msg.get("type") == "phase_changed"]
    assert len(phase_events) == 1
    transition = phase_events[0]["data"]
    assert transition["phase"] == "building"
    assert transition["previous"] == "planning"


@pytest.mark.asyncio
async def test_no_phase_changed_event_when_not_plan_exec(app_with_chat):
    """Characterization: REACT mode → no phase_changed events."""
    from agentkit.server.routes import chat as chat_module

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.REACT)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "hi"}])
    ws = _make_websocket_mock(app_with_chat)

    class _StubEngine:
        def __init__(self, **kwargs):
            self._phase_policy = None
            self._current_phase = None

        @property
        def current_phase(self):
            return None

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            from agentkit.core.react import ReActEvent

            yield ReActEvent(event_type="final_answer", step=1, data={"output": "hi"})

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="hi",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    payloads = [sent.args[0] for sent in ws.send_json.call_args_list]
    phase_events = [msg for msg in payloads if msg.get("type") == "phase_changed"]
    assert len(phase_events) == 0


# ---------------------------------------------------------------------------
# Wave 4 U2 — phase_violation event forwarding
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_phase_violation_event_forwarded_to_client(app_with_chat):
    """When ReActEngine yields a phase_violation ReActEvent, chat.py WS handler
    must forward it as a `{"type": "phase_violation", "data": ...}` WS message
    so the frontend PhaseIndicator can react."""
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {}

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "go"}])
    ws = _make_websocket_mock(app_with_chat)

    class _StubEngine:
        def __init__(self, **kwargs):
            self._phase_policy = kwargs.get("phase_policy")
            self._current_phase = PhaseState.PLANNING

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            from agentkit.core.react import ReActEvent

            # Simulate: tool_call → tool_result (blocked) → phase_violation.
            yield ReActEvent(
                event_type="tool_call",
                step=1,
                data={"tool_name": "write_file", "arguments": {"path": "/x"}},
            )
            yield ReActEvent(
                event_type="tool_result",
                step=1,
                data={
                    "tool_name": "write_file",
                    "result": {"error": "phase_violation", "is_error": True},
                },
            )
            yield ReActEvent(
                event_type="phase_violation",
                step=1,
                data={
                    "error": "phase_violation",
                    "message": "Tool 'write_file' not allowed in planning phase.",
                    "current_phase": "planning",
                    "tool": "write_file",
                    "is_error": True,
                    "violation_kind": "tool_not_allowed",
                },
            )
            yield ReActEvent(
                event_type="final_answer",
                step=2,
                data={"output": "done"},
            )

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="go",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    payloads = [sent.args[0] for sent in ws.send_json.call_args_list]
    violation_messages = [msg for msg in payloads if msg.get("type") == "phase_violation"]
    assert len(violation_messages) == 1
    violation = violation_messages[0]["data"]
    assert violation["error"] == "phase_violation"
    assert violation["tool"] == "write_file"
    assert violation["current_phase"] == "planning"
    assert violation["violation_kind"] == "tool_not_allowed"


@pytest.mark.asyncio
async def test_no_phase_violation_event_when_not_plan_exec(app_with_chat):
    """Characterization: REACT mode → no phase_violation events forwarded
    (the engine never yields them without a phase_policy)."""
    from agentkit.server.routes import chat as chat_module

    agent = _make_agent_mock()
    routing = _make_routing(execution_mode=ExecutionMode.REACT)
    _setup_routing(app_with_chat, routing, agent)

    sm = _make_session_manager_mock()
    sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "hi"}])
    ws = _make_websocket_mock(app_with_chat)

    class _StubEngine:
        def __init__(self, **kwargs):
            self._phase_policy = None
            self._current_phase = None

        @property
        def current_phase(self):
            return None

        def reset(self):
            pass

        async def execute_stream(self, **kwargs):
            from agentkit.core.react import ReActEvent

            # REACT mode: no phase_violation events yielded.
            yield ReActEvent(event_type="final_answer", step=1, data={"output": "hi"})

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", _StubEngine)

        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="hi",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    payloads = [sent.args[0] for sent in ws.send_json.call_args_list]
    violation_messages = [msg for msg in payloads if msg.get("type") == "phase_violation"]
    assert len(violation_messages) == 0


# ---------------------------------------------------------------------------
# _build_phase_engine helper (U3)
# ---------------------------------------------------------------------------


class TestBuildPhaseEngineHelper:
    """Direct unit tests for the _build_phase_engine helper extracted in U3.

    Every test unpacks the helper's return value as ``(engine, tools, err)``.
    """

    @staticmethod
    def _config_with(plan_exec: dict) -> MagicMock:
        """Return a mock server config whose ``plan_exec`` is ``plan_exec``."""
        config = MagicMock()
        config.plan_exec = plan_exec
        return config

    def test_returns_none_when_not_plan_exec(self):
        """Non-PLAN_EXEC mode → (None, None, None)."""
        from agentkit.server.routes.chat import _build_phase_engine

        engine, tools, err = _build_phase_engine(
            server_config=None,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.REACT,
            base_tools=[],
        )

        assert engine is None
        assert tools is None
        assert err is None

    def test_returns_none_when_plan_exec_disabled_by_config(self):
        """PLAN_EXEC requested but enabled=False → (None, None, None)."""
        from agentkit.server.routes.chat import _build_phase_engine

        engine, tools, err = _build_phase_engine(
            server_config=self._config_with({"enabled": False}),
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )

        assert engine is None
        assert tools is None
        assert err is None

    def test_returns_none_when_plan_exec_section_absent(self):
        """Empty plan_exec config → default_policy() used, engine built.

        NOTE: despite the test name, only ``err`` is None here — the engine
        and the tools list are both expected to be built.
        """
        from agentkit.server.routes.chat import _build_phase_engine

        engine, tools, err = _build_phase_engine(
            server_config=self._config_with({}),
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )

        assert engine is not None
        assert tools is not None
        assert err is None
        # Default policy: PLANNING allows search, blocks write_file.
        planning_whitelist = engine._phase_policy.whitelist[PhaseState.PLANNING]
        assert "search" in planning_whitelist
        assert "write_file" not in planning_whitelist

    def test_returns_error_when_phase_policy_invalid(self):
        """Invalid start_phase → no engine, no tools, error string returned."""
        from agentkit.server.routes.chat import _build_phase_engine

        engine, tools, err = _build_phase_engine(
            server_config=self._config_with({"start_phase": "invalid_phase_name"}),
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )

        assert engine is None
        assert tools is None
        assert err is not None
        assert "phase policy error" in err

    def test_appends_advance_phase_tool_to_tools(self):
        """Base tools are kept in order and AdvancePhaseTool is appended last."""
        from agentkit.server.routes.chat import _build_phase_engine

        base_tool = MagicMock()

        engine, tools, err = _build_phase_engine(
            server_config=self._config_with({}),
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[base_tool],
        )

        assert err is None
        assert engine is not None
        assert tools is not None
        # base_tool preserved + AdvancePhaseTool appended.
        assert len(tools) == 2
        assert tools[0] is base_tool
        assert isinstance(tools[1], AdvancePhaseTool)

    def test_engine_uses_default_policy_when_config_returns_none(self, monkeypatch):
        """policy_from_config returning None → default_policy() used."""
        from agentkit.server.routes import chat as chat_module

        monkeypatch.setattr(chat_module, "policy_from_config", lambda cfg: None)

        engine, tools, err = chat_module._build_phase_engine(
            server_config=self._config_with({"enabled": True}),
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )

        assert err is None
        assert engine is not None
        assert engine._phase_policy is not None
        # Default policy's start phase is PLANNING.
        assert engine._current_phase == PhaseState.PLANNING