fischer-agentkit/tests/unit/test_team_collab_routing.py

595 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Unit tests for TEAM_COLLAB routing (U9, R7).
Verifies that ``ExecutionMode.TEAM_COLLAB`` reached via the non-@team-prefix
path (RequestPreprocessor / skill routing) surfaces an error to the user
instead of silently falling back to REACT. The @team prefix itself is handled
earlier by ``_execute_team_collab`` and is out of scope here — this test only
covers the routing decision at the fall-back block.
REWOO / REFLEXION-as-mode keep their deferred REACT fall-back (RV10).
"""
from __future__ import annotations
import logging
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult
# ---------------------------------------------------------------------------
# Fixtures and helpers (mirrors test_chat_plan_exec_ws.py patterns)
# ---------------------------------------------------------------------------
# Repository root, resolved relative to this file: parents[2] walks up three
# directory levels from the test module (presumably tests/unit/ -> tests/ -> root).
REPO_ROOT = Path(__file__).resolve().parents[2]
# Documentation file read by the AGENTS.md regression tests in this module.
AGENTS_MD = REPO_ROOT.joinpath("AGENTS.md")
# Substring used to recognise the TEAM_COLLAB guidance error in sent messages.
TEAM_COLLAB_ERROR_HINT = "@team"
@pytest.fixture
def app_with_chat():
    """Provide a FastAPI app that mounts the chat router over stubbed state.

    The chat router is included under the ``/api/v1`` prefix. ``app.state``
    receives a ``SessionManager`` backed by ``InMemorySessionStore`` and
    ``MagicMock`` stand-ins for the LLM gateway, the agent pool and the server
    config (``api_key`` set to ``None``, ``plan_exec`` set to an empty dict).
    """
    # Function-scope imports, as in the original fixture.
    from fastapi import FastAPI

    from agentkit.server.routes.chat import router
    from agentkit.session.manager import SessionManager
    from agentkit.session.store import InMemorySessionStore

    application = FastAPI()
    application.include_router(router, prefix="/api/v1")

    server_config = MagicMock()
    server_config.api_key = None
    server_config.plan_exec = {}

    state = application.state
    state.session_manager = SessionManager(store=InMemorySessionStore())
    state.llm_gateway = MagicMock()
    state.agent_pool = MagicMock()
    state.server_config = server_config
    return application
def _make_routing(
    execution_mode: ExecutionMode = ExecutionMode.REACT,
    tools: list | None = None,
    system_prompt: str | None = None,
) -> SkillRoutingResult:
    """Assemble a small ``SkillRoutingResult`` for the tests in this module.

    Args:
        execution_mode: Mode stored on the result; defaults to ``REACT``.
        tools: Tool list stored on the result. A falsy value (``None`` or an
            empty list) is replaced by a fresh empty list.
        system_prompt: Passed through unchanged.

    Returns:
        A result with fixed ``clean_content``, ``model`` and ``agent_name``
        values and ``skill_name`` set to ``None``.
    """
    routing_fields = {
        "execution_mode": execution_mode,
        "tools": tools if tools else [],
        "clean_content": "test message",
        "model": "default",
        "agent_name": "test-agent",
        "system_prompt": system_prompt,
        "skill_name": None,
    }
    return SkillRoutingResult(**routing_fields)
def _make_websocket_mock(app) -> MagicMock:
    """Return a ``MagicMock`` WebSocket bound to ``app``.

    The mock exposes ``app`` as its ``.app`` attribute and an ``AsyncMock``
    as ``send_json`` so the handler can await it and tests can inspect calls.
    """
    # Constructor keyword arguments are applied as attributes on the mock.
    return MagicMock(app=app, send_json=AsyncMock())
def _make_agent_mock() -> MagicMock:
    """Return a ``MagicMock`` agent named ``"test-agent"``.

    The mock's tool registry lists no tools, ``_system_prompt`` is ``None``
    and ``get_model()`` returns ``"default"``.
    """
    registry = MagicMock()
    registry.list_tools.return_value = []

    stub = MagicMock()
    # ``name`` must be assigned after construction: as a constructor keyword
    # it would name the mock itself instead of setting an attribute.
    stub.name = "test-agent"
    stub._tool_registry = registry
    stub._system_prompt = None
    # No pre-built engine, so the code under test has to create a new one.
    stub._react_engine = None
    stub.get_model.return_value = "default"
    return stub
def _make_session_manager_mock() -> MagicMock:
    """Return a ``MagicMock`` session manager whose methods are awaitable.

    ``get_session`` resolves to a mock session (agent ``"test-agent"``,
    status ``"active"``), ``get_chat_messages`` resolves to an empty list and
    ``append_message`` is a plain ``AsyncMock``.
    """
    active_session = MagicMock(agent_name="test-agent", status="active")

    manager = MagicMock()
    manager.get_session = AsyncMock(return_value=active_session)
    manager.get_chat_messages = AsyncMock(return_value=[])
    manager.append_message = AsyncMock()
    return manager
def _setup_routing(app, routing: SkillRoutingResult, agent: MagicMock) -> None:
    """Install ``agent`` and ``routing`` on ``app.state`` for the handler under test.

    After the call, ``app.state.agent_pool.get_agent`` returns ``agent`` and
    ``app.state.request_preprocessor.preprocess`` is an ``AsyncMock`` that
    resolves to ``routing``.
    """
    preprocessor = MagicMock()
    preprocessor.preprocess = AsyncMock(return_value=routing)

    state = app.state
    state.agent_pool.get_agent.return_value = agent
    state.request_preprocessor = preprocessor
class _ToolStub:
    """Bare tool stand-in that carries nothing but a ``name`` attribute."""

    def __init__(self, name: str) -> None:
        """Store ``name`` on the instance."""
        self.name = name
def _make_stub_engine_class(
    constructed_engines: list,
    stream_calls: list,
) -> type:
    """Create a recording stand-in class meant to be patched over ``ReActEngine``.

    Args:
        constructed_engines: Receives every instance as it is created.
        stream_calls: Receives the keyword arguments of each
            ``execute_stream`` call once the returned generator is iterated.

    Returns:
        A standalone class (not derived from the real engine) exposing
        ``current_phase``, ``reset`` and ``execute_stream``.
    """

    class _StubEngine:
        def __init__(self, **kwargs) -> None:
            constructed_engines.append(self)
            policy = kwargs.get("phase_policy")
            self._phase_policy = policy
            # Start phase comes from the policy when a truthy one is supplied.
            self._current_phase = policy.start_phase if policy else None

        @property
        def current_phase(self):
            return self._current_phase

        def reset(self) -> None:
            return None

        async def execute_stream(self, **kwargs):
            stream_calls.append(kwargs)
            return
            # Unreachable on purpose: the presence of ``yield`` is what makes
            # this function an async generator that produces no items.
            yield

    return _StubEngine
def _sent_messages(ws: MagicMock) -> list[dict]:
    """Return the first positional argument of each ``ws.send_json`` call, in call order."""
    recorded_calls = ws.send_json.call_args_list
    return [recorded.args[0] for recorded in recorded_calls]
# ---------------------------------------------------------------------------
# Happy path — TEAM_COLLAB (non-prefix) surfaces error, no REACT fall-back
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_team_collab_non_prefix_sends_error_and_aborts(app_with_chat):
    """TEAM_COLLAB routing yields exactly one ``@team`` error and no engine work.

    The preprocessor is stubbed to return a TEAM_COLLAB routing result; the
    test then checks that a single error payload mentioning ``@team`` is sent
    and that the patched engine class is neither instantiated nor streamed.
    """
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    team_routing = _make_routing(execution_mode=ExecutionMode.TEAM_COLLAB)
    _setup_routing(app_with_chat, team_routing, fake_agent)
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    sent = _sent_messages(socket)
    errors = [payload for payload in sent if payload.get("type") == "error"]
    assert len(errors) == 1, f"expected exactly one error, got {sent}"

    message = errors[0]["data"]["message"]
    assert TEAM_COLLAB_ERROR_HINT in message, f"error message must mention @team: {message}"

    # The REACT fall-back was not taken: nothing constructed, nothing streamed.
    assert not built_engines, "ReActEngine should not be constructed for TEAM_COLLAB"
    assert not recorded_streams, "execute_stream must not be called for TEAM_COLLAB"
# ---------------------------------------------------------------------------
# Edge cases — other modes do NOT trigger the TEAM_COLLAB error block
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_react_mode_continues_without_team_collab_error(app_with_chat):
    """REACT routing sends no ``@team`` error and streams through the engine once."""
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    _setup_routing(
        app_with_chat,
        _make_routing(execution_mode=ExecutionMode.REACT),
        fake_agent,
    )
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    # Collect the text of every error payload, then keep those mentioning @team.
    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in _sent_messages(socket)
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "REACT must not trigger TEAM_COLLAB error"

    # REACT runs through the engine path, so exactly one stream call is recorded.
    assert len(recorded_streams) == 1, "REACT should invoke execute_stream once"
@pytest.mark.asyncio
async def test_skill_react_mode_continues_without_team_collab_error(app_with_chat):
    """SKILL_REACT routing sends no ``@team`` error and streams through the engine once."""
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    skill_routing = _make_routing(execution_mode=ExecutionMode.SKILL_REACT)
    _setup_routing(app_with_chat, skill_routing, fake_agent)
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(
            chat_module,
            "ReActEngine",
            _make_stub_engine_class(built_engines, recorded_streams),
        )
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    outbound = _sent_messages(socket)
    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in outbound
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "SKILL_REACT must not trigger TEAM_COLLAB error"
    assert len(recorded_streams) == 1, "SKILL_REACT should invoke execute_stream once"
@pytest.mark.asyncio
async def test_plan_exec_mode_does_not_trigger_fallback_block(app_with_chat):
    """PLAN_EXEC routing sends no ``@team`` error and records one stream call.

    The session manager is primed with a single user message and
    ``plan_exec`` config is an empty dict.
    """
    from agentkit.server.routes import chat as chat_module

    app_with_chat.state.server_config.plan_exec = {}

    fake_agent = _make_agent_mock()
    plan_routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC)
    _setup_routing(app_with_chat, plan_routing, fake_agent)

    session_manager = _make_session_manager_mock()
    history = [{"role": "user", "content": "test"}]
    session_manager.get_chat_messages = AsyncMock(return_value=history)
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in _sent_messages(socket)
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "PLAN_EXEC must not trigger TEAM_COLLAB error"

    # The PLAN_EXEC path is expected to drive the (stubbed) engine exactly once.
    assert len(recorded_streams) == 1, "PLAN_EXEC should invoke execute_stream once"
@pytest.mark.asyncio
async def test_rewoo_falls_back_to_react_with_deferred_log(app_with_chat, caplog):
    """REWOO routing streams once, logs one "deferred (RV10)" warning, sends no ``@team`` error."""
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    rewoo_routing = _make_routing(execution_mode=ExecutionMode.REWOO)
    _setup_routing(app_with_chat, rewoo_routing, fake_agent)
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        # Capture WARNING-and-above records from the chat route's logger only.
        with caplog.at_level(logging.WARNING, logger="agentkit.server.routes.chat"):
            await chat_module._handle_chat_message(
                websocket=socket,
                session_id="test-session",
                content="test",
                sm=session_manager,
                cancellation_token=MagicMock(),
                pending_replies={},
                pending_confirmations=None,
            )

    # The fall-back ran the engine stream exactly once.
    assert len(recorded_streams) == 1, "REWOO should fall back to REACT execute_stream"

    # Exactly one deferred (RV10) record, and it names the mode.
    deferred = [record for record in caplog.records if "deferred (RV10)" in record.message]
    assert len(deferred) == 1, f"expected deferred RV10 log, got {caplog.records}"
    assert "rewoo" in deferred[0].message.lower()

    # Nothing resembling the TEAM_COLLAB guidance error reached the client.
    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in _sent_messages(socket)
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "REWOO fall-back must not surface a TEAM_COLLAB error"
@pytest.mark.asyncio
async def test_reflexion_falls_back_to_react_with_deferred_log(app_with_chat, caplog):
    """REFLEXION routing streams once, logs one "deferred (RV10)" warning, sends no ``@team`` error."""
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    _setup_routing(
        app_with_chat,
        _make_routing(execution_mode=ExecutionMode.REFLEXION),
        fake_agent,
    )
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        # Capture WARNING-and-above records from the chat route's logger only.
        with caplog.at_level(logging.WARNING, logger="agentkit.server.routes.chat"):
            await chat_module._handle_chat_message(
                websocket=socket,
                session_id="test-session",
                content="test",
                sm=session_manager,
                cancellation_token=MagicMock(),
                pending_replies={},
                pending_confirmations=None,
            )

    assert len(recorded_streams) == 1, "REFLEXION should fall back to REACT execute_stream"

    deferred = [record for record in caplog.records if "deferred (RV10)" in record.message]
    assert len(deferred) == 1, f"expected deferred RV10 log, got {caplog.records}"
    assert "reflexion" in deferred[0].message.lower()

    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in _sent_messages(socket)
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "REFLEXION fall-back must not surface a TEAM_COLLAB error"
@pytest.mark.asyncio
async def test_direct_chat_does_not_trigger_fallback_block(app_with_chat, monkeypatch):
    """DIRECT_CHAT routing answers directly: one ``final_answer``, no engine, no ``@team`` error.

    ``_resolve_ws_dept_context`` and ``llm_gateway.chat`` are replaced with
    ``AsyncMock`` objects so the direct-chat path can complete.
    """
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    direct_routing = _make_routing(execution_mode=ExecutionMode.DIRECT_CHAT)
    _setup_routing(app_with_chat, direct_routing, fake_agent)
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    # Stub the two collaborators the DIRECT_CHAT path is expected to call.
    dept_context = AsyncMock(return_value=(None, [], None))
    monkeypatch.setattr(chat_module, "_resolve_ws_dept_context", dept_context)
    llm_reply = MagicMock(content="direct reply")
    app_with_chat.state.llm_gateway.chat = AsyncMock(return_value=llm_reply)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    outbound = _sent_messages(socket)
    error_texts = [
        payload.get("data", {}).get("message", "")
        for payload in outbound
        if payload.get("type") == "error"
    ]
    team_errors = [text for text in error_texts if TEAM_COLLAB_ERROR_HINT in text]
    assert not team_errors, "DIRECT_CHAT must not trigger TEAM_COLLAB error"

    # The handler is expected to finish before any engine is built or streamed.
    assert not built_engines, "DIRECT_CHAT should not construct ReActEngine"
    assert not recorded_streams, "DIRECT_CHAT should not call execute_stream"

    # Exactly one final_answer payload is emitted.
    answers = [payload for payload in outbound if payload.get("type") == "final_answer"]
    assert len(answers) == 1
# ---------------------------------------------------------------------------
# Error and failure paths — ordering + no side effects
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_team_collab_error_sent_before_any_engine_execution(app_with_chat):
    """TEAM_COLLAB routing sends an error while the engine is never built or streamed.

    Ordering is established indirectly: because no engine instance exists and
    no stream call was recorded, no engine work can have preceded the error.
    """
    from agentkit.server.routes import chat as chat_module

    fake_agent = _make_agent_mock()
    _setup_routing(
        app_with_chat,
        _make_routing(execution_mode=ExecutionMode.TEAM_COLLAB),
        fake_agent,
    )
    session_manager = _make_session_manager_mock()
    socket = _make_websocket_mock(app_with_chat)

    built_engines: list = []
    recorded_streams: list = []
    engine_cls = _make_stub_engine_class(built_engines, recorded_streams)

    with pytest.MonkeyPatch().context() as patcher:
        patcher.setattr(chat_module, "ReActEngine", engine_cls)
        await chat_module._handle_chat_message(
            websocket=socket,
            session_id="test-session",
            content="test",
            sm=session_manager,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    # No engine instance and no stream call: nothing could have run before the error.
    assert not built_engines, "engine must not be constructed before error"
    assert not recorded_streams, "execute_stream must not run before error"

    # At least one error payload reached the client.
    payload_types = [payload.get("type") for payload in _sent_messages(socket)]
    assert "error" in payload_types, "error must be sent"
@pytest.mark.asyncio
async def test_team_collab_does_not_mutate_routing_tools_or_system_prompt(app_with_chat):
    """TEAM_COLLAB error path leaves ``routing.tools`` and ``routing.system_prompt`` untouched.

    The routing result carries one sentinel tool and a known system prompt.
    After the handler returns, the test checks that ``routing.tools`` is still
    the same list object with the same contents and that the system prompt
    is unchanged.
    """
    from agentkit.server.routes import chat as chat_module

    agent = _make_agent_mock()
    sentinel_tool = _ToolStub("sentinel")
    routing = _make_routing(
        execution_mode=ExecutionMode.TEAM_COLLAB,
        tools=[sentinel_tool],
        system_prompt="original-system-prompt",
    )
    _setup_routing(app_with_chat, routing, agent)
    sm = _make_session_manager_mock()
    ws = _make_websocket_mock(app_with_chat)

    # Keep a live reference to the original list. Comparing a stored ``id()``
    # is unsafe: if the list were replaced and freed, a new list could reuse
    # the same id and the identity check would pass by accident.
    tools_before = routing.tools
    tools_before_copy = list(tools_before)
    system_prompt_before = routing.system_prompt

    constructed: list = []
    stream_calls: list = []
    stub_engine = _make_stub_engine_class(constructed, stream_calls)

    with pytest.MonkeyPatch().context() as mp:
        mp.setattr(chat_module, "ReActEngine", stub_engine)
        await chat_module._handle_chat_message(
            websocket=ws,
            session_id="test-session",
            content="test",
            sm=sm,
            cancellation_token=MagicMock(),
            pending_replies={},
            pending_confirmations=None,
        )

    # routing.tools not replaced (same object) and not mutated (same contents)
    assert routing.tools is tools_before, "routing.tools must not be replaced"
    assert routing.tools == tools_before_copy, "routing.tools contents must be unchanged"
    assert routing.tools[0] is sentinel_tool, "routing.tools[0] identity must be unchanged"
    assert routing.system_prompt == system_prompt_before, "system_prompt must be unchanged"
# ---------------------------------------------------------------------------
# Integration — AGENTS.md reflects actual behavior (regression guard)
# ---------------------------------------------------------------------------
def test_agents_md_contains_updated_team_collab_wording():
    """AGENTS.md must contain the three expected TEAM_COLLAB / fall-back phrases.

    Each phrase is looked up verbatim in the file, in order, and the first
    missing one fails the test with its own message.
    """
    # NOTE(review): these literals must match AGENTS.md character for
    # character — confirm no punctuation was lost when this file was copied.
    required_phrases = (
        (
            "TEAM_COLLAB 通过 @team 前缀路由到 TeamOrchestratorR7不回退到 REACT",
            "AGENTS.md must document TEAM_COLLAB @team routing with R7 no-fall-back",
        ),
        (
            "ExecutionMode.TEAM_COLLAB 非前缀触发时向用户报错并提示使用 @team",
            "AGENTS.md must document the non-prefix TEAM_COLLAB error path",
        ),
        (
            "REWOO / REFLEXION-as-mode 暂时回退到 REACTRV10 deferred",
            "AGENTS.md must document REWOO/REFLEXION-as-mode deferred fall-back",
        ),
    )
    contents = AGENTS_MD.read_text(encoding="utf-8")
    for phrase, failure_message in required_phrases:
        assert phrase in contents, failure_message
def test_agents_md_no_longer_claims_not_yet_supported_for_chat_handler():
    """AGENTS.md must not contain the two stale "raises not yet supported" phrases."""
    # Both phrases described the chat handler as raising for unsupported
    # modes; per this module's tests, TEAM_COLLAB now reports an error to the
    # user and REWOO/REFLEXION fall back to REACT instead.
    stale_phrases = (
        (
            '抛出 "not yet supported"',
            "AGENTS.md must not claim chat handler raises 'not yet supported'",
        ),
        (
            "其余抛出",
            "AGENTS.md must not claim the remaining modes raise (they route/fall back)",
        ),
    )
    contents = AGENTS_MD.read_text(encoding="utf-8")
    for phrase, failure_message in stale_phrases:
        assert phrase not in contents, failure_message