test: fix async generator mock for U3 streaming orchestrator #18

Merged
fischer merged 1 commit from fix/async-generator-mock-tests into main 2026-07-02 22:57:16 +08:00
4 changed files with 162 additions and 60 deletions

View File

@ -0,0 +1,71 @@
"""Shared test helpers for experts/ unit tests.

Provides async generator mocks for:

- gateway.chat_stream: AsyncMock() returns a coroutine (no __aiter__), causing
  `async for chunk in gateway.chat_stream(...)` to fail with
  "'async for' requires an object with __aiter__ method, got coroutine".
- agent.execute_stream: same issue; MagicMock returns a non-async-iterable,
  causing `async for event in agent.execute_stream(...)` to fail.
"""
from __future__ import annotations
from unittest.mock import MagicMock
async def _chat_stream_async_gen(content: str):
    """Yield exactly one mock chunk whose ``content`` attribute is *content*.

    Because this is an async generator function, calling it returns an
    object that implements ``__aiter__``/``__anext__`` and can therefore be
    consumed with ``async for``.

    Args:
        content: Value assigned to the ``content`` attribute of the chunk.

    Yields:
        A single ``MagicMock`` standing in for one stream chunk.
    """
    chunk = MagicMock()
    chunk.content = content
    yield chunk
def make_chat_stream_mock(content: str = "测试流式回复"):
    """Build a ``MagicMock`` that returns a fresh async generator per call.

    Use it to mock ``gateway.chat_stream`` so that
    ``async for chunk in gateway.chat_stream(...)`` works. ``AsyncMock()``
    returns a coroutine, which has no ``__aiter__``; this helper hands back
    a real async generator instead.

    Args:
        content: Text placed on the single chunk each generator yields.

    Returns:
        A ``MagicMock`` whose ``side_effect`` ignores its call arguments and
        returns a new async generator, so call tracking (``call_args``,
        ``call_count``) keeps working on the mock itself.
    """

    def _new_stream(*args, **kwargs):
        # A new generator per invocation: an async generator can only be
        # consumed once, and the mock may be called several times.
        return _chat_stream_async_gen(content)

    return MagicMock(side_effect=_new_stream)
async def _execute_stream_async_gen(content: str):
    """Yield exactly one ``final_answer`` event mock carrying *content*.

    The yielded ``MagicMock`` has ``event_type == "final_answer"`` and
    ``data == {"output": content}``.

    NOTE(review): per the original author's note,
    ``orchestrator._run_agent_steps()`` iterates
    ``agent.execute_stream(task_msg)``, dispatches on ``event.event_type``
    and ends up with ``{"content": content}`` as the result. That code is
    not visible here, so confirm against the orchestrator.

    Args:
        content: Text stored under the ``"output"`` key of the event data.

    Yields:
        A single ``MagicMock`` shaped like a final-answer event.
    """
    event = MagicMock()
    event.event_type = "final_answer"
    event.data = {"output": content}
    yield event
def make_execute_stream_mock(content: str = "测试流式回复"):
    """Build a ``MagicMock`` that returns a fresh async generator per call.

    Use it to mock ``agent.execute_stream`` so that
    ``async for event in agent.execute_stream(...)`` works. Each generator
    yields one event with ``event_type="final_answer"`` and
    ``data={"output": content}``.

    Args:
        content: Text carried by the single final-answer event.

    Returns:
        A ``MagicMock`` whose ``side_effect`` ignores its call arguments and
        returns a new async generator, so ``call_args`` / ``call_count``
        remain usable in assertions.
    """

    def _new_stream(*args, **kwargs):
        # One generator per call: an exhausted async generator cannot be
        # iterated again, and the mock may be invoked more than once.
        return _execute_stream_async_gen(content)

    return MagicMock(side_effect=_new_stream)
async def _execute_stream_raising_async_gen(error: Exception):
    """Async generator that raises *error* on the first iteration step.

    Nothing is ever yielded; the exception surfaces when the consumer first
    advances the generator (e.g. on entering ``async for``), not when the
    generator object is created.

    Args:
        error: Exception instance to raise.

    Raises:
        Exception: The very *error* instance that was passed in.
    """
    # The unreachable ``yield`` is what turns this function into an async
    # generator (PEP 525); without it the call would return a coroutine.
    if False:  # pragma: no cover — makes this an async generator (PEP 525)
        yield
    raise error
def make_execute_stream_raising_mock(error: Exception):
    """Build a ``MagicMock`` whose calls return an async generator raising *error*.

    Use it to mock ``agent.execute_stream`` when phase execution should
    fail. The exception is raised when the returned generator is iterated,
    not when the mock is called.

    NOTE(review): the original note says ``orchestrator._run_agent_steps()``
    catches the exception and broadcasts ``expert_result(error)``. That code
    is not visible here, so confirm against the orchestrator.

    Args:
        error: Exception instance raised by every generator the mock returns.

    Returns:
        A ``MagicMock`` whose ``side_effect`` ignores its call arguments and
        returns a new raising async generator.
    """

    def _new_stream(*args, **kwargs):
        # A fresh generator per call so repeated invocations all fail.
        return _execute_stream_raising_async_gen(error)

    return MagicMock(side_effect=_new_stream)

View File

@ -11,6 +11,8 @@ from agentkit.experts.board_orchestrator import BoardOrchestrator
from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert
from tests.unit.experts._helpers import make_chat_stream_mock
# ── 辅助函数 ──────────────────────────────────────────────
@ -57,6 +59,10 @@ def _make_mock_gateway(response_content: str = "测试回复") -> AsyncMock:
response = MagicMock()
response.content = response_content
gateway.chat = AsyncMock(return_value=response)
# board_orchestrator._stream_expert_speech() calls gateway.chat_stream()
# (async gen). Without this, AsyncMock returns a coroutine lacking __aiter__,
# triggering a RuntimeWarning + falling back to non-streaming path.
gateway.chat_stream = make_chat_stream_mock(response_content)
return gateway

View File

@ -14,6 +14,8 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from tests.unit.experts._helpers import make_chat_stream_mock, make_execute_stream_mock
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.core.protocol import TaskResult, TaskStatus
from agentkit.experts.config import ExpertConfig
@ -78,6 +80,8 @@ def _make_mock_expert(
completed_at=None,
)
)
# U3: orchestrator._run_agent_steps() calls agent.execute_stream() (async gen)
mock_agent.execute_stream = make_execute_stream_mock(f"Result from {name}")
expert.agent = mock_agent
return expert
@ -128,6 +132,7 @@ def _make_mock_llm_gateway(
return synth_response
gateway.chat = AsyncMock(side_effect=chat_side_effect)
gateway.chat_stream = make_chat_stream_mock(synthesis_content)
return gateway
@ -494,9 +499,10 @@ class TestCollaborationExecution:
await orchestrator._execute_execution_phase(frontend_phase, plan)
# 验证 frontend 专家的 agent.execute 收到了 collaboration_outputs
# 验证 frontend 专家的 agent.execute_stream 收到了 collaboration_outputs.
# U3: orchestrator calls agent.execute_stream() (not execute())
frontend_expert = team.get_expert("frontend")
task_msg = frontend_expert.agent.execute.call_args.args[0]
task_msg = frontend_expert.agent.execute_stream.call_args.args[0]
assert "collaboration_outputs" in task_msg.input_data
assert "backend" in task_msg.input_data["collaboration_outputs"]
assert "API definition" in task_msg.input_data["collaboration_outputs"]["backend"]
@ -580,9 +586,10 @@ class TestCollaborationExecution:
# 验证正常执行
assert result is not None
# 验证 input_data 中没有 collaboration_outputs
# 验证 input_data 中没有 collaboration_outputs.
# U3: orchestrator calls agent.execute_stream() (not execute())
backend_expert = team.get_expert("backend")
task_msg = backend_expert.agent.execute.call_args.args[0]
task_msg = backend_expert.agent.execute_stream.call_args.args[0]
assert "collaboration_outputs" not in task_msg.input_data
# 验证没有 collaboration_notice 事件
calls = team._handoff_transport.send.call_args_list
@ -859,12 +866,13 @@ class TestPhaseReview:
# 验证 task_description 被附加了返工反馈
assert original_task in phase.task_description
assert "[返工要求]: 请增加单元测试" in phase.task_description
# 验证第二次执行的 task_msg 包含返工反馈
# 验证第二次执行的 task_msg 包含返工反馈.
# U3: orchestrator calls agent.execute_stream() (not execute())
backend_expert = team.get_expert("backend")
# agent.execute 被调用了 2 次1 次初始 + 1 次返工)
assert backend_expert.agent.execute.call_count == 2
# agent.execute_stream 被调用了 2 次1 次初始 + 1 次返工)
assert backend_expert.agent.execute_stream.call_count == 2
# 验证第二次执行的 task_msg 应包含返工反馈
second_call_args = backend_expert.agent.execute.call_args_list[1]
second_call_args = backend_expert.agent.execute_stream.call_args_list[1]
second_task_msg = second_call_args.args[0]
assert "[返工要求]" in second_task_msg.input_data["task"]
@ -912,18 +920,11 @@ class TestRiskFlagging:
"""专家输出包含 [RISK: ...] 时risk_flagged 事件被发出"""
gateway = _make_review_gateway([(True, "")])
team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway)
# 覆盖 backend 专家的输出,包含风险标记
# 覆盖 backend 专家的输出,包含风险标记.
# U3: orchestrator calls agent.execute_stream() (not execute()), so mock that.
backend_expert = team.get_expert("backend")
backend_expert.agent.execute = AsyncMock(
return_value=TaskResult(
task_id="test",
agent_name="backend",
status=TaskStatus.COMPLETED.value,
output_data={"content": "API 实现完成 [RISK: 接口响应时间可能超标]"},
error_message=None,
started_at=None,
completed_at=None,
)
backend_expert.agent.execute_stream = make_execute_stream_mock(
"API 实现完成 [RISK: 接口响应时间可能超标]"
)
orchestrator = TeamOrchestrator(team)
@ -949,16 +950,9 @@ class TestRiskFlagging:
gateway = _make_review_gateway([(True, "")])
team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway)
backend_expert = team.get_expert("backend")
backend_expert.agent.execute = AsyncMock(
return_value=TaskResult(
task_id="test",
agent_name="backend",
status=TaskStatus.COMPLETED.value,
output_data={"content": "完成 [RISK: 安全漏洞风险]"},
error_message=None,
started_at=None,
completed_at=None,
)
# U3: orchestrator calls agent.execute_stream() (not execute())
backend_expert.agent.execute_stream = make_execute_stream_mock(
"完成 [RISK: 安全漏洞风险]"
)
orchestrator = TeamOrchestrator(team)

View File

@ -31,6 +31,12 @@ from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, PlanStatus, TeamPlan
from agentkit.experts.team import ExpertTeam, TeamStatus
from tests.unit.experts._helpers import (
make_chat_stream_mock,
make_execute_stream_mock,
make_execute_stream_raising_mock,
)
# ── 辅助函数 ──────────────────────────────────────────────
@ -84,6 +90,9 @@ def _make_mock_expert(
started_at=None,
completed_at=None,
))
# U3: orchestrator._run_agent_steps() calls agent.execute_stream() (async gen),
# not agent.execute(). Mock it to yield one final_answer event.
mock_agent.execute_stream = make_execute_stream_mock(f"Result from {name}")
# No LLM gateway by default (tests single-phase path)
mock_agent._llm_gateway = None
expert.agent = mock_agent
@ -146,14 +155,16 @@ def _make_mock_llm_gateway(
response = MagicMock()
response.content = synthesis_content
gateway.chat = AsyncMock(return_value=response)
gateway.chat_stream = make_chat_stream_mock(synthesis_content)
return gateway
def _make_mock_pool() -> MagicMock:
"""创建 mock AgentPool模拟上下文隔离的 agent 创建"""
pool = MagicMock()
pool.create_agent = AsyncMock(side_effect=lambda config: MagicMock(
execute=AsyncMock(return_value=TaskResult(
def _create_agent(config):
m = MagicMock()
m.execute = AsyncMock(return_value=TaskResult(
task_id="test",
agent_name=config.name,
status=TaskStatus.COMPLETED.value,
@ -162,7 +173,10 @@ def _make_mock_pool() -> MagicMock:
started_at=None,
completed_at=None,
))
))
# U3: orchestrator calls agent.execute_stream() (async gen)
m.execute_stream = make_execute_stream_mock(f"Isolated result from {config.name}")
return m
pool.create_agent = AsyncMock(side_effect=_create_agent)
pool.remove_agent = AsyncMock()
return pool
@ -213,14 +227,15 @@ class TestPipelineExecution:
)
team._experts["lead"].agent._llm_gateway = gateway
# Track execution order
# Track execution order — wrap execute_stream (orchestrator calls it, not execute)
execution_order: list[str] = []
for name in ["lead", "member1", "member2"]:
original_execute = team._experts[name].agent.execute
async def tracking_execute(task_msg, _orig=original_execute, _name=name):
original_stream = team._experts[name].agent.execute_stream
async def tracking_stream(task_msg, _orig=original_stream, _name=name):
execution_order.append(_name)
return await _orig(task_msg)
team._experts[name].agent.execute = tracking_execute
async for ev in _orig(task_msg):
yield ev
team._experts[name].agent.execute_stream = tracking_stream
result = await orchestrator.execute("串行任务")
@ -358,9 +373,12 @@ class TestPhaseEvents:
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Make all agents fail to trigger phase_failed
# Make all agents fail to trigger phase_failed.
# U3: orchestrator calls agent.execute_stream() (not execute()), so mock that.
for expert in team._experts.values():
expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed"))
expert.agent.execute_stream = make_execute_stream_raising_mock(
RuntimeError("Execution failed")
)
result = await orchestrator.execute("失败任务")
@ -426,6 +444,7 @@ class TestTaskDecomposition:
synth_response = MagicMock()
synth_response.content = "综合结果"
gateway.chat = AsyncMock(side_effect=[bad_response, synth_response])
gateway.chat_stream = make_chat_stream_mock("综合结果")
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("测试任务")
@ -509,14 +528,14 @@ class TestPhaseExecution:
@pytest.mark.asyncio
async def test_phase_execution_calls_agent_execute(self):
"""阶段执行调用 agent.execute()"""
"""阶段执行调用 agent.execute_stream()U3: 流式执行)"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
# Lead expert's agent should have been called
team._experts["lead"].agent.execute.assert_awaited()
# U3: orchestrator calls agent.execute_stream() (not execute())
team._experts["lead"].agent.execute_stream.assert_called_once()
@pytest.mark.asyncio
async def test_phase_marks_completed(self):
@ -600,9 +619,10 @@ class TestPhaseExecution:
result = await orchestrator.execute("测试任务")
# Should still complete successfully using expert's existing agent
# Should still complete successfully using expert's existing agent.
# U3: orchestrator calls agent.execute_stream() (not execute())
assert result["status"] == "completed"
team._experts["lead"].agent.execute.assert_awaited()
team._experts["lead"].agent.execute_stream.assert_called_once()
# ── 阶段失败与依赖传播测试 ────────────────────────────────
@ -626,8 +646,11 @@ class TestPhaseFailure:
)
team._experts["lead"].agent._llm_gateway = gateway
# Make member1's agent fail (phase B)
team._experts["member1"].agent.execute = AsyncMock(side_effect=RuntimeError("B failed"))
# Make member1's agent fail (phase B).
# U3: orchestrator calls agent.execute_stream() (not execute())
team._experts["member1"].agent.execute_stream = make_execute_stream_raising_mock(
RuntimeError("B failed")
)
result = await orchestrator.execute("失败传播任务")
@ -650,9 +673,12 @@ class TestPhaseFailure:
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Make all agents fail
# Make all agents fail.
# U3: orchestrator calls agent.execute_stream() (not execute())
for expert in team._experts.values():
expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed"))
expert.agent.execute_stream = make_execute_stream_raising_mock(
RuntimeError("Execution failed")
)
result = await orchestrator.execute("全失败任务")
@ -665,16 +691,15 @@ class TestPhaseFailure:
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
call_count = 0
async def mock_execute(task_msg):
nonlocal call_count
call_count += 1
if task_msg.task_type == "team_phase":
raise RuntimeError("Phase failed")
# Fallback succeeds
return TaskResult(
task_id=task_msg.task_id,
# U3: orchestrator calls agent.execute_stream() for phases (not execute()).
# Fallback path (_fallback_to_single_agent) still calls agent.execute().
# Mock execute_stream to raise (phase fails), execute to succeed (fallback).
team._experts["lead"].agent.execute_stream = make_execute_stream_raising_mock(
RuntimeError("Phase failed")
)
team._experts["lead"].agent.execute = AsyncMock(
return_value=TaskResult(
task_id="fallback",
agent_name="lead",
status=TaskStatus.COMPLETED.value,
output_data={"content": "Fallback result"},
@ -682,8 +707,7 @@ class TestPhaseFailure:
started_at=None,
completed_at=None,
)
team._experts["lead"].agent.execute = AsyncMock(side_effect=mock_execute)
)
result = await orchestrator.execute("测试任务")
@ -802,6 +826,13 @@ class TestResultSynthesis:
raise RuntimeError("LLM unavailable")
gateway.chat = AsyncMock(side_effect=chat_side_effect)
# U3: synthesizer tries chat_stream first (broadcast_callback is set).
# Make it raise so synthesizer catches and falls back to concatenation,
# matching the test intent ("without LLM").
async def _stream_raise(*a, **kw):
    """Async generator that fails with ``RuntimeError`` when first iterated.

    Accepts and ignores any arguments so it can serve as a mock
    ``side_effect``. Nothing is ever yielded.

    Raises:
        RuntimeError: Always, with the message ``"LLM unavailable"``.
    """
    raise RuntimeError("LLM unavailable")
    # Unreachable on purpose: the mere presence of ``yield`` makes this an
    # async generator function, so calling it returns an async-iterable
    # instead of a coroutine.
    yield  # async gen marker
gateway.chat_stream = MagicMock(side_effect=_stream_raise)
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("复杂任务")