"""Unit tests for TEAM_COLLAB routing (U9, R7). Verifies that ``ExecutionMode.TEAM_COLLAB`` reached via the non-@team-prefix path (RequestPreprocessor / skill routing) surfaces an error to the user instead of silently falling back to REACT. The @team prefix itself is handled earlier by ``_execute_team_collab`` and is out of scope here — this test only covers the routing decision at the fall-back block. REWOO / REFLEXION-as-mode keep their deferred REACT fall-back (RV10). """ from __future__ import annotations import logging from pathlib import Path from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult # --------------------------------------------------------------------------- # Fixtures and helpers (mirrors test_chat_plan_exec_ws.py patterns) # --------------------------------------------------------------------------- REPO_ROOT = Path(__file__).resolve().parents[2] AGENTS_MD = REPO_ROOT / "AGENTS.md" TEAM_COLLAB_ERROR_HINT = "@team" @pytest.fixture def app_with_chat(): """Create a FastAPI app with Chat routes and mocked dependencies.""" from fastapi import FastAPI from agentkit.server.routes.chat import router app = FastAPI() app.include_router(router, prefix="/api/v1") from agentkit.session.manager import SessionManager from agentkit.session.store import InMemorySessionStore app.state.session_manager = SessionManager(store=InMemorySessionStore()) app.state.llm_gateway = MagicMock() app.state.agent_pool = MagicMock() app.state.server_config = MagicMock() app.state.server_config.api_key = None app.state.server_config.plan_exec = {} return app def _make_routing( execution_mode: ExecutionMode = ExecutionMode.REACT, tools: list | None = None, system_prompt: str | None = None, ) -> SkillRoutingResult: """Build a minimal SkillRoutingResult for testing.""" return SkillRoutingResult( execution_mode=execution_mode, tools=tools or [], clean_content="test message", model="default", agent_name="test-agent", system_prompt=system_prompt, skill_name=None, ) def _make_websocket_mock(app) -> MagicMock: """Build a mock WebSocket with app.state and async send_json.""" ws = MagicMock() ws.app = app ws.send_json = AsyncMock() return ws def _make_agent_mock() -> MagicMock: """Build a mock Agent with _tool_registry and _react_engine.""" agent = MagicMock() agent.name = "test-agent" agent._tool_registry = MagicMock() agent._tool_registry.list_tools.return_value = [] agent._system_prompt = None # _react_engine is None to force the code path that creates a new engine agent._react_engine = None agent.get_model.return_value = "default" return agent def _make_session_manager_mock() -> MagicMock: """Build a mock SessionManager with async methods.""" sm = MagicMock() session = MagicMock() session.agent_name = "test-agent" session.status = "active" sm.get_session = AsyncMock(return_value=session) sm.get_chat_messages = AsyncMock(return_value=[]) sm.append_message = AsyncMock() return sm def _setup_routing(app, routing: SkillRoutingResult, agent: MagicMock) -> None: """Wire up app.state so _handle_chat_message finds the right routing.""" app.state.agent_pool.get_agent.return_value = agent app.state.request_preprocessor = MagicMock() app.state.request_preprocessor.preprocess = AsyncMock(return_value=routing) class _ToolStub: """Minimal tool stub with a name attribute (for tool_names logging).""" def __init__(self, name: str) -> None: self.name = name def _make_stub_engine_class( constructed_engines: list, stream_calls: list, ) -> type: """Build a stub ReActEngine subclass that records construction + stream calls. The stub is a valid async generator (uses ``return; yield`` per project rule so Python treats it as an async generator even when the body returns first). """ class _StubEngine: def __init__(self, **kwargs) -> None: constructed_engines.append(self) self._phase_policy = kwargs.get("phase_policy") self._current_phase = ( kwargs.get("phase_policy").start_phase if kwargs.get("phase_policy") else None ) @property def current_phase(self): return self._current_phase def reset(self) -> None: pass async def execute_stream(self, **kwargs): stream_calls.append(kwargs) return yield # async generator marker (project rule) return _StubEngine def _sent_messages(ws: MagicMock) -> list[dict]: return [call.args[0] for call in ws.send_json.call_args_list] # --------------------------------------------------------------------------- # Happy path — TEAM_COLLAB (non-prefix) surfaces error, no REACT fall-back # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_team_collab_non_prefix_sends_error_and_aborts(app_with_chat): """Happy path: TEAM_COLLAB without @team prefix → error with @team guidance, execution aborted (no ReActEngine.execute_stream call).""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.TEAM_COLLAB) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) sent = _sent_messages(ws) error_messages = [m for m in sent if m.get("type") == "error"] assert len(error_messages) == 1, f"expected exactly one error, got {sent}" message = error_messages[0]["data"]["message"] assert TEAM_COLLAB_ERROR_HINT in message, f"error message must mention @team: {message}" # No REACT engine was constructed for execution (fall-back NOT taken) assert len(constructed) == 0, "ReActEngine should not be constructed for TEAM_COLLAB" assert len(stream_calls) == 0, "execute_stream must not be called for TEAM_COLLAB" # --------------------------------------------------------------------------- # Edge cases — other modes do NOT trigger the TEAM_COLLAB error block # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_react_mode_continues_without_team_collab_error(app_with_chat): """Edge: REACT mode → no TEAM_COLLAB error, normal execution continues.""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.REACT) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "REACT must not trigger TEAM_COLLAB error" # REACT executes via the fallback path → engine constructed + stream called assert len(stream_calls) == 1, "REACT should invoke execute_stream once" @pytest.mark.asyncio async def test_skill_react_mode_continues_without_team_collab_error(app_with_chat): """Edge: SKILL_REACT mode → no TEAM_COLLAB error, normal execution continues.""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.SKILL_REACT) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "SKILL_REACT must not trigger TEAM_COLLAB error" assert len(stream_calls) == 1, "SKILL_REACT should invoke execute_stream once" @pytest.mark.asyncio async def test_plan_exec_mode_does_not_trigger_fallback_block(app_with_chat): """Edge: PLAN_EXEC → handled earlier, fall-back block must not trigger.""" from agentkit.server.routes import chat as chat_module app_with_chat.state.server_config.plan_exec = {} agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.PLAN_EXEC) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() sm.get_chat_messages = AsyncMock(return_value=[{"role": "user", "content": "test"}]) ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "PLAN_EXEC must not trigger TEAM_COLLAB error" # PLAN_EXEC builds a phase engine and runs execute_stream assert len(stream_calls) == 1, "PLAN_EXEC should invoke execute_stream once" @pytest.mark.asyncio async def test_rewoo_falls_back_to_react_with_deferred_log(app_with_chat, caplog): """Edge: REWOO → falls back to REACT with deferred (RV10) log, NOT a user error.""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.REWOO) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) with caplog.at_level(logging.WARNING, logger="agentkit.server.routes.chat"): await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) # REWOO falls back to REACT — execute_stream IS called assert len(stream_calls) == 1, "REWOO should fall back to REACT execute_stream" # A deferred (RV10) warning was logged deferred_logs = [r for r in caplog.records if "deferred (RV10)" in r.message] assert len(deferred_logs) == 1, f"expected deferred RV10 log, got {caplog.records}" assert "rewoo" in deferred_logs[0].message.lower() # No TEAM_COLLAB-style error was sent to the user sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "REWOO fall-back must not surface a TEAM_COLLAB error" @pytest.mark.asyncio async def test_reflexion_falls_back_to_react_with_deferred_log(app_with_chat, caplog): """Edge: REFLEXION → falls back to REACT with deferred (RV10) log, NOT a user error.""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.REFLEXION) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) with caplog.at_level(logging.WARNING, logger="agentkit.server.routes.chat"): await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) assert len(stream_calls) == 1, "REFLEXION should fall back to REACT execute_stream" deferred_logs = [r for r in caplog.records if "deferred (RV10)" in r.message] assert len(deferred_logs) == 1, f"expected deferred RV10 log, got {caplog.records}" assert "reflexion" in deferred_logs[0].message.lower() sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "REFLEXION fall-back must not surface a TEAM_COLLAB error" @pytest.mark.asyncio async def test_direct_chat_does_not_trigger_fallback_block(app_with_chat, monkeypatch): """Edge: DIRECT_CHAT → handled earlier, fall-back block not reached.""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.DIRECT_CHAT) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) # DIRECT_CHAT calls _resolve_ws_dept_context + llm_gateway.chat monkeypatch.setattr( chat_module, "_resolve_ws_dept_context", AsyncMock(return_value=(None, [], None)), ) response = MagicMock() response.content = "direct reply" app_with_chat.state.llm_gateway.chat = AsyncMock(return_value=response) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) sent = _sent_messages(ws) team_errors = [ m for m in sent if m.get("type") == "error" and TEAM_COLLAB_ERROR_HINT in m.get("data", {}).get("message", "") ] assert len(team_errors) == 0, "DIRECT_CHAT must not trigger TEAM_COLLAB error" # DIRECT_CHAT returns before the engine block — no engine, no stream assert len(constructed) == 0, "DIRECT_CHAT should not construct ReActEngine" assert len(stream_calls) == 0, "DIRECT_CHAT should not call execute_stream" # DIRECT_CHAT emits a final_answer final_answers = [m for m in sent if m.get("type") == "final_answer"] assert len(final_answers) == 1 # --------------------------------------------------------------------------- # Error and failure paths — ordering + no side effects # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_team_collab_error_sent_before_any_engine_execution(app_with_chat): """Failure path: error is sent and execution aborts — ReActEngine is never constructed (engine construction happens after the TEAM_COLLAB return).""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() routing = _make_routing(execution_mode=ExecutionMode.TEAM_COLLAB) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) # Engine never constructed → execute_stream could not have run before error assert len(constructed) == 0, "engine must not be constructed before error" assert len(stream_calls) == 0, "execute_stream must not run before error" sent = _sent_messages(ws) # The error was sent (ordering verified: error present, no engine work done) assert any(m.get("type") == "error" for m in sent), "error must be sent" @pytest.mark.asyncio async def test_team_collab_does_not_mutate_routing_tools_or_system_prompt(app_with_chat): """Failure path: TEAM_COLLAB error path does not mutate routing.tools or routing.system_prompt (no side effects before the early return).""" from agentkit.server.routes import chat as chat_module agent = _make_agent_mock() sentinel_tool = _ToolStub("sentinel") routing = _make_routing( execution_mode=ExecutionMode.TEAM_COLLAB, tools=[sentinel_tool], system_prompt="original-system-prompt", ) _setup_routing(app_with_chat, routing, agent) sm = _make_session_manager_mock() ws = _make_websocket_mock(app_with_chat) tools_before_id = id(routing.tools) tools_before_copy = list(routing.tools) system_prompt_before = routing.system_prompt constructed: list = [] stream_calls: list = [] stub_engine = _make_stub_engine_class(constructed, stream_calls) with pytest.MonkeyPatch().context() as mp: mp.setattr(chat_module, "ReActEngine", stub_engine) await chat_module._handle_chat_message( websocket=ws, session_id="test-session", content="test", sm=sm, cancellation_token=MagicMock(), pending_replies={}, pending_confirmations=None, ) # routing.tools not replaced (same object) and not mutated (same contents) assert id(routing.tools) == tools_before_id, "routing.tools must not be replaced" assert routing.tools == tools_before_copy, "routing.tools contents must be unchanged" assert routing.tools[0] is sentinel_tool, "routing.tools[0] identity must be unchanged" assert routing.system_prompt == system_prompt_before, "system_prompt must be unchanged" # --------------------------------------------------------------------------- # Integration — AGENTS.md reflects actual behavior (regression guard) # --------------------------------------------------------------------------- def test_agents_md_contains_updated_team_collab_wording(): """Integration: AGENTS.md documents TEAM_COLLAB routing + R7 (no REACT fall-back).""" text = AGENTS_MD.read_text(encoding="utf-8") assert "TEAM_COLLAB 通过 @team 前缀路由到 TeamOrchestrator(R7,不回退到 REACT)" in text, ( "AGENTS.md must document TEAM_COLLAB @team routing with R7 no-fall-back" ) assert "ExecutionMode.TEAM_COLLAB 非前缀触发时向用户报错并提示使用 @team" in text, ( "AGENTS.md must document the non-prefix TEAM_COLLAB error path" ) assert "REWOO / REFLEXION-as-mode 暂时回退到 REACT(RV10 deferred)" in text, ( "AGENTS.md must document REWOO/REFLEXION-as-mode deferred fall-back" ) def test_agents_md_no_longer_claims_not_yet_supported_for_chat_handler(): """Integration: AGENTS.md no longer carries the stale '抛出 not yet supported' claim.""" text = AGENTS_MD.read_text(encoding="utf-8") # The stale phrase attributed the chat handler as raising "not yet supported" # for unsupported modes. That is no longer true (PLAN_EXEC + TEAM_COLLAB # routing are wired; REWOO/REFLEXION fall back). assert '抛出 "not yet supported"' not in text, ( "AGENTS.md must not claim chat handler raises 'not yet supported'" ) assert "其余抛出" not in text, ( "AGENTS.md must not claim the remaining modes raise (they route/fall back)" )