fischer-agentkit/tests/unit/experts/_helpers.py

72 lines
2.7 KiB
Python

"""Shared test helpers for experts/ unit tests.
Provides async generator mocks for:
- gateway.chat_stream — AsyncMock() returns a coroutine (no __aiter__), causing
`async for chunk in gateway.chat_stream(...)` to fail with
"'async for' requires an object with __aiter__ method, got coroutine".
- agent.execute_stream — same issue: MagicMock returns a non-async-iterable,
causing `async for event in agent.execute_stream(...)` to fail.
"""
from __future__ import annotations
from unittest.mock import MagicMock
async def _chat_stream_async_gen(content: str):
"""Async generator yielding a single mock chunk with given content."""
chunk = MagicMock()
chunk.content = content
yield chunk
def make_chat_stream_mock(content: str = "测试流式回复"):
"""Return a MagicMock that yields an async generator when called.
Use to mock gateway.chat_stream so `async for chunk in gateway.chat_stream(...)`
works. AsyncMock() returns a coroutine lacking __aiter__; this helper returns
a real async generator compatible with `async for`.
"""
return MagicMock(side_effect=lambda *a, **kw: _chat_stream_async_gen(content))
async def _execute_stream_async_gen(content: str):
"""Async generator yielding a single final_answer ReActEvent-like mock.
orchestrator._run_agent_steps() iterates `agent.execute_stream(task_msg)`
and dispatches by event.event_type. We yield one final_answer event with
the desired content so the result is `{"content": content}`.
"""
event = MagicMock()
event.event_type = "final_answer"
event.data = {"output": content}
yield event
def make_execute_stream_mock(content: str = "测试流式回复"):
"""Return a MagicMock that yields an async generator when called.
Use to mock agent.execute_stream so `async for event in agent.execute_stream(...)`
works. The yielded event has event_type="final_answer" and data={"output": content},
matching what orchestrator._run_agent_steps() expects.
"""
return MagicMock(side_effect=lambda *a, **kw: _execute_stream_async_gen(content))
async def _execute_stream_raising_async_gen(error: Exception):
"""Async generator that raises `error` immediately when iterated."""
if False: # pragma: no cover — makes this an async generator (PEP 525)
yield
raise error
def make_execute_stream_raising_mock(error: Exception):
"""Return a MagicMock whose side_effect is an async generator raising `error`.
Use to mock agent.execute_stream when phase execution should fail.
orchestrator._run_agent_steps() catches the exception and broadcasts expert_result(error).
"""
return MagicMock(
side_effect=lambda *a, **kw: _execute_stream_raising_async_gen(error)
)