1129 lines
38 KiB
Python
1129 lines
38 KiB
Python
"""Integration tests for Expert Team Pipeline Mode.
|
||
|
||
Covers the pipeline orchestration flow end-to-end with mocked LLM and AgentPool:
|
||
- F1: Manual team formation via @team:expert1,expert2
|
||
- F2: Default team template (@team:dev_team)
|
||
- F3: Pipeline execution (sequential phases with dependencies)
|
||
- F4: Parallel phases (no dependencies)
|
||
- F5: Phase failure and dependency failure propagation
|
||
- F6: SharedWorkspace data passing between phases
|
||
- F7: Context isolation (independent ConfigDrivenAgent per phase)
|
||
- F8: Fallback to single agent when all phases fail
|
||
- F9: Event sequence (team_formed → plan_update → phase_started → ... → team_synthesis)
|
||
- F10: TeamStatus.PLANNING state transition
|
||
- F11: Circular dependency detection
|
||
- F12: Invalid expert reference fallback
|
||
- F13: LLM decomposition failure fallback
|
||
|
||
Replaces the legacy CollaborationPlan-based tests (U9).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
from unittest.mock import AsyncMock, MagicMock
|
||
|
||
import pytest
|
||
|
||
from agentkit.core.handoff_transport import InProcessHandoffTransport
|
||
from agentkit.core.protocol import TaskResult, TaskStatus
|
||
from agentkit.core.shared_workspace import SharedWorkspace
|
||
from agentkit.experts.config import ExpertConfig, ExpertTemplate
|
||
from agentkit.experts.orchestrator import TeamOrchestrator
|
||
from agentkit.experts.plan import PhaseStatus, PlanPhase, TeamPlan
|
||
from agentkit.experts.registry import ExpertTemplateRegistry
|
||
from agentkit.experts.router import ExpertTeamRouter
|
||
from agentkit.experts.team import ExpertTeam, TeamStatus
|
||
|
||
|
||
# ── 辅助函数 ──────────────────────────────────────────────
|
||
|
||
|
||
def _make_expert_config(
    name: str,
    persona: str = "",
    is_lead: bool = False,
    bound_skills: list[str] | None = None,
    llm: dict | None = None,
) -> ExpertConfig:
    """Build an ``ExpertConfig`` for use in tests.

    Args:
        name: Expert name; also interpolated into the identity prompt.
        persona: Persona text. A falsy value falls back to ``"<name> expert"``.
        is_lead: Value stored in the config's ``is_lead`` field.
        bound_skills: Skills bound to the expert; a falsy value becomes ``[]``.
        llm: Optional LLM settings, passed through unchanged.

    Returns:
        A config with ``agent_type="expert"`` and ``task_mode="llm_generate"``.
    """
    return ExpertConfig(
        name=name,
        persona=persona or f"{name} expert",
        is_lead=is_lead,
        bound_skills=bound_skills or [],
        agent_type="expert",
        task_mode="llm_generate",
        prompt={"identity": f"You are {name}"},
        llm=llm,
    )
|
||
|
||
|
||
def _make_mock_agent(
    name: str,
    content: str | None = None,
    fail: bool = False,
    llm_gateway: MagicMock | None = None,
) -> MagicMock:
    """Create a mock standing in for a ``ConfigDrivenAgent``.

    Args:
        name: Agent name, used in the result and in the failure message.
        content: Text placed under ``output_data["content"]``; a falsy value
            falls back to ``"Result from <name>"``.
        fail: When true, ``execute`` raises ``RuntimeError`` instead of
            returning a result.
        llm_gateway: Object exposed as the agent's ``_llm_gateway`` attribute.

    Returns:
        A ``MagicMock`` whose ``execute`` is an ``AsyncMock``.
    """
    agent = MagicMock()
    if fail:
        agent.execute = AsyncMock(side_effect=RuntimeError(f"{name} execution failed"))
    else:
        agent.execute = AsyncMock(
            return_value=TaskResult(
                task_id="test",
                agent_name=name,
                status=TaskStatus.COMPLETED.value,
                output_data={"content": content or f"Result from {name}"},
                error_message=None,
                started_at=None,
                completed_at=None,
            )
        )
    agent._llm_gateway = llm_gateway
    return agent
|
||
|
||
|
||
def _make_mock_expert(
    name: str,
    is_lead: bool = False,
    is_active: bool = True,
    content: str | None = None,
    fail: bool = False,
    llm_gateway: MagicMock | None = None,
    llm: dict | None = None,
) -> MagicMock:
    """Create a mock ``Expert`` (``spec=Expert``) backed by a mock agent.

    Args:
        name: Expert name.
        is_lead: Stored in the config and in the capabilities summary.
        is_active: Value of the mock's ``is_active`` attribute.
        content: Forwarded to ``_make_mock_agent`` as the agent's output text.
        fail: Forwarded to ``_make_mock_agent``; makes the agent raise.
        llm_gateway: Forwarded to ``_make_mock_agent``.
        llm: Forwarded to ``_make_expert_config``.

    Returns:
        The configured mock expert; its ``team_id`` starts as ``None``.
    """
    # Imported locally, as in the original helper.
    from agentkit.experts.expert import Expert

    config = _make_expert_config(name=name, is_lead=is_lead, llm=llm)
    expert = MagicMock(spec=Expert)
    expert.config = config
    expert.is_active = is_active
    expert.team_id = None
    expert.get_capabilities_summary.return_value = {
        "name": name,
        "persona": config.persona,
        "bound_skills": config.bound_skills,
        "is_lead": is_lead,
    }
    expert.agent = _make_mock_agent(name, content=content, fail=fail, llm_gateway=llm_gateway)
    return expert
|
||
|
||
|
||
def _make_mock_llm_gateway(
|
||
phases: list[dict] | None = None,
|
||
synthesis_content: str = "综合结果",
|
||
decomp_fail: bool = False,
|
||
) -> MagicMock:
|
||
"""创建 mock LLM gateway.
|
||
|
||
- phases: 提供 LLM 分解返回的阶段列表
|
||
- decomp_fail: 分解调用抛出异常
|
||
"""
|
||
gateway = AsyncMock()
|
||
if decomp_fail:
|
||
gateway.chat = AsyncMock(side_effect=RuntimeError("LLM unavailable"))
|
||
return gateway
|
||
|
||
if phases:
|
||
phases_json = json.dumps(phases)
|
||
decomp_response = MagicMock()
|
||
decomp_response.content = phases_json
|
||
synth_response = MagicMock()
|
||
synth_response.content = synthesis_content
|
||
gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response])
|
||
else:
|
||
response = MagicMock()
|
||
response.content = synthesis_content
|
||
gateway.chat = AsyncMock(return_value=response)
|
||
return gateway
|
||
|
||
|
||
def _make_team_with_experts(
    expert_names: list[str] | None = None,
    lead_name: str = "lead",
    pool: MagicMock | None = None,
    workspace: SharedWorkspace | None = None,
    transport: InProcessHandoffTransport | None = None,
) -> ExpertTeam:
    """Create an ``ExpertTeam`` populated with mock experts.

    Args:
        expert_names: Names of the experts to add. ``None`` means
            ``[lead_name, "member1", "member2"]``.
        lead_name: The expert with this name is marked as lead.
        pool: Optional agent pool assigned to the team's ``_pool``.
        workspace: Workspace for the team; a fresh ``SharedWorkspace`` if falsy.
        transport: Handoff transport; a fresh ``InProcessHandoffTransport``
            if falsy.

    Returns:
        The team, with ``_experts`` and ``_lead_expert_name`` filled in directly.
    """
    team = ExpertTeam(workspace=workspace or SharedWorkspace())
    # Use a real InProcessHandoffTransport so events can be captured
    team._handoff_transport = transport or InProcessHandoffTransport()
    if pool is not None:
        team._pool = pool

    if expert_names is None:
        expert_names = [lead_name, "member1", "member2"]

    for name in expert_names:
        is_lead = name == lead_name
        team._experts[name] = _make_mock_expert(name=name, is_lead=is_lead)
        if is_lead:
            team._lead_expert_name = name

    return team
|
||
|
||
|
||
def _make_mock_pool(content_prefix: str = "Isolated") -> MagicMock:
    """Create a mock ``AgentPool`` that hands out a fresh mock agent per call.

    Args:
        content_prefix: Prefix of each created agent's output text, which is
            ``"<content_prefix> result from <config.name>"``.

    Returns:
        A ``MagicMock`` with async ``create_agent`` and ``remove_agent``.
    """
    pool = MagicMock()
    pool.create_agent = AsyncMock(
        side_effect=lambda config: _make_mock_agent(
            config.name, content=f"{content_prefix} result from {config.name}"
        )
    )
    pool.remove_agent = AsyncMock()
    return pool
|
||
|
||
|
||
def _make_registry_with_dev_team() -> ExpertTemplateRegistry:
    """Build a registry with five coding experts and the ``dev_team`` template.

    Returns:
        A registry containing one template per expert plus a ``dev_team``
        template whose ``bound_skills`` lists those experts in order.
    """
    # Single source of truth for both the per-expert templates and the
    # dev_team member list.
    members = [
        "tech_lead",
        "frontend_engineer",
        "backend_engineer",
        "qa_engineer",
        "code_reviewer",
    ]

    reg = ExpertTemplateRegistry()
    for name in members:
        reg.register(
            ExpertTemplate(
                name=name,
                description=f"{name} expert",
                config=_make_expert_config(
                    name,
                    persona=f"{name} persona",
                    # Individual experts have empty bound_skills;
                    # only team templates use bound_skills for member list
                    bound_skills=[],
                ),
            )
        )

    # dev_team template stores members in bound_skills
    reg.register(
        ExpertTemplate(
            name="dev_team",
            description="Development team template",
            config=ExpertConfig(
                name="dev_team",
                persona="dev team",
                agent_type="expert",
                task_mode="llm_generate",
                prompt={"identity": "dev team"},
                bound_skills=list(members),
            ),
        )
    )
    return reg
|
||
|
||
|
||
async def _capture_events(
    transport: InProcessHandoffTransport, channel: str
) -> tuple[list[dict], asyncio.Task]:
    """Start a background listener that records every message on *channel*.

    Args:
        transport: Transport whose ``listen(channel)`` async iterator is consumed.
        channel: Channel name to listen on.

    Returns:
        ``(events, task)``: the list the listener appends to (it keeps growing
        after this function returns) and the listener task, which the caller
        is responsible for awaiting or cancelling.
    """
    events: list[dict] = []

    async def listener() -> None:
        async for msg in transport.listen(channel):
            events.append(msg)

    task = asyncio.create_task(listener())
    await asyncio.sleep(0.05)  # Let listener start
    return events, task
|
||
|
||
|
||
# ── Fixtures ─────────────────────────────────────────────
|
||
|
||
|
||
@pytest.fixture
def workspace():
    """Provide a fresh ``SharedWorkspace`` for each test."""
    return SharedWorkspace()
|
||
|
||
|
||
@pytest.fixture
def registry():
    """Provide a registry pre-loaded with the ``dev_team`` template and its experts."""
    return _make_registry_with_dev_team()
|
||
|
||
|
||
# ── F1: Manual Team Formation ────────────────────────────
|
||
|
||
|
||
class TestManualTeamFormation:
    """F1: the user names the team's experts via ``@team:expert1,expert2``."""

    def test_manual_team_with_templates(self, registry):
        """AE1: experts named by template are parsed out of the request."""
        router = ExpertTeamRouter(registry)
        result = router.resolve("@team:tech_lead,frontend_engineer 开发登录功能")

        assert result.team_mode is True
        assert result.specified_experts == ["tech_lead", "frontend_engineer"]
        assert result.auto_compose is False
        assert result.task_content == "开发登录功能"

    def test_explicit_experts_resolved_to_configs(self, registry):
        """Named experts resolve to configs; the first one is the lead."""
        router = ExpertTeamRouter(registry)
        result = router.resolve("@team:frontend_engineer,backend_engineer 任务")

        configs = router.resolve_expert_configs(result.specified_experts)
        assert len(configs) == 2
        assert configs[0].name == "frontend_engineer"
        assert configs[0].is_lead is True
        assert configs[1].name == "backend_engineer"
        assert configs[1].is_lead is False

    def test_dynamic_expert_creation_for_unknown_name(self, registry):
        """An unknown expert name still yields a config, created on the fly."""
        router = ExpertTeamRouter(registry)
        configs = router.resolve_expert_configs(["legal_advisor"])

        assert len(configs) == 1
        assert configs[0].name == "legal_advisor"
        assert configs[0].is_lead is True
|
||
|
||
|
||
# ── F2: Default Team Template ────────────────────────────
|
||
|
||
|
||
class TestDefaultTeamTemplate:
    """F2: ``@team:dev_team`` brings in the five coding experts."""

    def test_dev_team_template_expands_to_members(self, registry):
        """``@team:dev_team`` expands to the template's ``bound_skills`` members."""
        router = ExpertTeamRouter(registry)
        result = router.resolve("@team:dev_team 开发用户系统")

        assert result.team_mode is True
        assert result.specified_experts == [
            "tech_lead",
            "frontend_engineer",
            "backend_engineer",
            "qa_engineer",
            "code_reviewer",
        ]
        assert result.auto_compose is False
        assert result.task_content == "开发用户系统"

    def test_no_experts_uses_default_template(self, registry):
        """A bare ``@team`` falls back to the default ``dev_team`` template."""
        router = ExpertTeamRouter(registry)
        result = router.resolve("@team 开发功能")

        assert result.team_mode is True
        assert len(result.specified_experts) == 5
        assert "tech_lead" in result.specified_experts
        assert result.auto_compose is False
|
||
|
||
|
||
# ── F3: Pipeline Sequential Execution ────────────────────
|
||
|
||
|
||
class TestPipelineExecution:
    """F3: pipeline-mode execution."""

    @pytest.mark.asyncio
    async def test_pipeline_execution_three_sequential_phases(self):
        """Three chained phases (A -> B -> C) all complete."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": ["A"],
                },
                {
                    "name": "C",
                    "assigned_expert": "member2",
                    "task_description": "阶段C",
                    "depends_on": ["B"],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("串行任务")

        assert result["status"] == "completed"
        plan = result["plan"]
        assert len(plan.phases) == 3
        for ph in plan.phases:
            assert ph.status == PhaseStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_pipeline_single_phase_no_llm(self):
        """Without an LLM gateway the task still runs to completion."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        result = await orchestrator.execute("简单任务")

        assert result["status"] == "completed"
        assert "result" in result
        assert "phase_results" in result
        assert team.status == TeamStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_pipeline_status_transitions(self):
        """Status moves PLANNING -> EXECUTING -> SYNTHESIZING -> COMPLETED."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        statuses_seen: list[str] = []
        original_set_status = team.set_status

        def tracking_set_status(status: TeamStatus) -> None:
            # Record the transition, then delegate to the real setter.
            statuses_seen.append(status.value)
            original_set_status(status)

        team.set_status = tracking_set_status  # type: ignore[assignment]

        await orchestrator.execute("任务")

        expected_order = ["planning", "executing", "synthesizing", "completed"]
        for status in expected_order:
            assert status in statuses_seen
        # The first occurrence of each status must precede the next one's.
        for earlier, later in zip(expected_order, expected_order[1:]):
            assert statuses_seen.index(earlier) < statuses_seen.index(later)
|
||
|
||
|
||
# ── F4: Parallel Phases ──────────────────────────────────
|
||
|
||
|
||
class TestParallelPhases:
    """F4: phases without dependencies."""

    @pytest.mark.asyncio
    async def test_parallel_phases_no_dependencies(self):
        """Two independent phases both complete."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "member1",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member2",
                    "task_description": "阶段B",
                    "depends_on": [],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("并行任务")

        assert result["status"] == "completed"
        plan = result["plan"]
        assert len(plan.phases) == 2
        for ph in plan.phases:
            assert ph.status == PhaseStatus.COMPLETED

    @pytest.mark.asyncio
    async def test_mixed_parallel_and_sequential(self):
        """A and B are independent; C depends on both."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": [],
                },
                {
                    "name": "C",
                    "assigned_expert": "member2",
                    "task_description": "阶段C",
                    "depends_on": ["A", "B"],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("混合任务")

        assert result["status"] == "completed"
        plan = result["plan"]
        assert len(plan.phases) == 3
        c_phase = next(p for p in plan.phases if p.name == "C")
        assert len(c_phase.depends_on) == 2
|
||
|
||
|
||
# ── F5: Phase Failure and Dependency Propagation ─────────
|
||
|
||
|
||
class TestPhaseFailure:
    """F5: phase failure and propagation to dependent phases."""

    @pytest.mark.asyncio
    async def test_phase_dependency_failure_propagation(self):
        """B depends on A; when A fails, B is marked FAILED as well."""
        team = _make_team_with_experts()
        # Make lead's agent fail
        team._experts["lead"].agent = _make_mock_agent("lead", fail=True)

        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": ["A"],
                },
            ]
        )
        # Attach the gateway after swapping in the failing agent.
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("失败传播任务")

        # Should fallback since all phases failed
        assert result["status"] == "fallback"
        plan = result["plan"]
        a_phase = next(p for p in plan.phases if p.name == "A")
        b_phase = next(p for p in plan.phases if p.name == "B")
        assert a_phase.status == PhaseStatus.FAILED
        assert b_phase.status == PhaseStatus.FAILED

    @pytest.mark.asyncio
    async def test_fallback_on_all_failure(self):
        """When every phase fails, the result is a single-agent fallback."""
        team = _make_team_with_experts()
        # All experts fail
        for name in team._experts:
            team._experts[name].agent = _make_mock_agent(name, fail=True)

        orchestrator = TeamOrchestrator(team)

        # No LLM so single-phase path; that phase also fails → fallback
        result = await orchestrator.execute("全失败任务")

        assert result["status"] == "fallback"
        assert "result" in result
        assert result["result"] is not None
|
||
|
||
|
||
# ── F6: SharedWorkspace Data Passing ─────────────────────
|
||
|
||
|
||
class TestSharedWorkspacePassing:
    """F6: data is passed between phases through the ``SharedWorkspace``."""

    @pytest.mark.asyncio
    async def test_phase_output_written_to_workspace(self, workspace):
        """Phase A's output is written to the workspace."""
        team = _make_team_with_experts(workspace=workspace)
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": ["A"],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("数据传递任务")

        assert result["status"] == "completed"
        plan = result["plan"]

        # Phase A output should be in workspace
        a_phase = next(p for p in plan.phases if p.name == "A")
        output_key = f"{plan.id}/phase/{a_phase.id}/output"
        data = await workspace.read(output_key)
        assert data is not None
        assert "Result from lead" in data.get("value", "")

    @pytest.mark.asyncio
    async def test_dependency_output_read_by_dependent(self, workspace):
        """Phase B receives phase A's output in its input data."""
        team = _make_team_with_experts(workspace=workspace)
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": ["A"],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        # Track what input_data member1's agent receives
        received_inputs: list[dict] = []
        original_execute = team._experts["member1"].agent.execute

        async def tracking_execute(task_msg):
            received_inputs.append(task_msg.input_data)
            return await original_execute(task_msg)

        team._experts["member1"].agent.execute = tracking_execute

        await orchestrator.execute("依赖读取任务")

        # member1 should have received dependency_outputs in input_data
        assert len(received_inputs) > 0
        b_input = received_inputs[0]
        assert "dependency_outputs" in b_input
        assert "A" in b_input["dependency_outputs"]
|
||
|
||
|
||
# ── F7: Context Isolation ────────────────────────────────
|
||
|
||
|
||
class TestContextIsolation:
    """F7: context isolation, with a separate agent created for each phase."""

    @pytest.mark.asyncio
    async def test_isolated_agent_created_per_phase(self):
        """Each phase obtains its own agent from the pool."""
        pool = _make_mock_pool()
        team = _make_team_with_experts(pool=pool)
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": [],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        await orchestrator.execute("隔离任务")

        # Pool.create_agent should have been called for each phase
        assert pool.create_agent.call_count >= 2

    @pytest.mark.asyncio
    async def test_isolated_agent_cleaned_up_after_phase(self):
        """Temporary agents are removed once the phase has finished."""
        pool = _make_mock_pool()
        team = _make_team_with_experts(pool=pool)
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        await orchestrator.execute("清理任务")

        # All temp agents should be cleaned up
        assert len(orchestrator._temp_agents) == 0
        assert pool.remove_agent.call_count >= 1
|
||
|
||
|
||
# ── F8: Event Sequence ───────────────────────────────────
|
||
|
||
|
||
class TestEventSequence:
    """F8: events are emitted in the expected order."""

    @staticmethod
    async def _stop_listener(transport, listener_task) -> None:
        """Close *transport* and give the listener task a short time to finish.

        Teardown is best-effort: if the task does not finish within 0.5 s, or
        finishes with an error, it is cancelled and the error is not re-raised.
        """
        transport.close()
        try:
            await asyncio.wait_for(listener_task, timeout=0.5)
        except Exception:  # also covers asyncio.TimeoutError
            listener_task.cancel()

    @pytest.mark.asyncio
    async def test_event_sequence_pipeline_execution(self):
        """Order: team_formed, plan_update, phase_started, phase_completed, team_synthesis."""
        transport = InProcessHandoffTransport()
        team = _make_team_with_experts(transport=transport)
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        events, listener_task = await _capture_events(transport, team.team_channel)

        await orchestrator.execute("事件序列任务")

        await self._stop_listener(transport, listener_task)

        event_types = [e.get("type") for e in events]

        expected_order = [
            "team_formed",
            "plan_update",
            "phase_started",
            "phase_completed",
            "team_synthesis",
        ]
        # Verify required events are present
        for event_type in expected_order:
            assert event_type in event_types

        # Verify order (first occurrence of each type)
        for earlier, later in zip(expected_order, expected_order[1:]):
            assert event_types.index(earlier) < event_types.index(later)

    @pytest.mark.asyncio
    async def test_phase_failed_event_emitted_on_failure(self):
        """Events are still emitted when every agent fails.

        NOTE(review): despite the test name, only ``phase_started`` is
        asserted here; confirm whether ``phase_failed`` should be checked too.
        """
        transport = InProcessHandoffTransport()
        team = _make_team_with_experts(transport=transport)
        # Make all agents fail to trigger phase_failed
        for name in team._experts:
            team._experts[name].agent = _make_mock_agent(name, fail=True)
        orchestrator = TeamOrchestrator(team)

        events, listener_task = await _capture_events(transport, team.team_channel)

        await orchestrator.execute("失败事件任务")

        await self._stop_listener(transport, listener_task)

        event_types = [e.get("type") for e in events]
        # phase_started should be emitted before failure
        assert "phase_started" in event_types
|
||
|
||
|
||
# ── F9: TeamStatus.PLANNING Transition ───────────────────
|
||
|
||
|
||
class TestTeamStatusPlanning:
    """F9: the team enters the PLANNING state."""

    @pytest.mark.asyncio
    async def test_create_team_sets_planning_status(self):
        """After ``create_team`` the team status is PLANNING."""
        pool = _make_mock_pool()
        team = ExpertTeam(pool=pool)

        lead_config = _make_expert_config("lead", is_lead=True)
        member_config = _make_expert_config("member")

        await team.create_team(lead_config, [member_config])

        assert team.status == TeamStatus.PLANNING

    @pytest.mark.asyncio
    async def test_orchestrator_sets_planning_before_decomposition(self):
        """``orchestrator.execute`` sets PLANNING at some point during the run."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        statuses_during_execution: list[TeamStatus] = []
        original_set_status = team.set_status

        def tracking_set_status(status: TeamStatus) -> None:
            # Record the transition, then delegate to the real setter.
            statuses_during_execution.append(status)
            original_set_status(status)

        team.set_status = tracking_set_status  # type: ignore[assignment]

        await orchestrator.execute("任务")

        # PLANNING must have been set; its position in the sequence is not
        # checked here.
        assert TeamStatus.PLANNING in statuses_during_execution
|
||
|
||
|
||
# ── F10: Circular Dependency Detection ───────────────────
|
||
|
||
|
||
class TestCircularDependency:
    """F10: circular dependency detection."""

    @pytest.mark.asyncio
    async def test_circular_dependency_triggers_fallback(self):
        """An A -> B -> A cycle makes the run fall back to a single agent."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        # Phases with circular dependency: A depends on B, B depends on A.
        # Phase ids are auto-generated, so the cycle is expressed through
        # phase names in the LLM response and resolved name -> id downstream.
        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "lead",
                    "task_description": "阶段A",
                    "depends_on": ["B"],
                },
                {
                    "name": "B",
                    "assigned_expert": "member1",
                    "task_description": "阶段B",
                    "depends_on": ["A"],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("循环依赖任务")

        # Should fallback due to ValueError from topological_sort
        assert result["status"] == "fallback"

    def test_topological_sort_raises_on_cycle(self):
        """``topological_sort()`` raises ``ValueError`` on a dependency cycle."""
        plan = TeamPlan(task="test", lead_expert="lead")
        a = PlanPhase(name="A", assigned_expert="lead")
        b = PlanPhase(name="B", assigned_expert="member1")
        a.depends_on = [b.id]
        b.depends_on = [a.id]
        plan.phases = [a, b]

        with pytest.raises(ValueError, match="Circular dependency"):
            plan.topological_sort()
|
||
|
||
|
||
# ── F11: Invalid Expert Reference Fallback ───────────────
|
||
|
||
|
||
class TestInvalidExpertReference:
    """F11: a phase naming a non-existent expert falls back to the lead."""

    @pytest.mark.asyncio
    async def test_invalid_expert_reference_falls_back_to_lead(self):
        """A phase assigned to an unknown expert is reassigned to the lead."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(
            phases=[
                {
                    "name": "A",
                    "assigned_expert": "nonexistent_expert",
                    "task_description": "阶段A",
                    "depends_on": [],
                },
            ]
        )
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("无效专家任务")

        # Should complete (fallback to lead for the phase)
        assert result["status"] == "completed"
        plan = result["plan"]
        a_phase = plan.phases[0]
        # assigned_expert should have been reassigned to lead
        assert a_phase.assigned_expert == "lead"
        assert a_phase.status == PhaseStatus.COMPLETED
|
||
|
||
|
||
# ── F12: LLM Decomposition Failure ───────────────────────
|
||
|
||
|
||
class TestLLMDecompositionFailure:
    """F12: a failed LLM decomposition falls back to a single phase."""

    @pytest.mark.asyncio
    async def test_llm_decomposition_failure_falls_back_to_single_phase(self):
        """Invalid JSON from the LLM results in a single-phase plan."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        # Gateway that returns invalid JSON for decomposition
        gateway = AsyncMock()
        decomp_response = MagicMock()
        decomp_response.content = "This is not JSON"
        synth_response = MagicMock()
        synth_response.content = "综合结果"
        gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response])
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("无效JSON任务")

        # Should complete as single phase
        assert result["status"] == "completed"
        plan = result["plan"]
        assert len(plan.phases) == 1

    @pytest.mark.asyncio
    async def test_llm_unavailable_falls_back_to_single_phase(self):
        """An LLM that raises results in a single phase assigned to the lead."""
        team = _make_team_with_experts()
        orchestrator = TeamOrchestrator(team)

        gateway = _make_mock_llm_gateway(decomp_fail=True)
        team._experts["lead"].agent._llm_gateway = gateway

        result = await orchestrator.execute("LLM不可用任务")

        assert result["status"] == "completed"
        plan = result["plan"]
        assert len(plan.phases) == 1
        assert plan.phases[0].assigned_expert == "lead"
|
||
|
||
|
||
# ── F13: Decentralized Collaboration (HandoffTransport) ──
|
||
|
||
|
||
class TestDecentralizedCollaboration:
    """F13: direct expert-to-expert collaboration over the handoff transport."""

    @pytest.mark.asyncio
    async def test_expert_direct_handoff(self):
        """AE4: expert A asks expert B for assistance."""
        transport = InProcessHandoffTransport()
        channel = "expert:analyst:handoff"

        messages: list[dict] = []

        async def listener():
            # Stop after the first message so the task can be awaited.
            async for msg in transport.listen(channel):
                messages.append(msg)
                break

        task = asyncio.create_task(listener())
        try:
            await asyncio.sleep(0.05)  # Let listener start

            await transport.send(
                channel,
                {
                    "source_expert": "analyst",
                    "target_expert": "researcher",
                    "task": "需要行业数据",
                    "type": "assist_request",
                },
            )

            await asyncio.wait_for(task, timeout=2.0)

            assert len(messages) == 1
            assert messages[0]["source_expert"] == "analyst"
            assert messages[0]["type"] == "assist_request"
        finally:
            # Close even when an assertion or the timeout fails.
            transport.close()

    @pytest.mark.asyncio
    async def test_team_channel_broadcast(self):
        """A message on the team channel reaches every listener."""
        transport = InProcessHandoffTransport()
        channel = "team:test-team"

        consumer1_msgs: list[dict] = []
        consumer2_msgs: list[dict] = []

        async def consumer1():
            async for msg in transport.listen(channel):
                consumer1_msgs.append(msg)
                if len(consumer1_msgs) >= 1:
                    break

        async def consumer2():
            async for msg in transport.listen(channel):
                consumer2_msgs.append(msg)
                if len(consumer2_msgs) >= 1:
                    break

        t1 = asyncio.create_task(consumer1())
        t2 = asyncio.create_task(consumer2())
        try:
            await asyncio.sleep(0.05)  # Let both consumers start

            await transport.send(channel, {"type": "chat", "content": "hello"})

            await asyncio.wait_for(asyncio.gather(t1, t2), timeout=2.0)

            assert len(consumer1_msgs) == 1
            assert len(consumer2_msgs) == 1
        finally:
            # Close even when an assertion or the timeout fails.
            transport.close()
|
||
|
||
|
||
# ── F14: User Intervention ───────────────────────────────
|
||
|
||
|
||
class TestUserIntervention:
    """F14: user intervention messages are broadcast."""

    @pytest.mark.asyncio
    async def test_user_intervention_broadcast(self):
        """AE3: a user intervention message is delivered on the team channel."""
        transport = InProcessHandoffTransport()
        channel = "team:intervention-test"

        received: list[dict] = []

        async def listener():
            async for msg in transport.listen(channel):
                received.append(msg)
                if len(received) >= 1:
                    break

        task = asyncio.create_task(listener())
        try:
            await asyncio.sleep(0.05)  # Let listener start

            await transport.send(
                channel,
                {"type": "user_intervention", "content": "重点看成本优化"},
            )

            await asyncio.wait_for(task, timeout=2.0)

            assert len(received) == 1
            assert received[0]["type"] == "user_intervention"
        finally:
            # Close even when an assertion or the timeout fails.
            transport.close()
|
||
|
||
|
||
# ── F15: Team Dissolution ────────────────────────────────
|
||
|
||
|
||
class TestTeamDissolution:
    """F15: team dissolution and retention of outputs."""

    @pytest.mark.asyncio
    async def test_dissolution_preserves_outputs(self, workspace):
        """R36: a value written to the workspace can be read back.

        NOTE(review): no team is created or dissolved in this test; it only
        exercises a workspace write/read round trip.
        """
        await workspace.write("team:test:analyst:result", {"report": "analysis result"}, "analyst")

        data = await workspace.read("team:test:analyst:result")
        assert data is not None
        assert data["value"]["report"] == "analysis result"

    @pytest.mark.asyncio
    async def test_dissolution_sets_status(self):
        """After ``dissolve`` the team status is DISSOLVED."""
        pool = _make_mock_pool()
        team = ExpertTeam(pool=pool)

        lead_config = _make_expert_config("lead", is_lead=True)
        await team.create_team(lead_config)

        await team.dissolve()

        assert team.status == TeamStatus.DISSOLVED

    @pytest.mark.asyncio
    async def test_dissolution_clears_experts(self):
        """After ``dissolve`` the team has no experts and no lead."""
        pool = _make_mock_pool()
        team = ExpertTeam(pool=pool)

        lead_config = _make_expert_config("lead", is_lead=True)
        member_config = _make_expert_config("member")
        await team.create_team(lead_config, [member_config])

        assert len(team.experts) == 2

        await team.dissolve()

        assert len(team.experts) == 0
        assert team.lead_expert is None
|
||
|
||
|
||
# ── F16: Dynamic Expert Management ───────────────────────
|
||
|
||
|
||
class TestDynamicExpertManagement:
    """F16: resolving expert configs by template name or on the fly.

    Both tests are synchronous: they only call the router and never await.
    """

    def test_add_expert_by_template(self, registry):
        """An expert named by template resolves to that template's config."""
        router = ExpertTeamRouter(registry)
        result = router.resolve("@team:tech_lead 分析报告")

        configs = router.resolve_expert_configs(result.specified_experts)
        assert len(configs) == 1
        assert configs[0].name == "tech_lead"
        assert configs[0].is_lead is True

    def test_add_expert_dynamic(self, registry):
        """An unknown expert name yields a config whose persona mentions the name."""
        router = ExpertTeamRouter(registry)
        configs = router.resolve_expert_configs(["legal_advisor"])

        assert len(configs) == 1
        assert configs[0].name == "legal_advisor"
        assert "legal_advisor" in configs[0].persona
|