fischer-agentkit/tests/unit/experts/test_board_orchestrator.py

346 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""BoardOrchestrator 单元测试 — 私董会讨论引擎"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.experts.board import BoardTeam
from agentkit.experts.board_orchestrator import BoardOrchestrator
from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert
from tests.unit.experts._helpers import make_chat_stream_mock
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(
    name: str = "test_expert",
    is_lead: bool = False,
) -> ExpertConfig:
    """Build an ``ExpertConfig`` filled with fixed values for tests.

    Args:
        name: Expert name; also embedded in the persona text and used as the
            prompt identity.
        is_lead: Value forwarded to the config's ``is_lead`` field.

    Returns:
        A newly constructed ``ExpertConfig``.
    """
    settings = {
        "name": name,
        "agent_type": "expert",
        "persona": f"测试专家 {name}",
        "thinking_style": "analytical",
        "speaking_style": "直接",
        "decision_framework": "分析",
        "bound_skills": [],
        "is_lead": is_lead,
        "task_mode": "llm_generate",
        "prompt": {"identity": name},
        "avatar": "T",
        "color": "#FF0000",
    }
    return ExpertConfig(**settings)
def _make_mock_expert(
    name: str = "test_expert",
    is_lead: bool = False,
    is_active: bool = True,
) -> MagicMock:
    """Create a ``MagicMock`` specced against ``Expert`` for use in tests.

    Args:
        name: Name passed to ``_make_expert_config``.
        is_lead: Lead flag passed to ``_make_expert_config``.
        is_active: Value assigned to the mock's ``is_active`` attribute.

    Returns:
        The mock expert, carrying a real config, a mock ``agent`` and a mock
        ``agent._llm_gateway``.
    """
    expert_config = _make_expert_config(name, is_lead)

    # Prepare the agent first so the gateway is already attached when the
    # agent is hung onto the expert mock.
    mock_agent = MagicMock()
    mock_agent._llm_gateway = MagicMock()

    mock_expert = MagicMock(spec=Expert)
    mock_expert.config = expert_config
    mock_expert.is_active = is_active
    mock_expert.agent = mock_agent
    return mock_expert
def _make_mock_gateway(response_content: str = "测试回复") -> AsyncMock:
    """Create a mock LLM gateway whose calls yield ``response_content``.

    Args:
        response_content: Text exposed as ``.content`` on the object returned
            by ``chat`` and handed to ``make_chat_stream_mock`` for
            ``chat_stream``.

    Returns:
        An ``AsyncMock`` with ``chat`` and ``chat_stream`` configured.
    """
    reply = MagicMock()
    reply.content = response_content

    mock_gateway = AsyncMock()
    mock_gateway.chat = AsyncMock(return_value=reply)
    # board_orchestrator._stream_expert_speech() calls gateway.chat_stream()
    # (an async generator). A bare AsyncMock would hand back a coroutine that
    # lacks __aiter__, which triggers a RuntimeWarning and a fall back to the
    # non-streaming path, so a dedicated stream mock is installed instead.
    mock_gateway.chat_stream = make_chat_stream_mock(response_content)
    return mock_gateway
def _setup_team_with_experts(
team: BoardTeam,
experts: list[MagicMock],
moderator_name: str | None = None,
) -> None:
"""设置 BoardTeam 的内部专家字典"""
for expert in experts:
team._experts[expert.config.name] = expert
if moderator_name:
team._moderator_name = moderator_name
elif experts:
team._moderator_name = experts[0].config.name
# ── BoardOrchestrator 初始化测试 ──────────────────────────
class TestBoardOrchestratorInit:
    """Construction-time checks for BoardOrchestrator."""

    def test_init(self):
        """The orchestrator keeps a reference to the team it was built with."""
        board = BoardTeam()
        assert BoardOrchestrator(team=board)._team is board

    def test_stop_commands(self):
        """STOP_COMMANDS contains each expected stop phrase."""
        expected_phrases = ("/stop", "停止讨论", "stop", "结束讨论")
        for phrase in expected_phrases:
            assert phrase in BoardOrchestrator.STOP_COMMANDS
# ── BoardOrchestrator.execute 测试 ────────────────────────
class TestBoardOrchestratorExecute:
    """Tests for the BoardOrchestrator.execute flow."""

    @pytest.mark.asyncio
    async def test_execute_no_active_experts(self):
        """A team without active experts yields a failed result."""
        orchestrator = BoardOrchestrator(team=BoardTeam())

        outcome = await orchestrator.execute("测试主题")

        assert outcome["status"] == "failed"
        assert "error" in outcome
        assert "No active expert" in outcome["error"]

    @pytest.mark.asyncio
    async def test_execute_success(self):
        """Normal execution flow."""
        board = BoardTeam(max_rounds=2)
        participants = [
            _make_mock_expert("moderator", is_lead=True),
            _make_mock_expert("member1"),
            _make_mock_expert("member2"),
        ]
        _setup_team_with_experts(board, participants, "moderator")

        # Every participant shares the same mock LLM gateway.
        shared_gateway = _make_mock_gateway("测试发言内容")
        for participant in participants:
            participant.agent._llm_gateway = shared_gateway

        orchestrator = BoardOrchestrator(team=board)
        # Replace _broadcast_event with a mock to avoid transport issues.
        with patch.object(orchestrator, "_broadcast_event", new_callable=AsyncMock):
            outcome = await orchestrator.execute("测试主题")

        assert outcome["status"] == "completed"
        assert outcome["total_rounds"] == 2
        assert "summary" in outcome

    @pytest.mark.asyncio
    async def test_execute_with_stop_command(self):
        """The discussion terminates when the user sends a stop command."""
        board = BoardTeam(max_rounds=5)
        participants = [
            _make_mock_expert("moderator", is_lead=True),
            _make_mock_expert("member1"),
        ]
        _setup_team_with_experts(board, participants, "moderator")

        shared_gateway = _make_mock_gateway("测试内容")
        for participant in participants:
            participant.agent._llm_gateway = shared_gateway

        # Queue a stop command as a user intervention before execution starts.
        await board.add_user_intervention("/stop")

        orchestrator = BoardOrchestrator(team=board)
        with patch.object(orchestrator, "_broadcast_event", new_callable=AsyncMock):
            outcome = await orchestrator.execute("测试主题")

        # Expected to stop by round 1 (per the original author's note,
        # consume_user_interventions is checked at the start of the loop).
        assert outcome["status"] == "completed"
        assert outcome["total_rounds"] <= 1

    @pytest.mark.asyncio
    async def test_execute_llm_unavailable_uses_fallback(self):
        """With the LLM unavailable, fallback text is used and the run completes."""
        board = BoardTeam(max_rounds=2)
        moderator = _make_mock_expert("moderator", is_lead=True)
        _setup_team_with_experts(board, [moderator], "moderator")

        # Gateway whose chat() raises on every LLM call.
        broken_gateway = AsyncMock()
        broken_gateway.chat = AsyncMock(side_effect=Exception("LLM 不可用"))
        moderator.agent._llm_gateway = broken_gateway

        orchestrator = BoardOrchestrator(team=board)
        with patch.object(orchestrator, "_broadcast_event", new_callable=AsyncMock):
            outcome = await orchestrator.execute("测试主题")

        # The orchestrator is expected to substitute fallback text and still
        # finish normally.
        assert outcome["status"] == "completed"
        assert outcome["total_rounds"] == 2
        # Only the presence of a summary is checked here.
        assert "summary" in outcome

    @pytest.mark.asyncio
    async def test_execute_unexpected_exception(self):
        """A non-LLM exception makes execute report a failed result."""
        board = BoardTeam(max_rounds=2)
        moderator = _make_mock_expert("moderator", is_lead=True)
        _setup_team_with_experts(board, [moderator], "moderator")

        # Provide a working gateway for the fallback conclusion.
        moderator.agent._llm_gateway = _make_mock_gateway("回退总结")

        orchestrator = BoardOrchestrator(team=board)
        # Make _generate_moderator_opening raise an unexpected exception.
        failing_opening = patch.object(
            orchestrator,
            "_generate_moderator_opening",
            new_callable=AsyncMock,
            side_effect=RuntimeError("Unexpected error"),
        )
        silent_broadcast = patch.object(
            orchestrator, "_broadcast_event", new_callable=AsyncMock
        )
        with failing_opening, silent_broadcast:
            outcome = await orchestrator.execute("测试主题")

        # An unexpected exception should surface as a "failed" status.
        assert outcome["status"] == "failed"
        assert "error" in outcome
# ── BoardOrchestrator._has_stop_command 测试 ──────────────
class TestBoardOrchestratorStopCommand:
    """Tests for BoardOrchestrator stop-command detection."""

    def test_has_stop_command_true(self):
        """A stop command anywhere in the message list is detected."""
        orchestrator = BoardOrchestrator(team=BoardTeam())
        stopping_inputs = (
            ["/stop"],
            ["停止讨论"],
            ["some text", "stop"],
            ["结束讨论"],
        )
        for messages in stopping_inputs:
            assert orchestrator._has_stop_command(messages) is True

    def test_has_stop_command_false(self):
        """Message lists without a stop command are not flagged."""
        orchestrator = BoardOrchestrator(team=BoardTeam())
        harmless_inputs = ([], ["继续讨论"], ["请多说一些"])
        for messages in harmless_inputs:
            assert orchestrator._has_stop_command(messages) is False

    def test_has_stop_command_case_insensitive(self):
        """Stop commands are matched regardless of letter case."""
        orchestrator = BoardOrchestrator(team=BoardTeam())
        for variant in ("STOP", "Stop"):
            assert orchestrator._has_stop_command([variant]) is True
# ── BoardOrchestrator._get_llm_gateway 测试 ───────────────
class TestBoardOrchestratorGetGateway:
    """Tests for BoardOrchestrator._get_llm_gateway."""

    def test_get_gateway_from_expert(self):
        """A gateway is obtained from the expert that was passed in."""
        orchestrator = BoardOrchestrator(team=BoardTeam())
        candidate = _make_mock_expert("test")

        assert orchestrator._get_llm_gateway(candidate) is not None

    def test_get_gateway_fallback_to_active_experts(self):
        """A gateway is still found when only another expert has one."""
        board = BoardTeam()
        lead = _make_mock_expert("moderator", is_lead=True)
        # The moderator has no gateway of its own.
        lead.agent._llm_gateway = None
        helper = _make_mock_expert("member")
        _setup_team_with_experts(board, [lead, helper], "moderator")
        orchestrator = BoardOrchestrator(team=board)

        # The lookup is expected to fall back to the member's gateway.
        found = orchestrator._get_llm_gateway(lead)

        assert found is not None

    def test_get_gateway_none_when_no_gateway(self):
        """None is returned when no gateway is available."""
        board = BoardTeam()
        lead = _make_mock_expert("moderator", is_lead=True)
        lead.agent._llm_gateway = None
        _setup_team_with_experts(board, [lead], "moderator")
        orchestrator = BoardOrchestrator(team=board)

        found = orchestrator._get_llm_gateway(lead)

        assert found is None
# ── BoardOrchestrator._broadcast_event 测试 ───────────────
class TestBoardOrchestratorBroadcast:
    """Tests for BoardOrchestrator._broadcast_event."""

    @pytest.mark.asyncio
    async def test_broadcast_event_with_transport(self):
        """With a transport attached, the event is sent on the team channel."""
        team = BoardTeam()
        orchestrator = BoardOrchestrator(team=team)
        # Attach a mock transport with an awaitable send().
        team._handoff_transport = AsyncMock()
        team._handoff_transport.send = AsyncMock()

        await orchestrator._broadcast_event("board_started", {"topic": "测试"})

        team._handoff_transport.send.assert_called_once()
        positional = team._handoff_transport.send.call_args[0]
        # First positional argument is the channel, second is the payload.
        assert positional[0] == team.team_channel
        assert positional[1]["type"] == "board_started"
        assert positional[1]["topic"] == "测试"

    @pytest.mark.asyncio
    async def test_broadcast_event_no_transport(self):
        """Broadcasting without a transport does not raise."""
        team = BoardTeam()
        team._handoff_transport = None
        orchestrator = BoardOrchestrator(team=team)

        # Must complete without raising.
        await orchestrator._broadcast_event("board_started", {"topic": "测试"})

    @pytest.mark.asyncio
    async def test_broadcast_event_transport_error(self):
        """A transport failure is not propagated to the caller."""
        team = BoardTeam()
        orchestrator = BoardOrchestrator(team=team)
        team._handoff_transport = AsyncMock()
        team._handoff_transport.send = AsyncMock(side_effect=Exception("Transport error"))

        # Must complete without raising.
        await orchestrator._broadcast_event("board_started", {"topic": "测试"})

        # Guard against a vacuous pass: the failing send() must actually have
        # been invoked, otherwise the error path was never exercised.
        team._handoff_transport.send.assert_called()