"""Shared test helpers for experts/ unit tests. Provides async generator mocks for: - gateway.chat_stream — AsyncMock() returns a coroutine (no __aiter__), causing `async for chunk in gateway.chat_stream(...)` to fail with "'async for' requires an object with __aiter__ method, got coroutine". - agent.execute_stream — same issue: MagicMock returns a non-async-iterable, causing `async for event in agent.execute_stream(...)` to fail. """ from __future__ import annotations from unittest.mock import MagicMock async def _chat_stream_async_gen(content: str): """Async generator yielding a single mock chunk with given content.""" chunk = MagicMock() chunk.content = content yield chunk def make_chat_stream_mock(content: str = "测试流式回复"): """Return a MagicMock that yields an async generator when called. Use to mock gateway.chat_stream so `async for chunk in gateway.chat_stream(...)` works. AsyncMock() returns a coroutine lacking __aiter__; this helper returns a real async generator compatible with `async for`. """ return MagicMock(side_effect=lambda *a, **kw: _chat_stream_async_gen(content)) async def _execute_stream_async_gen(content: str): """Async generator yielding a single final_answer ReActEvent-like mock. orchestrator._run_agent_steps() iterates `agent.execute_stream(task_msg)` and dispatches by event.event_type. We yield one final_answer event with the desired content so the result is `{"content": content}`. """ event = MagicMock() event.event_type = "final_answer" event.data = {"output": content} yield event def make_execute_stream_mock(content: str = "测试流式回复"): """Return a MagicMock that yields an async generator when called. Use to mock agent.execute_stream so `async for event in agent.execute_stream(...)` works. The yielded event has event_type="final_answer" and data={"output": content}, matching what orchestrator._run_agent_steps() expects. """ return MagicMock(side_effect=lambda *a, **kw: _execute_stream_async_gen(content)) async def _execute_stream_raising_async_gen(error: Exception): """Async generator that raises `error` immediately when iterated.""" if False: # pragma: no cover — makes this an async generator (PEP 525) yield raise error def make_execute_stream_raising_mock(error: Exception): """Return a MagicMock whose side_effect is an async generator raising `error`. Use to mock agent.execute_stream when phase execution should fail. orchestrator._run_agent_steps() catches the exception and broadcasts expert_result(error). """ return MagicMock( side_effect=lambda *a, **kw: _execute_stream_raising_async_gen(error) )