From 1599d193c78cda00e9ad051165323db83c70af0e Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 2 Jul 2026 22:52:10 +0800 Subject: [PATCH] test: fix async generator mock for U3 streaming orchestrator U3 streaming refactor switched orchestrator from agent.execute() to agent.execute_stream() (async gen), but tests still mocked execute(). AsyncMock() returns a coroutine lacking __aiter__, causing: - 'async for' requires an object with __aiter__ method, got coroutine - RuntimeWarning: coroutine was never awaited Add shared helpers in tests/unit/experts/_helpers.py: - make_chat_stream_mock: async gen for gateway.chat_stream - make_execute_stream_mock: async gen yielding final_answer event - make_execute_stream_raising_mock: async gen that raises (for failure tests) Update 3 test files to use the helpers: - test_team_orchestrator.py: _make_mock_expert, _make_mock_pool, failure tests (phase_failed, all_phases_fail, fallback_uses_lead, phase_failure_marks_dependents), assertion updates (execute_stream instead of execute), synthesizer warning cleanup - test_pm_collaboration.py: _make_mock_expert, _make_mock_llm_gateway, collaboration/risk/rework assertions - test_board_orchestrator.py: _make_mock_gateway (warning cleanup) All 483 experts/ tests pass with 0 warnings. --- tests/unit/experts/_helpers.py | 71 ++++++++++++++ tests/unit/experts/test_board_orchestrator.py | 6 ++ tests/unit/experts/test_pm_collaboration.py | 52 +++++------ tests/unit/experts/test_team_orchestrator.py | 93 ++++++++++++------- 4 files changed, 162 insertions(+), 60 deletions(-) create mode 100644 tests/unit/experts/_helpers.py diff --git a/tests/unit/experts/_helpers.py b/tests/unit/experts/_helpers.py new file mode 100644 index 0000000..10c66d8 --- /dev/null +++ b/tests/unit/experts/_helpers.py @@ -0,0 +1,71 @@ +"""Shared test helpers for experts/ unit tests. + +Provides async generator mocks for: +- gateway.chat_stream — AsyncMock() returns a coroutine (no __aiter__), causing + `async for chunk in gateway.chat_stream(...)` to fail with + "'async for' requires an object with __aiter__ method, got coroutine". +- agent.execute_stream — same issue: MagicMock returns a non-async-iterable, + causing `async for event in agent.execute_stream(...)` to fail. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + + +async def _chat_stream_async_gen(content: str): + """Async generator yielding a single mock chunk with given content.""" + chunk = MagicMock() + chunk.content = content + yield chunk + + +def make_chat_stream_mock(content: str = "测试流式回复"): + """Return a MagicMock that yields an async generator when called. + + Use to mock gateway.chat_stream so `async for chunk in gateway.chat_stream(...)` + works. AsyncMock() returns a coroutine lacking __aiter__; this helper returns + a real async generator compatible with `async for`. + """ + return MagicMock(side_effect=lambda *a, **kw: _chat_stream_async_gen(content)) + + +async def _execute_stream_async_gen(content: str): + """Async generator yielding a single final_answer ReActEvent-like mock. + + orchestrator._run_agent_steps() iterates `agent.execute_stream(task_msg)` + and dispatches by event.event_type. We yield one final_answer event with + the desired content so the result is `{"content": content}`. + """ + event = MagicMock() + event.event_type = "final_answer" + event.data = {"output": content} + yield event + + +def make_execute_stream_mock(content: str = "测试流式回复"): + """Return a MagicMock that yields an async generator when called. + + Use to mock agent.execute_stream so `async for event in agent.execute_stream(...)` + works. The yielded event has event_type="final_answer" and data={"output": content}, + matching what orchestrator._run_agent_steps() expects. + """ + return MagicMock(side_effect=lambda *a, **kw: _execute_stream_async_gen(content)) + + +async def _execute_stream_raising_async_gen(error: Exception): + """Async generator that raises `error` immediately when iterated.""" + if False: # pragma: no cover — makes this an async generator (PEP 525) + yield + raise error + + +def make_execute_stream_raising_mock(error: Exception): + """Return a MagicMock whose side_effect is an async generator raising `error`. + + Use to mock agent.execute_stream when phase execution should fail. + orchestrator._run_agent_steps() catches the exception and broadcasts expert_result(error). + """ + return MagicMock( + side_effect=lambda *a, **kw: _execute_stream_raising_async_gen(error) + ) diff --git a/tests/unit/experts/test_board_orchestrator.py b/tests/unit/experts/test_board_orchestrator.py index 5b0e247..21ad855 100644 --- a/tests/unit/experts/test_board_orchestrator.py +++ b/tests/unit/experts/test_board_orchestrator.py @@ -11,6 +11,8 @@ from agentkit.experts.board_orchestrator import BoardOrchestrator from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert +from tests.unit.experts._helpers import make_chat_stream_mock + # ── 辅助函数 ────────────────────────────────────────────── @@ -57,6 +59,10 @@ def _make_mock_gateway(response_content: str = "测试回复") -> AsyncMock: response = MagicMock() response.content = response_content gateway.chat = AsyncMock(return_value=response) + # board_orchestrator._stream_expert_speech() calls gateway.chat_stream() + # (async gen). Without this, AsyncMock returns a coroutine lacking __aiter__, + # triggering a RuntimeWarning + falling back to non-streaming path. + gateway.chat_stream = make_chat_stream_mock(response_content) return gateway diff --git a/tests/unit/experts/test_pm_collaboration.py b/tests/unit/experts/test_pm_collaboration.py index b602bac..196e69e 100644 --- a/tests/unit/experts/test_pm_collaboration.py +++ b/tests/unit/experts/test_pm_collaboration.py @@ -14,6 +14,8 @@ from unittest.mock import AsyncMock, MagicMock import pytest +from tests.unit.experts._helpers import make_chat_stream_mock, make_execute_stream_mock + from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig @@ -78,6 +80,8 @@ def _make_mock_expert( completed_at=None, ) ) + # U3: orchestrator._run_agent_steps() calls agent.execute_stream() (async gen) + mock_agent.execute_stream = make_execute_stream_mock(f"Result from {name}") expert.agent = mock_agent return expert @@ -128,6 +132,7 @@ def _make_mock_llm_gateway( return synth_response gateway.chat = AsyncMock(side_effect=chat_side_effect) + gateway.chat_stream = make_chat_stream_mock(synthesis_content) return gateway @@ -494,9 +499,10 @@ class TestCollaborationExecution: await orchestrator._execute_execution_phase(frontend_phase, plan) - # 验证 frontend 专家的 agent.execute 收到了 collaboration_outputs + # 验证 frontend 专家的 agent.execute_stream 收到了 collaboration_outputs. + # U3: orchestrator calls agent.execute_stream() (not execute()) frontend_expert = team.get_expert("frontend") - task_msg = frontend_expert.agent.execute.call_args.args[0] + task_msg = frontend_expert.agent.execute_stream.call_args.args[0] assert "collaboration_outputs" in task_msg.input_data assert "backend" in task_msg.input_data["collaboration_outputs"] assert "API definition" in task_msg.input_data["collaboration_outputs"]["backend"] @@ -580,9 +586,10 @@ class TestCollaborationExecution: # 验证正常执行 assert result is not None - # 验证 input_data 中没有 collaboration_outputs + # 验证 input_data 中没有 collaboration_outputs. + # U3: orchestrator calls agent.execute_stream() (not execute()) backend_expert = team.get_expert("backend") - task_msg = backend_expert.agent.execute.call_args.args[0] + task_msg = backend_expert.agent.execute_stream.call_args.args[0] assert "collaboration_outputs" not in task_msg.input_data # 验证没有 collaboration_notice 事件 calls = team._handoff_transport.send.call_args_list @@ -859,12 +866,13 @@ class TestPhaseReview: # 验证 task_description 被附加了返工反馈 assert original_task in phase.task_description assert "[返工要求]: 请增加单元测试" in phase.task_description - # 验证第二次执行的 task_msg 包含返工反馈 + # 验证第二次执行的 task_msg 包含返工反馈. + # U3: orchestrator calls agent.execute_stream() (not execute()) backend_expert = team.get_expert("backend") - # agent.execute 被调用了 2 次(1 次初始 + 1 次返工) - assert backend_expert.agent.execute.call_count == 2 + # agent.execute_stream 被调用了 2 次(1 次初始 + 1 次返工) + assert backend_expert.agent.execute_stream.call_count == 2 # 验证第二次执行的 task_msg 应包含返工反馈 - second_call_args = backend_expert.agent.execute.call_args_list[1] + second_call_args = backend_expert.agent.execute_stream.call_args_list[1] second_task_msg = second_call_args.args[0] assert "[返工要求]" in second_task_msg.input_data["task"] @@ -912,18 +920,11 @@ class TestRiskFlagging: """专家输出包含 [RISK: ...] 时,risk_flagged 事件被发出""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) - # 覆盖 backend 专家的输出,包含风险标记 + # 覆盖 backend 专家的输出,包含风险标记. + # U3: orchestrator calls agent.execute_stream() (not execute()), so mock that. backend_expert = team.get_expert("backend") - backend_expert.agent.execute = AsyncMock( - return_value=TaskResult( - task_id="test", - agent_name="backend", - status=TaskStatus.COMPLETED.value, - output_data={"content": "API 实现完成 [RISK: 接口响应时间可能超标]"}, - error_message=None, - started_at=None, - completed_at=None, - ) + backend_expert.agent.execute_stream = make_execute_stream_mock( + "API 实现完成 [RISK: 接口响应时间可能超标]" ) orchestrator = TeamOrchestrator(team) @@ -949,16 +950,9 @@ class TestRiskFlagging: gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) backend_expert = team.get_expert("backend") - backend_expert.agent.execute = AsyncMock( - return_value=TaskResult( - task_id="test", - agent_name="backend", - status=TaskStatus.COMPLETED.value, - output_data={"content": "完成 [RISK: 安全漏洞风险]"}, - error_message=None, - started_at=None, - completed_at=None, - ) + # U3: orchestrator calls agent.execute_stream() (not execute()) + backend_expert.agent.execute_stream = make_execute_stream_mock( + "完成 [RISK: 安全漏洞风险]" ) orchestrator = TeamOrchestrator(team) diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index 4eaa8a8..a21c2c2 100644 --- a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -31,6 +31,12 @@ from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, PlanStatus, TeamPlan from agentkit.experts.team import ExpertTeam, TeamStatus +from tests.unit.experts._helpers import ( + make_chat_stream_mock, + make_execute_stream_mock, + make_execute_stream_raising_mock, +) + # ── 辅助函数 ────────────────────────────────────────────── @@ -84,6 +90,9 @@ def _make_mock_expert( started_at=None, completed_at=None, )) + # U3: orchestrator._run_agent_steps() calls agent.execute_stream() (async gen), + # not agent.execute(). Mock it to yield one final_answer event. + mock_agent.execute_stream = make_execute_stream_mock(f"Result from {name}") # No LLM gateway by default (tests single-phase path) mock_agent._llm_gateway = None expert.agent = mock_agent @@ -146,14 +155,16 @@ def _make_mock_llm_gateway( response = MagicMock() response.content = synthesis_content gateway.chat = AsyncMock(return_value=response) + gateway.chat_stream = make_chat_stream_mock(synthesis_content) return gateway def _make_mock_pool() -> MagicMock: """创建 mock AgentPool,模拟上下文隔离的 agent 创建""" pool = MagicMock() - pool.create_agent = AsyncMock(side_effect=lambda config: MagicMock( - execute=AsyncMock(return_value=TaskResult( + def _create_agent(config): + m = MagicMock() + m.execute = AsyncMock(return_value=TaskResult( task_id="test", agent_name=config.name, status=TaskStatus.COMPLETED.value, @@ -162,7 +173,10 @@ def _make_mock_pool() -> MagicMock: started_at=None, completed_at=None, )) - )) + # U3: orchestrator calls agent.execute_stream() (async gen) + m.execute_stream = make_execute_stream_mock(f"Isolated result from {config.name}") + return m + pool.create_agent = AsyncMock(side_effect=_create_agent) pool.remove_agent = AsyncMock() return pool @@ -213,14 +227,15 @@ class TestPipelineExecution: ) team._experts["lead"].agent._llm_gateway = gateway - # Track execution order + # Track execution order — wrap execute_stream (orchestrator calls it, not execute) execution_order: list[str] = [] for name in ["lead", "member1", "member2"]: - original_execute = team._experts[name].agent.execute - async def tracking_execute(task_msg, _orig=original_execute, _name=name): + original_stream = team._experts[name].agent.execute_stream + async def tracking_stream(task_msg, _orig=original_stream, _name=name): execution_order.append(_name) - return await _orig(task_msg) - team._experts[name].agent.execute = tracking_execute + async for ev in _orig(task_msg): + yield ev + team._experts[name].agent.execute_stream = tracking_stream result = await orchestrator.execute("串行任务") @@ -358,9 +373,12 @@ class TestPhaseEvents: team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Make all agents fail to trigger phase_failed + # Make all agents fail to trigger phase_failed. + # U3: orchestrator calls agent.execute_stream() (not execute()), so mock that. for expert in team._experts.values(): - expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed")) + expert.agent.execute_stream = make_execute_stream_raising_mock( + RuntimeError("Execution failed") + ) result = await orchestrator.execute("失败任务") @@ -426,6 +444,7 @@ class TestTaskDecomposition: synth_response = MagicMock() synth_response.content = "综合结果" gateway.chat = AsyncMock(side_effect=[bad_response, synth_response]) + gateway.chat_stream = make_chat_stream_mock("综合结果") team._experts["lead"].agent._llm_gateway = gateway result = await orchestrator.execute("测试任务") @@ -509,14 +528,14 @@ class TestPhaseExecution: @pytest.mark.asyncio async def test_phase_execution_calls_agent_execute(self): - """阶段执行调用 agent.execute()""" + """阶段执行调用 agent.execute_stream()(U3: 流式执行)""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) await orchestrator.execute("测试任务") - # Lead expert's agent should have been called - team._experts["lead"].agent.execute.assert_awaited() + # U3: orchestrator calls agent.execute_stream() (not execute()) + team._experts["lead"].agent.execute_stream.assert_called_once() @pytest.mark.asyncio async def test_phase_marks_completed(self): @@ -600,9 +619,10 @@ class TestPhaseExecution: result = await orchestrator.execute("测试任务") - # Should still complete successfully using expert's existing agent + # Should still complete successfully using expert's existing agent. + # U3: orchestrator calls agent.execute_stream() (not execute()) assert result["status"] == "completed" - team._experts["lead"].agent.execute.assert_awaited() + team._experts["lead"].agent.execute_stream.assert_called_once() # ── 阶段失败与依赖传播测试 ──────────────────────────────── @@ -626,8 +646,11 @@ class TestPhaseFailure: ) team._experts["lead"].agent._llm_gateway = gateway - # Make member1's agent fail (phase B) - team._experts["member1"].agent.execute = AsyncMock(side_effect=RuntimeError("B failed")) + # Make member1's agent fail (phase B). + # U3: orchestrator calls agent.execute_stream() (not execute()) + team._experts["member1"].agent.execute_stream = make_execute_stream_raising_mock( + RuntimeError("B failed") + ) result = await orchestrator.execute("失败传播任务") @@ -650,9 +673,12 @@ class TestPhaseFailure: team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Make all agents fail + # Make all agents fail. + # U3: orchestrator calls agent.execute_stream() (not execute()) for expert in team._experts.values(): - expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed")) + expert.agent.execute_stream = make_execute_stream_raising_mock( + RuntimeError("Execution failed") + ) result = await orchestrator.execute("全失败任务") @@ -665,16 +691,15 @@ class TestPhaseFailure: team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - call_count = 0 - - async def mock_execute(task_msg): - nonlocal call_count - call_count += 1 - if task_msg.task_type == "team_phase": - raise RuntimeError("Phase failed") - # Fallback succeeds - return TaskResult( - task_id=task_msg.task_id, + # U3: orchestrator calls agent.execute_stream() for phases (not execute()). + # Fallback path (_fallback_to_single_agent) still calls agent.execute(). + # Mock execute_stream to raise (phase fails), execute to succeed (fallback). + team._experts["lead"].agent.execute_stream = make_execute_stream_raising_mock( + RuntimeError("Phase failed") + ) + team._experts["lead"].agent.execute = AsyncMock( + return_value=TaskResult( + task_id="fallback", agent_name="lead", status=TaskStatus.COMPLETED.value, output_data={"content": "Fallback result"}, @@ -682,8 +707,7 @@ class TestPhaseFailure: started_at=None, completed_at=None, ) - - team._experts["lead"].agent.execute = AsyncMock(side_effect=mock_execute) + ) result = await orchestrator.execute("测试任务") @@ -802,6 +826,13 @@ class TestResultSynthesis: raise RuntimeError("LLM unavailable") gateway.chat = AsyncMock(side_effect=chat_side_effect) + # U3: synthesizer tries chat_stream first (broadcast_callback is set). + # Make it raise so synthesizer catches and falls back to concatenation, + # matching the test intent ("without LLM"). + async def _stream_raise(*a, **kw): + raise RuntimeError("LLM unavailable") + yield # async gen marker + gateway.chat_stream = MagicMock(side_effect=_stream_raise) team._experts["lead"].agent._llm_gateway = gateway result = await orchestrator.execute("复杂任务")