"""PhaseExecutor streaming tests (U3) Tests streaming execution in _run_agent_steps: - token/final_answer events forwarded as expert_result_chunk - expert_result(completed) broadcast after stream completes - expert_result(error) broadcast on mid-stream exception - retry contract: expert_result_chunk_reset before retry - TeamOrchestrator synthesis streams team_synthesis_chunk """ from __future__ import annotations from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.core.react import ReActEvent from agentkit.experts._review_gate import ReviewResult from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import PlanPhase, TeamPlan from agentkit.experts.team import ExpertTeam # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config(name: str = "test_expert", color: str = "#1890ff") -> ExpertConfig: return ExpertConfig( name=name, agent_type="expert", persona="测试专家", thinking_style="逻辑推理", bound_skills=["skill_a"], color=color, prompt={"system": "你是测试专家"}, # ponytail: llm_generate 模式校验要求 prompt ) def _make_mock_expert(name: str = "test_expert", color: str = "#1890ff") -> MagicMock: config = _make_expert_config(name=name, color=color) expert = MagicMock(spec=Expert) expert.config = config expert.is_active = True return expert def _make_stream_agent(events: list[ReActEvent]) -> MagicMock: """Create a mock agent whose execute_stream yields the given ReActEvents.""" async def _execute_stream(task): for e in events: yield e agent = MagicMock() agent.execute_stream = _execute_stream return agent def _make_error_stream_agent( events_before_error: list[ReActEvent], error: Exception ) -> MagicMock: """Agent that yields some events then raises an error.""" async def _execute_stream(task): for e in events_before_error: yield e raise error agent = MagicMock() agent.execute_stream = _execute_stream return agent def _make_retry_stream_agent( fail_events: list[ReActEvent], success_events: list[ReActEvent], error: Exception, ) -> MagicMock: """Agent that fails on first call (after yielding fail_events), succeeds on second.""" call_count = [0] async def _execute_stream(task): call_count[0] += 1 if call_count[0] == 1: for e in fail_events: yield e raise error for e in success_events: yield e agent = MagicMock() agent.execute_stream = _execute_stream return agent def _make_phase( phase_id: str = "phase_1", name: str = "Test Phase", expert_name: str = "test_expert", depends_on: list[str] | None = None, ) -> PlanPhase: return PlanPhase( id=phase_id, name=name, assigned_expert=expert_name, task_description="完成测试任务", depends_on=depends_on or [], ) def _make_orchestrator_for_streaming() -> TeamOrchestrator: """Create a TeamOrchestrator with mocked broadcast + review for _run_agent_steps tests.""" team = ExpertTeam() team._handoff_transport = MagicMock(spec=InProcessHandoffTransport) orchestrator = TeamOrchestrator(team) orchestrator._broadcast_event = AsyncMock() orchestrator._review_phase_output = AsyncMock( return_value=ReviewResult(passed=True, degraded=False, feedback="") ) return orchestrator def _make_simple_plan() -> TeamPlan: """Create a minimal TeamPlan with no phases (not accessed when deps/contracts empty).""" plan = MagicMock(spec=TeamPlan) plan.id = "test_plan" plan.phases = [] return plan # ── 流式 token/final_answer 转发测试 ────────────────────── class TestStreamEventForwarding: """execute_stream 事件转发到 _broadcast_event。""" @pytest.mark.asyncio async def test_token_events_forwarded_as_expert_result_chunk(self): """execute_stream 产出 token 事件时,_broadcast_event 转发 expert_result_chunk。""" events = [ ReActEvent(event_type="token", step=0, data={"content": "Hello"}), ReActEvent(event_type="token", step=0, data={"content": " World"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() await orch._run_agent_steps(expert, agent, lead, phase, plan) # Verify expert_result_chunk broadcasts for tokens chunk_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result_chunk" ] assert len(chunk_calls) == 2 assert chunk_calls[0].args[1]["content"] == "Hello" assert chunk_calls[1].args[1]["content"] == " World" assert chunk_calls[0].args[1]["expert_id"] == "test_expert" @pytest.mark.asyncio async def test_final_answer_not_forwarded_as_chunk(self): """final_answer 仅作完成信号,不转发为 expert_result_chunk(避免与 token 双重累积)。 无 token 时 final_answer output 作为兜底内容累积到 result,但不广播 chunk。 """ events = [ ReActEvent(event_type="final_answer", step=0, data={"output": "最终结果"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() result, _, passed, _, _ = await orch._run_agent_steps( expert, agent, lead, phase, plan ) chunk_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result_chunk" ] assert len(chunk_calls) == 0 # output still reaches expert_result(completed) via fallback accumulation assert result["content"] == "最终结果" assert passed is True @pytest.mark.asyncio async def test_thinking_events_forwarded_as_expert_step(self): """execute_stream 产出 thinking 事件时,_broadcast_event 转发 expert_step。 P1 fix: payload 对齐前端 WsServerMessage 契约 — 携带 expert_id/expert_name/expert_color/content/step 字段。 """ events = [ ReActEvent(event_type="thinking", step=0, data={"content": "思考中..."}), ReActEvent(event_type="token", step=0, data={"content": "结果"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() await orch._run_agent_steps(expert, agent, lead, phase, plan) step_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_step" and c.args[1].get("step") == "thinking" ] assert len(step_calls) == 1 payload = step_calls[0].args[1] # P1 fix: payload 必须包含前端契约字段 assert payload["expert_id"] == expert.config.name assert payload["expert_name"] == expert.config.name assert payload["expert_color"] == expert.config.color assert payload["content"] == "思考中..." assert payload["step"] == "thinking" @pytest.mark.asyncio async def test_tool_call_events_forwarded_as_expert_step(self): """execute_stream 产出 tool_call/tool_result 事件时,_broadcast_event 转发 expert_step。 P1 fix: payload 对齐前端契约 — content 携带 tool_name 摘要,step_data 保留原始数据。 """ events = [ ReActEvent(event_type="tool_call", step=0, data={"tool_name": "search", "args": {"q": "test"}}), ReActEvent(event_type="tool_result", step=0, data={"tool_name": "search", "result": "hit"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() await orch._run_agent_steps(expert, agent, lead, phase, plan) # 仅断言 tool_call/tool_result 事件(phase intro step 单独由其他测试覆盖) step_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_step" and c.args[1].get("step") in ("tool_call", "tool_result") ] assert len(step_calls) == 2 # tool_call event p1 = step_calls[0].args[1] assert p1["step"] == "tool_call" assert p1["content"] == "search" assert p1["step_data"]["args"] == {"q": "test"} assert p1["expert_name"] == expert.config.name assert p1["expert_color"] == expert.config.color # tool_result event p2 = step_calls[1].args[1] assert p2["step"] == "tool_result" assert p2["content"] == "search" assert p2["step_data"]["result"] == "hit" # ── expert_result 终结事件测试 ──────────────────────────── class TestExpertResultTermination: """流式会话必须以 expert_result(completed) 或 expert_result(error) 终结。""" @pytest.mark.asyncio async def test_expert_result_completed_after_stream(self): """循环结束后广播完整 expert_result 事件,status=completed。 ReActEngine 合约:token 事件(增量)+ final_answer(全文)。 final_answer 仅作完成信号,不重复累积 — 避免内容翻倍。 """ events = [ ReActEvent(event_type="token", step=0, data={"content": "Hel"}), ReActEvent(event_type="token", step=0, data={"content": "lo"}), ReActEvent(event_type="final_answer", step=0, data={"output": "Hello"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() result, last_error, passed, feedback, degraded = await orch._run_agent_steps( expert, agent, lead, phase, plan ) # Verify expert_result(completed) broadcast result_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" and c.args[1].get("status") == "completed" ] assert len(result_calls) == 1 # Content is token-accumulated only — final_answer must not double it assert result_calls[0].args[1]["content"] == "Hello" assert result_calls[0].args[1]["expert_id"] == "test_expert" assert result["content"] == "Hello" assert passed is True @pytest.mark.asyncio async def test_final_answer_fallback_when_no_tokens(self): """无 token 事件时(如 _wrap_sync_as_stream fallback),final_answer output 作为兜底。""" events = [ ReActEvent(event_type="final_answer", step=0, data={"output": "Fallback"}), ] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() result, last_error, passed, feedback, degraded = await orch._run_agent_steps( expert, agent, lead, phase, plan ) assert result["content"] == "Fallback" assert passed is True @pytest.mark.asyncio async def test_streaming_always_terminates_with_result_event(self): """即使 execute_stream 无事件产出,也必须广播 expert_result(completed)。""" events = [] agent = _make_stream_agent(events) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() await orch._run_agent_steps(expert, agent, lead, phase, plan) result_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" ] assert len(result_calls) == 1 assert result_calls[0].args[1]["status"] == "completed" # ── 异常处理测试 ────────────────────────────────────────── class TestStreamExceptionHandling: """execute_stream 异常时广播 expert_result(error) 并携带已累积内容。""" @pytest.mark.asyncio async def test_mid_stream_exception_broadcasts_error_with_accumulated(self): """execute_stream 中途抛出异常时,广播 expert_result(error) 携带已累积内容。""" events_before_error = [ ReActEvent(event_type="token", step=0, data={"content": "部分"}), ReActEvent(event_type="token", step=0, data={"content": "内容"}), ] agent = _make_error_stream_agent(events_before_error, RuntimeError("LLM exploded")) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() # MAX_RETRIES=1, so after 2 attempts (1 initial + 1 retry), it raises with pytest.raises(RuntimeError, match="LLM exploded"): await orch._run_agent_steps(expert, agent, lead, phase, plan) # Verify expert_result(error) was broadcast with accumulated content error_result_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" and c.args[1].get("status") == "error" ] assert len(error_result_calls) >= 1 # The last error broadcast should have the accumulated content from the final attempt last_error_call = error_result_calls[-1] assert last_error_call.args[1]["error"] == "LLM exploded" # Content should be the accumulated partial content assert "部分" in last_error_call.args[1]["content"] assert "内容" in last_error_call.args[1]["content"] @pytest.mark.asyncio async def test_exception_does_not_silently_hang(self): """流式会话不允许静默挂起 — 异常后必须有 expert_result 事件。""" agent = _make_error_stream_agent([], RuntimeError("immediate failure")) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() with pytest.raises(RuntimeError, match="immediate failure"): await orch._run_agent_steps(expert, agent, lead, phase, plan) # Must have at least one expert_result event (error or completed) result_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" ] assert len(result_calls) >= 1 # All should be error status (no completed since it never succeeded) statuses = [c.args[1].get("status") for c in result_calls] assert all(s == "error" for s in statuses) # ── 重试 + 流式合约测试 ──────────────────────────────────── class TestRetryStreamContract: """重试 + 流式合约:reset → 重试 → 仅含 attempt 2 内容。""" @pytest.mark.asyncio async def test_retry_broadcasts_chunk_reset_then_succeeds(self): """execute_stream 2 chunks 后抛异常 → retry → 广播 reset → 重试成功 → 仅含 attempt 2 内容。""" fail_events = [ ReActEvent(event_type="token", step=0, data={"content": "attempt1_"}), ReActEvent(event_type="token", step=0, data={"content": "partial"}), ] success_events = [ ReActEvent(event_type="token", step=0, data={"content": "attempt2_"}), ReActEvent(event_type="token", step=0, data={"content": "success"}), ReActEvent(event_type="final_answer", step=0, data={"output": "attempt2_success"}), ] agent = _make_retry_stream_agent(fail_events, success_events, RuntimeError("transient")) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() result, last_error, passed, feedback, degraded = await orch._run_agent_steps( expert, agent, lead, phase, plan ) # Verify expert_result_chunk_reset was broadcast before retry reset_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result_chunk_reset" ] assert len(reset_calls) == 1 assert reset_calls[0].args[1]["expert_id"] == "test_expert" # Verify expert_result(completed) contains ONLY attempt 2 content completed_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" and c.args[1].get("status") == "completed" ] assert len(completed_calls) == 1 content = completed_calls[0].args[1]["content"] assert "attempt2_success" in content assert "attempt1_partial" not in content # Return value matches attempt 2 only assert result["content"] == "attempt2_success" @pytest.mark.asyncio async def test_retry_exhausted_broadcasts_error(self): """重试耗尽后广播 expert_result(error),不静默挂起。""" # Every attempt fails — use _make_error_stream_agent (fails on every call) # rather than _make_retry_stream_agent (fails only on first call) agent = _make_error_stream_agent( events_before_error=[ReActEvent(event_type="token", step=0, data={"content": "fail"})], error=RuntimeError("persistent failure"), ) expert = _make_mock_expert() lead = _make_mock_expert(name="lead") phase = _make_phase() plan = _make_simple_plan() orch = _make_orchestrator_for_streaming() with pytest.raises(RuntimeError, match="persistent failure"): await orch._run_agent_steps(expert, agent, lead, phase, plan) # Must have expert_result(error) as terminal event error_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" and c.args[1].get("status") == "error" ] assert len(error_calls) >= 1 # No completed event completed_calls = [ c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result" and c.args[1].get("status") == "completed" ] assert len(completed_calls) == 0 # ── 综合阶段流式测试 ─────────────────────────────────────── class TestSynthesisStreaming: """TeamOrchestrator 综合阶段流式广播 team_synthesis_chunk。""" @pytest.mark.asyncio async def test_synthesis_streams_team_synthesis_chunk(self): """_synthesize_results 流式综合时调用 broadcast_callback 广播 team_synthesis_chunk。""" from agentkit.experts.orchestrator import TeamOrchestrator team = ExpertTeam() team._handoff_transport = MagicMock(spec=InProcessHandoffTransport) orch = TeamOrchestrator(team) # Mock gateway with chat_stream yielding chunks stream_chunks = ["综合", "结果", "完成"] async def _mock_chat_stream(messages, model=None, **kwargs): for text in stream_chunks: chunk = MagicMock() chunk.content = text yield chunk gateway = MagicMock() gateway.chat_stream = _mock_chat_stream orch._get_llm_gateway = MagicMock(return_value=gateway) orch._get_model = MagicMock(return_value="test_model") orch._user_context = [] # Two completed phases (needed to enter LLM synthesis path) phase1 = _make_phase(phase_id="p1", name="Phase 1") phase1.result = {"content": "结果1"} phase2 = _make_phase(phase_id="p2", name="Phase 2") phase2.result = {"content": "结果2"} lead = _make_mock_expert(name="lead") # Collect chunks via callback received_chunks: list[str] = [] async def callback(data: dict[str, object]) -> None: received_chunks.append(str(data.get("chunk", ""))) result = await orch._synthesize_results( lead, "原始任务", [phase1, phase2], broadcast_callback=callback ) # Verify all chunks were forwarded assert received_chunks == ["综合", "结果", "完成"] # Verify returned content is the full concatenation assert result["content"] == "综合结果完成" assert result["strategy"] == "best" assert result["phases_completed"] == 2 @pytest.mark.asyncio async def test_synthesis_without_callback_uses_sync_chat(self): """无 broadcast_callback 时,_synthesize_results 回退到 gateway.chat()(向后兼容)。""" from agentkit.experts.orchestrator import TeamOrchestrator team = ExpertTeam() team._handoff_transport = MagicMock(spec=InProcessHandoffTransport) orch = TeamOrchestrator(team) response = MagicMock() response.content = "同步综合结果" gateway = MagicMock() gateway.chat = AsyncMock(return_value=response) orch._get_llm_gateway = MagicMock(return_value=gateway) orch._get_model = MagicMock(return_value="test_model") orch._user_context = [] phase1 = _make_phase(phase_id="p1", name="Phase 1") phase1.result = {"content": "结果1"} phase2 = _make_phase(phase_id="p2", name="Phase 2") phase2.result = {"content": "结果2"} lead = _make_mock_expert(name="lead") result = await orch._synthesize_results(lead, "任务", [phase1, phase2]) # Verify sync chat was used (not chat_stream) gateway.chat.assert_called_once() assert result["content"] == "同步综合结果" @pytest.mark.asyncio async def test_synthesis_stream_failure_falls_back_to_concatenation(self): """chat_stream 失败时回退到拼接(设计意图保留 except Exception)。""" from agentkit.experts.orchestrator import TeamOrchestrator team = ExpertTeam() team._handoff_transport = MagicMock(spec=InProcessHandoffTransport) orch = TeamOrchestrator(team) async def _failing_chat_stream(messages, model=None, **kwargs): raise RuntimeError("stream unavailable") yield # never reached — makes this an async generator gateway = MagicMock() gateway.chat_stream = _failing_chat_stream orch._get_llm_gateway = MagicMock(return_value=gateway) orch._get_model = MagicMock(return_value="test_model") orch._user_context = [] phase1 = _make_phase(phase_id="p1", name="Phase 1") phase1.result = {"content": "结果1"} phase2 = _make_phase(phase_id="p2", name="Phase 2") phase2.result = {"content": "结果2"} lead = _make_mock_expert(name="lead") received: list[str] = [] async def callback(data: dict[str, object]) -> None: received.append(str(data.get("chunk", ""))) result = await orch._synthesize_results( lead, "任务", [phase1, phase2], broadcast_callback=callback ) # No chunks were forwarded (stream failed immediately) assert received == [] # Falls back to concatenation assert "结果1" in result["content"] assert "结果2" in result["content"]