fischer-agentkit/tests/unit/experts/test_phase_executor_streami...

576 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""PhaseExecutor streaming tests (U3)
Tests streaming execution in _run_agent_steps:
- token events forwarded as expert_result_chunk (final_answer is a completion signal only, never re-sent as a chunk)
- expert_result(completed) broadcast after stream completes
- expert_result(error) broadcast on mid-stream exception
- retry contract: expert_result_chunk_reset before retry
- TeamOrchestrator synthesis streams team_synthesis_chunk
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.core.react import ReActEvent
from agentkit.experts._review_gate import ReviewResult
from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(name: str = "test_expert", color: str = "#1890ff") -> ExpertConfig:
    """Build a small ``ExpertConfig`` for tests; only ``name`` and ``color`` vary."""
    settings = {
        "name": name,
        "agent_type": "expert",
        "persona": "测试专家",
        "thinking_style": "逻辑推理",
        "bound_skills": ["skill_a"],
        "color": color,
        # ponytail: llm_generate mode validation requires a prompt
        "prompt": {"system": "你是测试专家"},
    }
    return ExpertConfig(**settings)
def _make_mock_expert(name: str = "test_expert", color: str = "#1890ff") -> MagicMock:
    """Return an ``Expert``-spec'd mock that is active and carries a real test config."""
    mock_expert = MagicMock(spec=Expert)
    mock_expert.is_active = True
    mock_expert.config = _make_expert_config(name=name, color=color)
    return mock_expert
def _make_stream_agent(events: list[ReActEvent]) -> MagicMock:
    """Create a mock agent whose ``execute_stream`` yields the given ReActEvents.

    Args:
        events: Events to emit, in order. Any iterable is accepted; it is
            snapshotted once here so that every call to ``execute_stream``
            replays the same sequence (a one-shot iterator would otherwise be
            empty on the second call) and later mutation of the caller's list
            cannot change what the agent emits.

    Returns:
        A ``MagicMock`` whose ``execute_stream`` attribute is an async
        generator function taking a single ``task`` argument.
    """
    replay = tuple(events)

    async def _execute_stream(task):
        for event in replay:
            yield event

    agent = MagicMock()
    agent.execute_stream = _execute_stream
    return agent
def _make_error_stream_agent(
    events_before_error: list[ReActEvent], error: Exception
) -> MagicMock:
    """Create a mock agent that yields some events and then raises ``error``.

    The failure happens on *every* call to ``execute_stream``, which is what
    the retry-exhaustion tests rely on.

    Args:
        events_before_error: Events emitted before the failure. Snapshotted
            once so each call (including retries) replays the same prefix even
            if a one-shot iterable was passed in.
        error: Exception instance raised after the last event.

    Returns:
        A ``MagicMock`` whose ``execute_stream`` attribute is an async
        generator function taking a single ``task`` argument.
    """
    prefix = tuple(events_before_error)

    async def _execute_stream(task):
        for event in prefix:
            yield event
        raise error

    agent = MagicMock()
    agent.execute_stream = _execute_stream
    return agent
def _make_retry_stream_agent(
    fail_events: list[ReActEvent],
    success_events: list[ReActEvent],
    error: Exception,
) -> MagicMock:
    """Create a mock agent that fails on its first call and succeeds afterwards.

    Args:
        fail_events: Events yielded during the first call, before ``error``
            is raised.
        success_events: Events yielded by the second and every later call.
        error: Exception instance raised at the end of the first call.

    Returns:
        A ``MagicMock`` whose ``execute_stream`` attribute is an async
        generator function taking a single ``task`` argument.
    """
    # Snapshot both sequences so later calls replay them regardless of whether
    # the caller passed a list or a one-shot iterable.
    first_attempt = tuple(fail_events)
    later_attempts = tuple(success_events)
    attempts = 0

    async def _execute_stream(task):
        nonlocal attempts
        attempts += 1
        if attempts == 1:
            for event in first_attempt:
                yield event
            raise error
        for event in later_attempts:
            yield event

    agent = MagicMock()
    agent.execute_stream = _execute_stream
    return agent
def _make_phase(
    phase_id: str = "phase_1",
    name: str = "Test Phase",
    expert_name: str = "test_expert",
    depends_on: list[str] | None = None,
) -> PlanPhase:
    """Build a ``PlanPhase`` with a fixed task description.

    A missing (or empty) ``depends_on`` is passed on as a fresh empty list.
    """
    dependencies = depends_on if depends_on else []
    return PlanPhase(
        id=phase_id,
        name=name,
        assigned_expert=expert_name,
        task_description="完成测试任务",
        depends_on=dependencies,
    )
def _make_orchestrator_for_streaming() -> TeamOrchestrator:
    """Build a ``TeamOrchestrator`` wired for ``_run_agent_steps`` tests.

    The handoff transport is a spec'd mock, ``_broadcast_event`` is an
    ``AsyncMock`` so tests can inspect what was broadcast, and the review step
    is stubbed to always return a passing, non-degraded ``ReviewResult``.
    """
    team = ExpertTeam()
    team._handoff_transport = MagicMock(spec=InProcessHandoffTransport)

    passing_review = ReviewResult(passed=True, degraded=False, feedback="")

    orchestrator = TeamOrchestrator(team)
    orchestrator._broadcast_event = AsyncMock()
    orchestrator._review_phase_output = AsyncMock(return_value=passing_review)
    return orchestrator
def _make_simple_plan() -> TeamPlan:
    """Return a ``TeamPlan``-spec'd mock with id ``test_plan`` and no phases.

    Per the original note, the plan is not accessed when deps/contracts are empty.
    """
    plan = MagicMock(spec=TeamPlan)
    plan.configure_mock(id="test_plan", phases=[])
    return plan
# ── 流式 token/final_answer 转发测试 ──────────────────────
class TestStreamEventForwarding:
    """Events produced by ``execute_stream`` are relayed through ``_broadcast_event``."""

    @pytest.mark.asyncio
    async def test_token_events_forwarded_as_expert_result_chunk(self):
        """Each ``token`` event is re-broadcast as an ``expert_result_chunk``."""
        agent = _make_stream_agent(
            [
                ReActEvent(event_type="token", step=0, data={"content": "Hello"}),
                ReActEvent(event_type="token", step=0, data={"content": " World"}),
            ]
        )
        orch = _make_orchestrator_for_streaming()

        await orch._run_agent_steps(
            _make_mock_expert(),
            agent,
            _make_mock_expert(name="lead"),
            _make_phase(),
            _make_simple_plan(),
        )

        # Payloads (second positional arg) of every chunk broadcast, in order.
        chunk_payloads = [
            call.args[1]
            for call in orch._broadcast_event.call_args_list
            if call.args[0] == "expert_result_chunk"
        ]
        assert len(chunk_payloads) == 2
        first, second = chunk_payloads
        assert first["content"] == "Hello"
        assert second["content"] == " World"
        assert first["expert_id"] == "test_expert"

    @pytest.mark.asyncio
    async def test_final_answer_not_forwarded_as_chunk(self):
        """``final_answer`` is a completion signal only, never an ``expert_result_chunk``.

        Forwarding it would double-accumulate with the token stream. When no
        token was seen, its ``output`` is still accumulated into the result as
        fallback content, but no chunk is broadcast for it.
        """
        agent = _make_stream_agent(
            [ReActEvent(event_type="final_answer", step=0, data={"output": "最终结果"})]
        )
        orch = _make_orchestrator_for_streaming()

        outcome = await orch._run_agent_steps(
            _make_mock_expert(),
            agent,
            _make_mock_expert(name="lead"),
            _make_phase(),
            _make_simple_plan(),
        )
        result, _, passed, _, _ = outcome

        broadcast_names = [call.args[0] for call in orch._broadcast_event.call_args_list]
        assert "expert_result_chunk" not in broadcast_names
        # The output still ends up in the returned result via fallback accumulation.
        assert result["content"] == "最终结果"
        assert passed is True

    @pytest.mark.asyncio
    async def test_thinking_events_forwarded_as_expert_step(self):
        """A ``thinking`` event is re-broadcast as an ``expert_step`` carrying ``thinking``."""
        agent = _make_stream_agent(
            [
                ReActEvent(event_type="thinking", step=0, data={"content": "思考中..."}),
                ReActEvent(event_type="token", step=0, data={"content": "结果"}),
            ]
        )
        orch = _make_orchestrator_for_streaming()

        await orch._run_agent_steps(
            _make_mock_expert(),
            agent,
            _make_mock_expert(name="lead"),
            _make_phase(),
            _make_simple_plan(),
        )

        thinking_payloads = [
            call.args[1]
            for call in orch._broadcast_event.call_args_list
            if call.args[0] == "expert_step" and "thinking" in call.args[1]
        ]
        assert len(thinking_payloads) == 1
        assert thinking_payloads[0]["thinking"] == "思考中..."
# ── expert_result 终结事件测试 ────────────────────────────
class TestExpertResultTermination:
    """A streaming session must end with ``expert_result(completed)`` or ``expert_result(error)``."""

    @pytest.mark.asyncio
    async def test_expert_result_completed_after_stream(self):
        """After the stream ends, one ``expert_result`` with ``status=completed`` is broadcast.

        ReActEngine contract: ``token`` events carry increments and
        ``final_answer`` carries the full text. ``final_answer`` is used only as
        a completion signal and is not accumulated again, otherwise the content
        would be doubled.
        """
        events = [
            ReActEvent(event_type="token", step=0, data={"content": "Hel"}),
            ReActEvent(event_type="token", step=0, data={"content": "lo"}),
            ReActEvent(event_type="final_answer", step=0, data={"output": "Hello"}),
        ]
        agent = _make_stream_agent(events)
        expert = _make_mock_expert()
        lead = _make_mock_expert(name="lead")
        phase = _make_phase()
        plan = _make_simple_plan()
        orch = _make_orchestrator_for_streaming()

        # Only the result dict and the pass flag are checked; the 5-way unpack
        # still pins the shape of the returned tuple.
        result, _, passed, _, _ = await orch._run_agent_steps(
            expert, agent, lead, phase, plan
        )

        # Verify expert_result(completed) broadcast
        result_calls = [
            c
            for c in orch._broadcast_event.call_args_list
            if c.args[0] == "expert_result" and c.args[1].get("status") == "completed"
        ]
        assert len(result_calls) == 1
        # Content is token-accumulated only — final_answer must not double it
        assert result_calls[0].args[1]["content"] == "Hello"
        assert result_calls[0].args[1]["expert_id"] == "test_expert"
        assert result["content"] == "Hello"
        assert passed is True

    @pytest.mark.asyncio
    async def test_final_answer_fallback_when_no_tokens(self):
        """With no token events (e.g. the ``_wrap_sync_as_stream`` fallback), the
        ``final_answer`` output is used as the content.
        """
        events = [
            ReActEvent(event_type="final_answer", step=0, data={"output": "Fallback"}),
        ]
        agent = _make_stream_agent(events)
        expert = _make_mock_expert()
        lead = _make_mock_expert(name="lead")
        phase = _make_phase()
        plan = _make_simple_plan()
        orch = _make_orchestrator_for_streaming()

        result, _, passed, _, _ = await orch._run_agent_steps(
            expert, agent, lead, phase, plan
        )

        assert result["content"] == "Fallback"
        assert passed is True

    @pytest.mark.asyncio
    async def test_streaming_always_terminates_with_result_event(self):
        """Even when ``execute_stream`` yields nothing, ``expert_result(completed)`` is broadcast."""
        agent = _make_stream_agent([])
        expert = _make_mock_expert()
        lead = _make_mock_expert(name="lead")
        phase = _make_phase()
        plan = _make_simple_plan()
        orch = _make_orchestrator_for_streaming()

        await orch._run_agent_steps(expert, agent, lead, phase, plan)

        result_calls = [
            c for c in orch._broadcast_event.call_args_list if c.args[0] == "expert_result"
        ]
        assert len(result_calls) == 1
        assert result_calls[0].args[1]["status"] == "completed"
# ── 异常处理测试 ──────────────────────────────────────────
class TestStreamExceptionHandling:
    """When ``execute_stream`` raises, ``expert_result(error)`` is broadcast with the
    content accumulated so far.
    """

    @pytest.mark.asyncio
    async def test_mid_stream_exception_broadcasts_error_with_accumulated(self):
        """A mid-stream exception yields ``expert_result(error)`` carrying the partial content."""
        agent = _make_error_stream_agent(
            [
                ReActEvent(event_type="token", step=0, data={"content": "部分"}),
                ReActEvent(event_type="token", step=0, data={"content": "内容"}),
            ],
            RuntimeError("LLM exploded"),
        )
        orch = _make_orchestrator_for_streaming()

        # MAX_RETRIES=1, so after 2 attempts (1 initial + 1 retry), it raises
        with pytest.raises(RuntimeError, match="LLM exploded"):
            await orch._run_agent_steps(
                _make_mock_expert(),
                agent,
                _make_mock_expert(name="lead"),
                _make_phase(),
                _make_simple_plan(),
            )

        # Payloads of every expert_result broadcast whose status is "error".
        error_payloads = [
            call.args[1]
            for call in orch._broadcast_event.call_args_list
            if call.args[0] == "expert_result" and call.args[1].get("status") == "error"
        ]
        assert error_payloads
        # The last error broadcast belongs to the final attempt.
        final_error = error_payloads[-1]
        assert final_error["error"] == "LLM exploded"
        # Both partial tokens must be present in the accumulated content.
        for fragment in ("部分", "内容"):
            assert fragment in final_error["content"]

    @pytest.mark.asyncio
    async def test_exception_does_not_silently_hang(self):
        """A streaming session may not hang silently: an exception must be followed
        by an ``expert_result`` event.
        """
        agent = _make_error_stream_agent([], RuntimeError("immediate failure"))
        orch = _make_orchestrator_for_streaming()

        with pytest.raises(RuntimeError, match="immediate failure"):
            await orch._run_agent_steps(
                _make_mock_expert(),
                agent,
                _make_mock_expert(name="lead"),
                _make_phase(),
                _make_simple_plan(),
            )

        # Must have at least one expert_result event (error or completed)
        result_payloads = [
            call.args[1]
            for call in orch._broadcast_event.call_args_list
            if call.args[0] == "expert_result"
        ]
        assert result_payloads
        # All should be error status (no completed since it never succeeded)
        assert all(payload.get("status") == "error" for payload in result_payloads)
# ── 重试 + 流式合约测试 ────────────────────────────────────
class TestRetryStreamContract:
    """Retry + streaming contract: reset → retry → final content from attempt 2 only."""

    @pytest.mark.asyncio
    async def test_retry_broadcasts_chunk_reset_then_succeeds(self):
        """Two chunks then an exception → reset is broadcast → the retry succeeds.

        The completed result must contain only what the second attempt streamed.
        """
        fail_events = [
            ReActEvent(event_type="token", step=0, data={"content": "attempt1_"}),
            ReActEvent(event_type="token", step=0, data={"content": "partial"}),
        ]
        success_events = [
            ReActEvent(event_type="token", step=0, data={"content": "attempt2_"}),
            ReActEvent(event_type="token", step=0, data={"content": "success"}),
            ReActEvent(event_type="final_answer", step=0, data={"output": "attempt2_success"}),
        ]
        agent = _make_retry_stream_agent(fail_events, success_events, RuntimeError("transient"))
        expert = _make_mock_expert()
        lead = _make_mock_expert(name="lead")
        phase = _make_phase()
        plan = _make_simple_plan()
        orch = _make_orchestrator_for_streaming()

        # Only the result dict is inspected; the 5-way unpack still pins the
        # shape of the returned tuple.
        result, _, _, _, _ = await orch._run_agent_steps(expert, agent, lead, phase, plan)

        calls = orch._broadcast_event.call_args_list

        # Exactly one expert_result_chunk_reset, attributed to the expert.
        reset_positions = [
            index for index, c in enumerate(calls) if c.args[0] == "expert_result_chunk_reset"
        ]
        assert len(reset_positions) == 1
        reset_at = reset_positions[0]
        assert calls[reset_at].args[1]["expert_id"] == "test_expert"

        def _streamed_text(window):
            """Concatenate the content of every expert_result_chunk in ``window``."""
            return "".join(
                c.args[1]["content"] for c in window if c.args[0] == "expert_result_chunk"
            )

        # The reset must sit between the two attempts: everything streamed
        # before it belongs to attempt 1, everything after it to attempt 2.
        assert _streamed_text(calls[:reset_at]) == "attempt1_partial"
        assert _streamed_text(calls[reset_at + 1:]) == "attempt2_success"

        # Verify expert_result(completed) contains ONLY attempt 2 content
        completed_calls = [
            c
            for c in calls
            if c.args[0] == "expert_result" and c.args[1].get("status") == "completed"
        ]
        assert len(completed_calls) == 1
        content = completed_calls[0].args[1]["content"]
        assert "attempt2_success" in content
        assert "attempt1_partial" not in content
        # Return value matches attempt 2 only
        assert result["content"] == "attempt2_success"

    @pytest.mark.asyncio
    async def test_retry_exhausted_broadcasts_error(self):
        """Once retries are exhausted, ``expert_result(error)`` is broadcast (no silent hang)."""
        # Every attempt fails — use _make_error_stream_agent (fails on every call)
        # rather than _make_retry_stream_agent (fails only on first call)
        agent = _make_error_stream_agent(
            events_before_error=[ReActEvent(event_type="token", step=0, data={"content": "fail"})],
            error=RuntimeError("persistent failure"),
        )
        expert = _make_mock_expert()
        lead = _make_mock_expert(name="lead")
        phase = _make_phase()
        plan = _make_simple_plan()
        orch = _make_orchestrator_for_streaming()

        with pytest.raises(RuntimeError, match="persistent failure"):
            await orch._run_agent_steps(expert, agent, lead, phase, plan)

        result_statuses = [
            c.args[1].get("status")
            for c in orch._broadcast_event.call_args_list
            if c.args[0] == "expert_result"
        ]
        # Must have expert_result(error) as terminal event
        assert "error" in result_statuses
        # No completed event
        assert "completed" not in result_statuses
# ── 综合阶段流式测试 ───────────────────────────────────────
class TestSynthesisStreaming:
    """``TeamOrchestrator`` synthesis streams its output as ``team_synthesis_chunk``."""

    @staticmethod
    def _make_orchestrator(gateway: MagicMock) -> TeamOrchestrator:
        """Build an orchestrator whose LLM gateway and model lookups are stubbed."""
        team = ExpertTeam()
        team._handoff_transport = MagicMock(spec=InProcessHandoffTransport)
        orch = TeamOrchestrator(team)
        orch._get_llm_gateway = MagicMock(return_value=gateway)
        orch._get_model = MagicMock(return_value="test_model")
        orch._user_context = []
        return orch

    @staticmethod
    def _two_completed_phases() -> list[PlanPhase]:
        """Return phases ``p1`` and ``p2``, each with a ``result`` already set.

        Per the original note, two completed phases are needed to enter the
        LLM synthesis path.
        """
        phases = []
        for index in (1, 2):
            phase = _make_phase(phase_id=f"p{index}", name=f"Phase {index}")
            phase.result = {"content": f"结果{index}"}
            phases.append(phase)
        return phases

    @pytest.mark.asyncio
    async def test_synthesis_streams_team_synthesis_chunk(self):
        """Streaming synthesis invokes ``broadcast_callback`` once per streamed chunk."""
        # Mock gateway with chat_stream yielding chunks
        stream_chunks = ["综合", "结果", "完成"]

        async def _mock_chat_stream(messages, model=None, **kwargs):
            for text in stream_chunks:
                chunk = MagicMock()
                chunk.content = text
                yield chunk

        gateway = MagicMock()
        gateway.chat_stream = _mock_chat_stream
        orch = self._make_orchestrator(gateway)
        lead = _make_mock_expert(name="lead")

        # Collect chunks via callback
        received_chunks: list[str] = []

        async def callback(data: dict[str, object]) -> None:
            received_chunks.append(str(data.get("chunk", "")))

        result = await orch._synthesize_results(
            lead, "原始任务", self._two_completed_phases(), broadcast_callback=callback
        )

        # Verify all chunks were forwarded
        assert received_chunks == ["综合", "结果", "完成"]
        # Verify returned content is the full concatenation
        assert result["content"] == "综合结果完成"
        assert result["strategy"] == "best"
        assert result["phases_completed"] == 2

    @pytest.mark.asyncio
    async def test_synthesis_without_callback_uses_sync_chat(self):
        """Without ``broadcast_callback``, synthesis falls back to ``gateway.chat()``
        (backward compatibility).
        """
        response = MagicMock()
        response.content = "同步综合结果"
        gateway = MagicMock()
        gateway.chat = AsyncMock(return_value=response)
        orch = self._make_orchestrator(gateway)
        lead = _make_mock_expert(name="lead")

        result = await orch._synthesize_results(lead, "任务", self._two_completed_phases())

        # Verify sync chat was used (not chat_stream)
        gateway.chat.assert_called_once()
        gateway.chat_stream.assert_not_called()
        assert result["content"] == "同步综合结果"

    @pytest.mark.asyncio
    async def test_synthesis_stream_failure_falls_back_to_concatenation(self):
        """When ``chat_stream`` fails, synthesis falls back to concatenating phase
        results (the broad ``except Exception`` is kept by design).
        """

        async def _failing_chat_stream(messages, model=None, **kwargs):
            raise RuntimeError("stream unavailable")
            yield  # never reached — makes this an async generator

        gateway = MagicMock()
        gateway.chat_stream = _failing_chat_stream
        orch = self._make_orchestrator(gateway)
        lead = _make_mock_expert(name="lead")

        received: list[str] = []

        async def callback(data: dict[str, object]) -> None:
            received.append(str(data.get("chunk", "")))

        result = await orch._synthesize_results(
            lead, "任务", self._two_completed_phases(), broadcast_callback=callback
        )

        # No chunks were forwarded (stream failed immediately)
        assert received == []
        # Falls back to concatenation
        assert "结果1" in result["content"]
        assert "结果2" in result["content"]