fischer-agentkit/tests/unit/experts/test_team_orchestrator.py

641 lines
23 KiB
Python

"""TeamOrchestrator 单元测试 (hub-and-spoke 模式)"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.core.protocol import TaskResult, TaskStatus
from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PlanStatus, SubTask, SubTaskStatus
from agentkit.experts.team import ExpertTeam, TeamStatus
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(
name: str = "test_expert",
is_lead: bool = False,
) -> ExpertConfig:
"""创建测试用 ExpertConfig"""
return ExpertConfig(
name=name,
agent_type="expert",
persona="测试专家",
thinking_style="逻辑推理",
bound_skills=["skill_a"],
is_lead=is_lead,
task_mode="llm_generate",
prompt={"identity": "测试"},
)
def _make_mock_expert(
name: str = "test_expert",
is_lead: bool = False,
is_active: bool = True,
) -> MagicMock:
"""创建 mock Expert"""
config = _make_expert_config(name=name, is_lead=is_lead)
expert = MagicMock(spec=Expert)
expert.config = config
expert.is_active = is_active
expert.team_id = None
expert.get_capabilities_summary.return_value = {
"name": name,
"persona": config.persona,
"thinking_style": config.thinking_style,
"bound_skills": config.bound_skills,
"is_lead": is_lead,
}
# Mock agent.execute() to return a successful TaskResult
mock_agent = MagicMock()
mock_agent.execute = AsyncMock(return_value=TaskResult(
task_id="test",
agent_name=name,
status=TaskStatus.COMPLETED.value,
output_data={"content": f"Result from {name}"},
error_message=None,
started_at=None,
completed_at=None,
))
# No LLM gateway by default (tests single-subtask path)
mock_agent._llm_gateway = None
expert.agent = mock_agent
return expert
def _make_team_with_experts(
expert_names: list[str] | None = None,
lead_name: str = "lead",
) -> ExpertTeam:
"""创建包含 mock experts 的 ExpertTeam"""
team = ExpertTeam()
transport = AsyncMock(spec=InProcessHandoffTransport)
team._handoff_transport = transport
if expert_names is None:
expert_names = [lead_name, "member1", "member2"]
for name in expert_names:
is_lead = name == lead_name
expert = _make_mock_expert(name=name, is_lead=is_lead)
team._experts[name] = expert
if is_lead:
team._lead_expert_name = name
return team
def _make_mock_llm_gateway(subtask_descriptions: list[str] | None = None) -> MagicMock:
"""创建 mock LLM gateway.
If subtask_descriptions is provided, the gateway returns a JSON array
of subtasks for decomposition. Otherwise returns a simple response.
"""
gateway = AsyncMock()
if subtask_descriptions:
import json
subtasks_json = json.dumps([
{"description": desc, "assigned_expert": "member1"}
for desc in subtask_descriptions
])
response = MagicMock()
response.content = subtasks_json
gateway.chat = AsyncMock(return_value=response)
else:
response = MagicMock()
response.content = "Synthesized result"
gateway.chat = AsyncMock(return_value=response)
return gateway
# ── Hub-and-spoke 执行测试 ────────────────────────────────
class TestHubAndSpokeExecution:
"""Hub-and-spoke 模式执行测试"""
@pytest.mark.asyncio
async def test_execute_single_subtask_completes(self):
"""无 LLM 时,任务作为单个子任务执行完成"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
result = await orchestrator.execute("测试任务")
assert result["status"] == "completed"
assert "result" in result
assert "subtask_results" in result
assert "plan" in result
assert team.status == TeamStatus.COMPLETED
@pytest.mark.asyncio
async def test_execute_sets_team_status(self):
"""执行时设置 team 状态为 EXECUTING → SYNTHESIZING → COMPLETED"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
assert team.status == TeamStatus.COMPLETED
@pytest.mark.asyncio
async def test_execute_emits_team_formed_event(self):
"""执行时广播 team_formed 事件"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
calls = team._handoff_transport.send.call_args_list
event_types = [c[0][1]["type"] for c in calls]
assert "team_formed" in event_types
@pytest.mark.asyncio
async def test_execute_emits_expert_step_and_result_events(self):
"""执行时广播 expert_step 和 expert_result 事件"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
calls = team._handoff_transport.send.call_args_list
event_types = [c[0][1]["type"] for c in calls]
assert "expert_step" in event_types
assert "expert_result" in event_types
@pytest.mark.asyncio
async def test_execute_emits_team_synthesis_event(self):
"""执行完成时广播 team_synthesis 事件"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
calls = team._handoff_transport.send.call_args_list
event_types = [c[0][1]["type"] for c in calls]
assert "team_synthesis" in event_types
@pytest.mark.asyncio
async def test_execute_emits_plan_update_event(self):
"""执行时广播 plan_update 事件(包含子任务列表)"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
calls = team._handoff_transport.send.call_args_list
plan_updates = [c for c in calls if c[0][1].get("type") == "plan_update"]
assert len(plan_updates) >= 1
assert "subtasks" in plan_updates[0][0][1]
# ── LLM 任务分解测试 ──────────────────────────────────────
class TestTaskDecomposition:
"""LLM 任务分解测试"""
@pytest.mark.asyncio
async def test_llm_decomposes_task_into_subtasks(self):
"""LLM 将任务分解为多个子任务"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM gateway on lead expert for decomposition
gateway = _make_mock_llm_gateway(
subtask_descriptions=["分析数据", "生成报告", "审核结果"]
)
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("分析并报告数据")
assert result["status"] == "completed"
plan = result["plan"]
assert len(plan.subtasks) == 3
# Each subtask should have been executed
assert len(result["subtask_results"]) == 3
@pytest.mark.asyncio
async def test_decomposition_fallback_to_single_subtask(self):
"""LLM 不可用时回退到单个子任务"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# No LLM gateway — should fall back to single subtask
result = await orchestrator.execute("测试任务")
assert result["status"] == "completed"
plan = result["plan"]
assert len(plan.subtasks) == 1
@pytest.mark.asyncio
async def test_parse_subtasks_valid_json(self):
"""_parse_subtasks 正确解析 JSON 数组"""
import json
content = json.dumps([
{"description": "任务1", "assigned_expert": "member1"},
{"description": "任务2", "assigned_expert": "member2"},
])
subtasks = TeamOrchestrator._parse_subtasks(
content, ["member1", "member2"], "lead"
)
assert len(subtasks) == 2
assert subtasks[0].description == "任务1"
assert subtasks[0].assigned_expert == "member1"
assert subtasks[1].description == "任务2"
assert subtasks[1].assigned_expert == "member2"
@pytest.mark.asyncio
async def test_parse_subtasks_invalid_expert_falls_back_to_lead(self):
"""_parse_subtasks 对无效专家名回退到 lead"""
import json
content = json.dumps([
{"description": "任务1", "assigned_expert": "nonexistent"},
])
subtasks = TeamOrchestrator._parse_subtasks(
content, ["member1"], "lead"
)
assert len(subtasks) == 1
assert subtasks[0].assigned_expert == "lead"
@pytest.mark.asyncio
async def test_parse_subtasks_invalid_json_returns_empty(self):
"""_parse_subtasks 对无效 JSON 返回空列表"""
subtasks = TeamOrchestrator._parse_subtasks(
"not json at all", ["member1"], "lead"
)
assert subtasks == []
@pytest.mark.asyncio
async def test_parse_subtasks_empty_description_skipped(self):
"""_parse_subtasks 跳过空描述的子任务"""
import json
content = json.dumps([
{"description": "", "assigned_expert": "member1"},
{"description": "有效任务", "assigned_expert": "member1"},
])
subtasks = TeamOrchestrator._parse_subtasks(
content, ["member1"], "lead"
)
assert len(subtasks) == 1
assert subtasks[0].description == "有效任务"
# ── 子任务执行测试 ────────────────────────────────────────
class TestSubtaskExecution:
"""子任务执行测试"""
@pytest.mark.asyncio
async def test_subtask_execution_calls_agent_execute(self):
"""子任务执行调用 agent.execute()"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator.execute("测试任务")
# Lead expert's agent should have been called
team._experts["lead"].agent.execute.assert_awaited()
@pytest.mark.asyncio
async def test_subtask_marks_completed(self):
"""子任务执行后状态标记为 COMPLETED"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
result = await orchestrator.execute("测试任务")
plan = result["plan"]
for st in plan.subtasks:
assert st.status == SubTaskStatus.COMPLETED
assert st.result is not None
@pytest.mark.asyncio
async def test_subtask_with_invalid_expert_falls_back_to_lead(self):
"""子任务分配的专家不可用时回退到 lead"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM to assign to a nonexistent expert
import json
gateway = _make_mock_llm_gateway(subtask_descriptions=["任务1"])
gateway.chat = AsyncMock(return_value=MagicMock(
content=json.dumps([
{"description": "任务1", "assigned_expert": "nonexistent"}
])
))
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("测试任务")
assert result["status"] == "completed"
# The subtask should have been reassigned to lead
plan = result["plan"]
assert plan.subtasks[0].assigned_expert == "lead"
# ── 结果综合测试 ──────────────────────────────────────────
class TestResultSynthesis:
"""结果综合测试"""
@pytest.mark.asyncio
async def test_synthesize_single_result(self):
"""单个子任务结果直接返回"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
result = await orchestrator.execute("测试任务")
assert result["status"] == "completed"
final = result["result"]
assert "content" in final
assert final["strategy"] == "best"
assert final["subtasks_completed"] == 1
@pytest.mark.asyncio
async def test_synthesize_multiple_results_with_llm(self):
"""多个子任务结果通过 LLM 综合"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM for both decomposition and synthesis
import json
gateway = AsyncMock()
# First call: decomposition
decomp_response = MagicMock()
decomp_response.content = json.dumps([
{"description": "子任务1", "assigned_expert": "member1"},
{"description": "子任务2", "assigned_expert": "member2"},
])
# Second call: synthesis
synth_response = MagicMock()
synth_response.content = "综合结果"
gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response])
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("复杂任务")
assert result["status"] == "completed"
final = result["result"]
assert final["content"] == "综合结果"
assert final["strategy"] == "best"
assert final["subtasks_completed"] == 2
@pytest.mark.asyncio
async def test_synthesize_without_llm_concatenates(self):
"""无 LLM 时拼接所有结果"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM for decomposition only (no synthesis LLM)
import json
gateway = AsyncMock()
decomp_response = MagicMock()
decomp_response.content = json.dumps([
{"description": "子任务1", "assigned_expert": "member1"},
{"description": "子任务2", "assigned_expert": "member2"},
])
# Synthesis call raises to force concatenation fallback
gateway.chat = AsyncMock(
side_effect=[decomp_response, RuntimeError("LLM unavailable")]
)
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("复杂任务")
assert result["status"] == "completed"
final = result["result"]
assert "content" in final
# Should contain both results concatenated
assert "Result from member1" in final["content"]
assert "Result from member2" in final["content"]
# ── 回退测试 ──────────────────────────────────────────────
class TestFallback:
"""回退到单 Agent 模式测试"""
@pytest.mark.asyncio
async def test_all_subtasks_fail_triggers_fallback(self):
"""所有子任务失败时触发回退"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Make agent.execute raise for all subtasks
for expert in team._experts.values():
expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed"))
result = await orchestrator.execute("测试任务")
assert result["status"] == "fallback"
assert result["plan"].status == PlanStatus.FALLBACK
@pytest.mark.asyncio
async def test_fallback_uses_lead_expert(self):
"""回退使用 lead expert 执行原始任务"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Make agent.execute raise for subtasks but succeed for fallback
call_count = 0
async def mock_execute(task_msg):
nonlocal call_count
call_count += 1
if task_msg.task_type == "team_subtask":
raise RuntimeError("Subtask failed")
# Fallback succeeds
return TaskResult(
task_id=task_msg.task_id,
agent_name="lead",
status=TaskStatus.COMPLETED.value,
output_data={"content": "Fallback result"},
error_message=None,
started_at=None,
completed_at=None,
)
team._experts["lead"].agent.execute = AsyncMock(side_effect=mock_execute)
result = await orchestrator.execute("测试任务")
assert result["status"] == "fallback"
assert result["result"]["content"] == "Fallback result"
@pytest.mark.asyncio
async def test_no_active_experts_returns_failed(self):
"""没有活跃专家时返回 failed"""
team = _make_team_with_experts()
# Mark all experts as inactive
for expert in team._experts.values():
expert.is_active = False
orchestrator = TeamOrchestrator(team)
result = await orchestrator.execute("测试任务")
assert result["status"] == "failed"
assert "error" in result
# ── 事件广播测试 ──────────────────────────────────────────
class TestBroadcastEvent:
"""事件广播测试"""
@pytest.mark.asyncio
async def test_broadcast_event_sends_to_transport(self):
"""广播事件通过 handoff_transport 发送"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
await orchestrator._broadcast_event("test_event", {"key": "value"})
team._handoff_transport.send.assert_awaited()
call_args = team._handoff_transport.send.call_args
assert call_args[0][0] == team._team_channel
message = call_args[0][1]
assert message["type"] == "test_event"
assert message["key"] == "value"
@pytest.mark.asyncio
async def test_broadcast_event_no_transport(self):
"""没有 handoff_transport 时不报错"""
team = _make_team_with_experts()
team._handoff_transport = None
orchestrator = TeamOrchestrator(team)
# Should not raise
await orchestrator._broadcast_event("test_event", {"key": "value"})
@pytest.mark.asyncio
async def test_broadcast_event_handles_transport_error(self):
"""handoff_transport 发送失败时不影响执行"""
team = _make_team_with_experts()
team._handoff_transport.send = AsyncMock(side_effect=RuntimeError("Transport error"))
orchestrator = TeamOrchestrator(team)
# Should not raise
await orchestrator._broadcast_event("test_event", {"key": "value"})
# ── LLM Gateway 测试 ──────────────────────────────────────
class TestLLMGateway:
"""LLM Gateway 获取测试"""
def test_get_llm_gateway_from_lead(self):
"""从 lead expert 获取 LLM gateway"""
team = _make_team_with_experts()
gateway = MagicMock()
team._experts["lead"].agent._llm_gateway = gateway
orchestrator = TeamOrchestrator(team)
result = orchestrator._get_llm_gateway()
assert result is gateway
def test_get_llm_gateway_no_gateway(self):
"""没有 LLM gateway 时返回 None"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
result = orchestrator._get_llm_gateway()
assert result is None
def test_get_llm_gateway_fallback_to_active_expert(self):
"""lead 没有 gateway 时从其他活跃专家获取"""
team = _make_team_with_experts()
gateway = MagicMock()
# Lead has no gateway, but member1 does
team._experts["member1"].agent._llm_gateway = gateway
orchestrator = TeamOrchestrator(team)
result = orchestrator._get_llm_gateway()
assert result is gateway
# ── 并行执行测试 ──────────────────────────────────────────
class TestParallelExecution:
"""并行子任务执行测试"""
@pytest.mark.asyncio
async def test_multiple_subtasks_execute_in_parallel(self):
"""多个子任务并行执行"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM for decomposition
import json
gateway = AsyncMock()
decomp_response = MagicMock()
decomp_response.content = json.dumps([
{"description": "子任务1", "assigned_expert": "member1"},
{"description": "子任务2", "assigned_expert": "member2"},
{"description": "子任务3", "assigned_expert": "lead"},
])
synth_response = MagicMock()
synth_response.content = "综合结果"
gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response])
team._experts["lead"].agent._llm_gateway = gateway
result = await orchestrator.execute("并行任务")
assert result["status"] == "completed"
assert len(result["subtask_results"]) == 3
# All subtasks should be completed
plan = result["plan"]
for st in plan.subtasks:
assert st.status == SubTaskStatus.COMPLETED
@pytest.mark.asyncio
async def test_partial_failure_still_completes(self):
"""部分子任务失败时仍能完成(只要有成功的)"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Set up LLM for decomposition
import json
gateway = AsyncMock()
decomp_response = MagicMock()
decomp_response.content = json.dumps([
{"description": "子任务1", "assigned_expert": "member1"},
{"description": "子任务2", "assigned_expert": "member2"},
])
synth_response = MagicMock()
synth_response.content = "综合结果"
gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response])
team._experts["lead"].agent._llm_gateway = gateway
# Make member1's agent fail
team._experts["member1"].agent.execute = AsyncMock(
side_effect=RuntimeError("member1 failed")
)
result = await orchestrator.execute("部分失败任务")
assert result["status"] == "completed"
# member2's subtask should have succeeded
plan = result["plan"]
completed = plan.completed_subtasks
failed = plan.failed_subtasks
assert len(completed) >= 1
assert len(failed) >= 1