feat(U3): extract _build_phase_engine helper + wire REST PLAN_EXEC

Extract the WS path's inline phase_policy construction into a shared
_build_phase_engine helper so the REST send_message endpoint can reuse
it. Replace the former 501 stub with actual PLAN_EXEC execution:

- REST POST /chat/sessions/{id}/messages with execution_mode=plan_exec
  now builds a phase-policy-backed ReActEngine, calls execute()
  (non-streaming), and returns a MessageResponse.
- KTD5: PLAN_EXEC bypasses execute_with_fallback_chain — phase policy
  and fallback chain are mutually exclusive.
- When plan_exec.enabled=False, REST falls through to the REACT path
  (matching WS behavior).
- WS path refactored to call the same helper; behavior unchanged.

Tests:
- Replace TestRestPlanExec501 with TestRestPlanExec (happy path, bad
  config → 500, disabled → falls through to REACT, REACT mode unchanged).
- Add TestBuildPhaseEngineHelper covering all return branches:
  not-PLAN_EXEC, disabled, empty-config, invalid-config, tool append,
  default-policy fallback.
- All 109 tests pass across the three PLAN_EXEC test files.
This commit is contained in:
chiguyong 2026-06-30 10:59:43 +08:00
parent 4dc58c24bc
commit b032e08866
2 changed files with 368 additions and 59 deletions

View File

@ -25,7 +25,7 @@ from fastapi.responses import FileResponse
from pydantic import BaseModel
from agentkit.chat.skill_routing import ExecutionMode
from agentkit.core.phase import PhasePolicy, default_policy, policy_from_config
from agentkit.core.phase import default_policy, policy_from_config
from agentkit.core.protocol import CancellationToken
from agentkit.core.react import ReActEngine
from agentkit.server._fallback_chain import execute_with_fallback_chain
@ -534,6 +534,69 @@ def _message_to_response(msg) -> MessageResponse:
)
def _build_phase_engine(
    *,
    server_config: Any,
    llm_gateway: Any,
    execution_mode: ExecutionMode,
    base_tools: list | None,
    session_id: str = "",
) -> tuple[ReActEngine | None, list | None, str | None]:
    """Build a PLAN_EXEC engine with PhasePolicy + AdvancePhaseTool.

    Encapsulates the WS path's phase_policy construction so the REST path
    can reuse it without duplicating config-lookup + policy_from_config +
    AdvancePhaseTool registration. KTD5: PLAN_EXEC bypasses the fallback
    chain — callers must NOT route the returned engine through
    ``execute_with_fallback_chain``.

    Args:
        server_config: ``app.state.server_config`` (or None for tests). Its
            ``plan_exec`` attribute, when present and truthy, must expose a
            dict-style ``.get``; anything else is reported as a policy error.
        llm_gateway: ``app.state.llm_gateway``.
        execution_mode: routing.execution_mode (WS) or PLAN_EXEC (REST).
        base_tools: routing.tools (WS) or agent tool list (REST). ``None`` is
            treated as an empty list. The input list is never mutated.
        session_id: included in log lines for traceability only.

    Returns:
        ``(engine, tools_with_advance_phase, error_message)``:

        - execution_mode != PLAN_EXEC → ``(None, None, None)`` (fall back to REACT).
        - plan_exec.enabled=False → ``(None, None, None)`` (fall back to REACT).
        - phase policy construction failed → ``(None, None, error_message)``.
        - PLAN_EXEC engaged → ``(engine, tools_with_advance_phase, None)``.
    """
    if execution_mode != ExecutionMode.PLAN_EXEC:
        return (None, None, None)

    plan_exec_cfg = getattr(server_config, "plan_exec", None) or {}

    # A truthy non-mapping value (e.g. `plan_exec: true`) would otherwise raise
    # AttributeError on `.get` below and escape the documented error tuple.
    if not hasattr(plan_exec_cfg, "get"):
        reason = (
            "plan_exec config must be a mapping, got "
            f"{type(plan_exec_cfg).__name__}"
        )
        logger.error(
            "PLAN_EXEC phase policy construction failed for session %s: %s",
            session_id,
            reason,
        )
        return (None, None, f"phase policy error: {reason}")

    # Only an explicit `enabled: False` opts out; a missing key means enabled.
    if plan_exec_cfg.get("enabled", True) is False:
        logger.info(
            "PLAN_EXEC disabled by config (plan_exec.enabled=False), "
            "falling back to REACT for session %s",
            session_id,
        )
        return (None, None, None)

    try:
        phase_policy = policy_from_config(plan_exec_cfg)
        if phase_policy is None:
            # Empty config (no `plan_exec:` section) → use KTD5 defaults.
            phase_policy = default_policy()
    except Exception as e:
        logger.error(
            "PLAN_EXEC phase policy construction failed for session %s: %s",
            session_id,
            e,
        )
        # Truncated to 200 chars so the caller can surface it to clients.
        return (None, None, f"phase policy error: {str(e)[:200]}")

    engine = ReActEngine(
        llm_gateway=llm_gateway,
        phase_policy=phase_policy,
    )
    # The tool is bound to this specific engine instance.
    advance_phase_tool = AdvancePhaseTool(engine=engine)
    # Copy so the caller's list is left untouched.
    existing_tools = list(base_tools) if base_tools is not None else []
    tools_with_advance_phase = existing_tools + [advance_phase_tool]
    return (engine, tools_with_advance_phase, None)
# ── REST endpoints ────────────────────────────────────────────────────
@ -587,12 +650,58 @@ async def send_message(session_id: str, request: SendMessageRequest, req: Reques
if session.status == SessionStatus.CLOSED:
raise HTTPException(status_code=400, detail=f"Session '{session_id}' is closed")
# KTD4: PLAN_EXEC is wired only at the WebSocket path. REST raises 501.
# U3: PLAN_EXEC via REST — non-streaming, bypasses the fallback chain
# (KTD5: PLAN_EXEC and execute_with_fallback_chain are mutually exclusive).
# When plan_exec is disabled by config, falls through to the REACT path below.
if request.execution_mode == "plan_exec":
raise HTTPException(
status_code=501,
detail="PLAN_EXEC via REST not yet supported; use WebSocket",
# Resolve the Agent early — PLAN_EXEC needs its tool list + system prompt.
pool = req.app.state.agent_pool
agent = pool.get_agent(session.agent_name)
if agent is None:
raise HTTPException(status_code=404, detail=f"Agent '{session.agent_name}' not found")
plan_exec_engine, plan_exec_tools, plan_exec_error = _build_phase_engine(
server_config=getattr(req.app.state, "server_config", None),
llm_gateway=req.app.state.llm_gateway,
execution_mode=ExecutionMode.PLAN_EXEC,
base_tools=agent._tool_registry.list_tools() if agent._tool_registry else [],
session_id=session_id,
)
if plan_exec_error is not None:
raise HTTPException(status_code=500, detail=plan_exec_error)
if plan_exec_engine is not None:
# PLAN_EXEC engaged — append user msg, execute non-streaming, return.
await sm.append_message(
session_id=session_id,
role=MessageRole.USER,
content=request.content,
)
chat_messages = await sm.get_chat_messages(session_id)
system_prompt = getattr(agent, "_system_prompt", None) or (
agent.get_system_prompt() if hasattr(agent, "get_system_prompt") else None
)
try:
plan_exec_result = await plan_exec_engine.execute(
messages=chat_messages,
tools=plan_exec_tools,
model=agent.get_model()
if hasattr(agent, "get_model")
else getattr(agent, "_llm_model", "default"),
agent_name=agent.name,
system_prompt=system_prompt,
)
except Exception as e:
logger.error(f"PLAN_EXEC execution error for session {session_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
assistant_msg = await sm.append_message(
session_id=session_id,
role=MessageRole.ASSISTANT,
content=plan_exec_result.output,
agent_name=agent.name,
)
return _message_to_response(assistant_msg)
# else: plan_exec.enabled=False → fall through to REACT path below.
# Append user message
await sm.append_message(
@ -1090,42 +1199,27 @@ async def _handle_chat_message(
await websocket.send_json({"type": "error", "data": {"message": str(e)[:200]}})
return
# U4/G6: PLAN_EXEC — build PhasePolicy from server config (KTD4: WebSocket only).
# U4/G6: PLAN_EXEC — build PhasePolicy from server config.
# KTD5 (Wave 2): fallback chain NOT applied to PLAN_EXEC — phase policy and
# fallback chain are mutually exclusive. PLAN_EXEC uses its own engine.
phase_policy: PhasePolicy | None = None
if routing.execution_mode == ExecutionMode.PLAN_EXEC:
server_config = getattr(websocket.app.state, "server_config", None)
plan_exec_cfg = getattr(server_config, "plan_exec", None) or {}
if plan_exec_cfg.get("enabled", True) is False:
# Explicit opt-out → fall back to REACT.
logger.info(
"PLAN_EXEC disabled by config (plan_exec.enabled=False), "
"falling back to REACT for session %s",
session_id,
)
else:
try:
phase_policy = policy_from_config(plan_exec_cfg)
if phase_policy is None:
# Empty config (no `plan_exec:` section) → use KTD5 defaults.
phase_policy = default_policy()
except Exception as e:
logger.error(
"PLAN_EXEC phase policy construction failed for session %s: %s",
session_id,
e,
)
await websocket.send_json(
{
"type": "error",
# Truncate to 200 chars to match nearby error paths and
# avoid leaking config internals (see chat.py:1090, 1320).
"data": {"message": f"phase policy error: {str(e)[:200]}"},
}
)
return
# U3: logic extracted into _build_phase_engine so REST can reuse it.
plan_exec_engine, plan_exec_tools, plan_exec_error = _build_phase_engine(
server_config=getattr(websocket.app.state, "server_config", None),
llm_gateway=websocket.app.state.llm_gateway,
execution_mode=routing.execution_mode,
base_tools=routing.tools,
session_id=session_id,
)
if plan_exec_error is not None:
await websocket.send_json(
{
"type": "error",
# Truncate to 200 chars to match nearby error paths and
# avoid leaking config internals (see chat.py:1090, 1320).
"data": {"message": plan_exec_error},
}
)
return
# Handle advanced execution modes: REWOO/REFLEXION/TEAM_COLLAB
# still fall back to REACT with a warning. PLAN_EXEC is handled above.
@ -1143,14 +1237,9 @@ async def _handle_chat_message(
# Reuse Agent's ReActEngine if available (U2: Chat pipeline optimization).
# PLAN_EXEC creates a fresh engine with phase_policy set (cannot reuse the
# agent's _react_engine — it has no policy).
if phase_policy is not None:
react_engine = ReActEngine(
llm_gateway=websocket.app.state.llm_gateway,
phase_policy=phase_policy,
)
# Register AdvancePhaseTool bound to this engine (LLM's escape hatch).
advance_phase_tool = AdvancePhaseTool(engine=react_engine)
routing.tools = list(routing.tools) + [advance_phase_tool]
if plan_exec_engine is not None:
react_engine = plan_exec_engine
routing.tools = plan_exec_tools
else:
react_engine = getattr(agent, "_react_engine", None)
if react_engine is None:

View File

@ -1,10 +1,12 @@
"""Unit tests for PLAN_EXEC wiring at chat.py WebSocket path (G6, U4).
"""Unit tests for PLAN_EXEC wiring at chat.py REST + WebSocket paths (G6, U3, U4).
Per plan U4 Execution note: characterization-first — verify that existing
REWOO/REFLEXION/TEAM_COLLAB modes still fall back to REACT with the warning
(no regression). Then add PLAN_EXEC wiring tests.
KTD4: PLAN_EXEC is wired only at the WebSocket path; REST raises HTTP 501.
U3: PLAN_EXEC is now wired at both REST and WebSocket paths. REST returns
a non-streaming MessageResponse; WS streams phase_violation events alongside
the LLM reinjection. KTD5: PLAN_EXEC bypasses the fallback chain.
"""
from __future__ import annotations
@ -109,13 +111,60 @@ def _setup_routing(app, routing: SkillRoutingResult, agent: MagicMock) -> None:
# ---------------------------------------------------------------------------
# REST — PLAN_EXEC raises 501 (KTD4)
# REST — PLAN_EXEC wired (U3, replaces former 501 path)
# ---------------------------------------------------------------------------
class TestRestPlanExec501:
def test_rest_plan_exec_returns_501(self, client):
"""REST send_message with execution_mode=plan_exec → 501."""
class TestRestPlanExec:
"""U3: REST send_message with execution_mode=plan_exec now executes
PLAN_EXEC (non-streaming) instead of raising 501."""
def test_rest_plan_exec_returns_assistant_message(self, app_with_chat, monkeypatch):
"""REST PLAN_EXEC happy path → 200 with assistant message."""
from agentkit.server.routes import chat as chat_module
# Patch ReActEngine with a stub whose execute() returns a ReActResult-like.
class _StubResult:
output = "PLAN_EXEC completed"
status = "success"
class _StubEngine:
def __init__(self, **kwargs):
self._phase_policy = kwargs.get("phase_policy")
self._current_phase = (
kwargs.get("phase_policy").start_phase if kwargs.get("phase_policy") else None
)
async def execute(self, **kwargs):
return _StubResult()
monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine)
# Wire agent_pool with a mock agent that has _tool_registry.
agent = _make_agent_mock()
app_with_chat.state.agent_pool.get_agent.return_value = agent
client = TestClient(app_with_chat)
create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"})
session_id = create_resp.json()["session_id"]
msg_resp = client.post(
f"/api/v1/chat/sessions/{session_id}/messages",
json={"content": "Build me a hello world", "execution_mode": "plan_exec"},
)
assert msg_resp.status_code == 200
body = msg_resp.json()
assert body["content"] == "PLAN_EXEC completed"
assert body["role"] == "assistant"
def test_rest_plan_exec_bad_config_returns_500(self, app_with_chat):
"""REST PLAN_EXEC with invalid phase config → 500 with error detail."""
app_with_chat.state.server_config.plan_exec = {"start_phase": "invalid_phase_name"}
agent = _make_agent_mock()
app_with_chat.state.agent_pool.get_agent.return_value = agent
client = TestClient(app_with_chat)
create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"})
session_id = create_resp.json()["session_id"]
@ -123,20 +172,71 @@ class TestRestPlanExec501:
f"/api/v1/chat/sessions/{session_id}/messages",
json={"content": "Hello", "execution_mode": "plan_exec"},
)
assert msg_resp.status_code == 501
assert "PLAN_EXEC via REST not yet supported" in msg_resp.json()["detail"]
assert msg_resp.status_code == 500
assert "phase policy error" in msg_resp.json()["detail"]
def test_rest_react_mode_still_works(self, client):
"""REST send_message without execution_mode doesn't 501."""
def test_rest_plan_exec_disabled_falls_through_to_react(self, app_with_chat, monkeypatch):
"""REST PLAN_EXEC with enabled=False → falls through to REACT path."""
from agentkit.server.routes import chat as chat_module
app_with_chat.state.server_config.plan_exec = {"enabled": False}
# Track which engine constructor fires.
constructed: list = []
class _StubResult:
output = "REACT fallback ok"
status = "success"
class _StubEngine:
def __init__(self, **kwargs):
constructed.append(kwargs)
self._phase_policy = kwargs.get("phase_policy")
async def execute(self, **kwargs):
return _StubResult()
monkeypatch.setattr(chat_module, "ReActEngine", _StubEngine)
# execute_with_fallback_chain also constructs ReflexionEngine internally;
# patch it to return a ChatExecutionResult-like directly.
from agentkit.server._fallback_chain import ChatExecutionResult
async def _stub_chain(**kwargs):
return ChatExecutionResult(output="REACT fallback ok", status="success")
monkeypatch.setattr(chat_module, "execute_with_fallback_chain", _stub_chain)
agent = _make_agent_mock()
app_with_chat.state.agent_pool.get_agent.return_value = agent
client = TestClient(app_with_chat)
create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"})
session_id = create_resp.json()["session_id"]
# No execution_mode field → should NOT trigger 501.
msg_resp = client.post(
f"/api/v1/chat/sessions/{session_id}/messages",
json={"content": "Hello", "execution_mode": "plan_exec"},
)
assert msg_resp.status_code == 200
assert msg_resp.json()["content"] == "REACT fallback ok"
# No engine should have been constructed with phase_policy — PLAN_EXEC
# was disabled and the REACT path doesn't set phase_policy.
assert all(kw.get("phase_policy") is None for kw in constructed)
def test_rest_react_mode_still_works(self, client):
"""REST send_message without execution_mode doesn't 500."""
create_resp = client.post("/api/v1/chat/sessions", json={"agent_name": "test-agent"})
session_id = create_resp.json()["session_id"]
# No execution_mode field → should NOT trigger PLAN_EXEC path.
# Will likely 500 due to mock llm_gateway, but must NOT be a PLAN_EXEC error.
msg_resp = client.post(
f"/api/v1/chat/sessions/{session_id}/messages",
json={"content": "Hello"},
)
assert msg_resp.status_code != 501
# 500 is acceptable (mock gateway), but it must NOT be the PLAN_EXEC error.
if msg_resp.status_code == 500:
assert "phase policy error" not in msg_resp.json().get("detail", "")
# ---------------------------------------------------------------------------
@ -671,3 +771,123 @@ async def test_no_phase_violation_event_when_not_plan_exec(app_with_chat):
sent_messages = [call.args[0] for call in ws.send_json.call_args_list]
violation_messages = [m for m in sent_messages if m.get("type") == "phase_violation"]
assert len(violation_messages) == 0
# ---------------------------------------------------------------------------
# _build_phase_engine helper (U3)
# ---------------------------------------------------------------------------
class TestBuildPhaseEngineHelper:
    """Direct unit tests for the _build_phase_engine helper extracted in U3.

    Each test exercises one return branch of the helper's
    ``(engine, tools, error_message)`` tuple.
    """

    def test_returns_none_when_not_plan_exec(self):
        """Non-PLAN_EXEC mode → (None, None, None)."""
        from agentkit.server.routes.chat import _build_phase_engine

        engine, tools, err = _build_phase_engine(
            server_config=None,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.REACT,
            base_tools=[],
        )
        assert engine is None
        assert tools is None
        assert err is None

    def test_returns_none_when_plan_exec_disabled_by_config(self):
        """plan_exec.enabled=False → (None, None, None), no error."""
        from agentkit.server.routes.chat import _build_phase_engine

        server_config = MagicMock()
        server_config.plan_exec = {"enabled": False}
        engine, tools, err = _build_phase_engine(
            server_config=server_config,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )
        assert engine is None
        assert tools is None
        assert err is None

    def test_builds_engine_with_default_policy_when_plan_exec_section_empty(self):
        """Empty plan_exec config → default_policy() used, engine built."""
        from agentkit.server.routes.chat import _build_phase_engine

        server_config = MagicMock()
        server_config.plan_exec = {}
        engine, tools, err = _build_phase_engine(
            server_config=server_config,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )
        assert engine is not None
        assert tools is not None
        assert err is None
        # Default policy: PLANNING allows search, blocks write_file
        assert "search" in engine._phase_policy.whitelist[PhaseState.PLANNING]
        assert "write_file" not in engine._phase_policy.whitelist[PhaseState.PLANNING]

    def test_returns_error_when_phase_policy_invalid(self):
        """Invalid start_phase → (None, None, 'phase policy error: ...')."""
        from agentkit.server.routes.chat import _build_phase_engine

        server_config = MagicMock()
        server_config.plan_exec = {"start_phase": "invalid_phase_name"}
        engine, tools, err = _build_phase_engine(
            server_config=server_config,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )
        assert engine is None
        assert tools is None
        assert err is not None
        assert "phase policy error" in err

    def test_appends_advance_phase_tool_to_tools(self):
        """Base tools are preserved in order with AdvancePhaseTool appended."""
        from agentkit.server.routes.chat import _build_phase_engine

        server_config = MagicMock()
        server_config.plan_exec = {}
        base_tool = MagicMock()
        base_tools = [base_tool]
        engine, tools, err = _build_phase_engine(
            server_config=server_config,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=base_tools,
        )
        assert err is None
        assert engine is not None
        assert tools is not None
        # base_tool preserved + AdvancePhaseTool appended
        assert len(tools) == 2
        assert tools[0] is base_tool
        assert isinstance(tools[1], AdvancePhaseTool)
        # The helper returns a new list; the caller's list must be untouched.
        assert base_tools == [base_tool]

    def test_engine_uses_default_policy_when_config_returns_none(self, monkeypatch):
        """policy_from_config returning None → default_policy() used."""
        from agentkit.server.routes import chat as chat_module

        def _stub_policy_from_config(cfg):
            return None

        monkeypatch.setattr(chat_module, "policy_from_config", _stub_policy_from_config)
        server_config = MagicMock()
        server_config.plan_exec = {"enabled": True}
        engine, tools, err = chat_module._build_phase_engine(
            server_config=server_config,
            llm_gateway=MagicMock(),
            execution_mode=ExecutionMode.PLAN_EXEC,
            base_tools=[],
        )
        assert err is None
        assert engine is not None
        assert engine._phase_policy is not None
        # Default policy's start phase is PLANNING
        assert engine._current_phase == PhaseState.PLANNING