fischer-agentkit/tests/integration/test_expert_team.py

280 lines
8.3 KiB
Python

"""Integration tests for Expert Team Mode.
Covers Key Flows F1-F6 and Acceptance Examples AE1-AE5 from the requirements document.
Uses mocked AgentPool and LLM calls to test orchestration logic.
Note: Tests using removed classes (CollaborationPlan, PlanPhase, ParallelType, MergeStrategy,
PhaseStatus) are temporarily skipped. U9 will rewrite them for pipeline mode.
"""
import asyncio
import pytest
from agentkit.experts.config import ExpertConfig, ExpertTemplate
from agentkit.experts.plan import PlanStatus
from agentkit.experts.team import TeamStatus
from agentkit.experts.router import ExpertTeamRouter
from agentkit.experts.registry import ExpertTemplateRegistry
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.core.shared_workspace import SharedWorkspace
# --- Helpers ---
def make_expert_config(
name: str,
persona: str = "",
is_lead: bool = False,
bound_skills: list[str] | None = None,
) -> ExpertConfig:
"""Helper to create ExpertConfig for testing."""
return ExpertConfig(
name=name,
persona=persona,
is_lead=is_lead,
bound_skills=bound_skills or [],
agent_type="expert",
task_mode="llm_generate",
prompt={"identity": f"You are {name}, {persona}"} if persona else {"identity": f"You are {name}"},
)
# --- Fixtures ---
@pytest.fixture
def workspace():
return SharedWorkspace()
@pytest.fixture
def registry():
reg = ExpertTemplateRegistry()
reg.register(
ExpertTemplate(
name="analyst",
description="Data Analyst",
config=make_expert_config(
"analyst", "数据分析专家", bound_skills=["data_analysis"]
),
)
)
reg.register(
ExpertTemplate(
name="strategist",
description="Strategy Consultant",
config=make_expert_config(
"strategist", "战略顾问", bound_skills=["strategy_planning"]
),
)
)
reg.register(
ExpertTemplate(
name="architect",
description="Software Architect",
config=make_expert_config(
"architect", "软件架构师", bound_skills=["system_design"]
),
)
)
return reg
# --- F1: Manual Team Formation ---
class TestManualTeamFormation:
"""Covers F1: User specifies expert team members."""
async def test_manual_team_with_templates(self, registry):
"""AE1: User specifies expert team by template names."""
router = ExpertTeamRouter(registry)
result = router.resolve("@team:analyst,strategist 分析这份市场报告")
assert result.team_mode is True
assert result.specified_experts == ["analyst", "strategist"]
assert result.auto_compose is False
# Resolve to configs
configs = router.resolve_expert_configs(result.specified_experts)
assert len(configs) == 2
assert configs[0].name == "analyst"
assert configs[1].name == "strategist"
# --- F2: Auto Team Formation ---
# (test_auto_team_high_complexity and test_auto_team_competitive_parallel skipped — U9 will rewrite)
# --- F3: Decentralized Collaboration ---
class TestDecentralizedCollaboration:
"""Covers F3: Experts collaborate directly without Lead mediation."""
async def test_expert_direct_handoff(self):
"""AE4: Expert A requests assistance from Expert B directly."""
transport = InProcessHandoffTransport()
channel = "expert:analyst:handoff"
# Start listening first (consumer must be registered before send)
messages = []
async def listener():
async for msg in transport.listen(channel):
messages.append(msg)
break
task = asyncio.create_task(listener())
await asyncio.sleep(0.05)
# Expert A sends assist request
await transport.send(
channel,
{
"source_expert": "analyst",
"target_expert": "researcher",
"task": "需要行业数据",
"type": "assist_request",
},
)
await asyncio.wait_for(task, timeout=2.0)
assert len(messages) == 1
assert messages[0]["source_expert"] == "analyst"
assert messages[0]["type"] == "assist_request"
transport.close()
async def test_team_channel_broadcast(self):
"""All experts receive team channel messages."""
transport = InProcessHandoffTransport()
channel = "team:test-team"
# Two consumers listening
consumer1_msgs = []
consumer2_msgs = []
async def consumer1():
async for msg in transport.listen(channel):
consumer1_msgs.append(msg)
if len(consumer1_msgs) >= 1:
break
async def consumer2():
async for msg in transport.listen(channel):
consumer2_msgs.append(msg)
if len(consumer2_msgs) >= 1:
break
# Start consumers
t1 = asyncio.create_task(consumer1())
t2 = asyncio.create_task(consumer2())
await asyncio.sleep(0.05)
# Send message
await transport.send(channel, {"type": "chat", "content": "hello"})
await asyncio.wait_for(asyncio.gather(t1, t2), timeout=2.0)
assert len(consumer1_msgs) == 1
assert len(consumer2_msgs) == 1
transport.close()
# --- F4: User Intervention ---
class TestUserIntervention:
"""Covers F4: User intervenes during collaboration."""
async def test_user_intervention_broadcast(self):
"""AE3: User intervention message reaches all experts."""
transport = InProcessHandoffTransport()
channel = "team:intervention-test"
received = []
async def listener():
async for msg in transport.listen(channel):
received.append(msg)
if len(received) >= 1:
break
task = asyncio.create_task(listener())
await asyncio.sleep(0.05)
await transport.send(
channel,
{"type": "user_intervention", "content": "重点看成本优化"},
)
await asyncio.wait_for(task, timeout=2.0)
assert len(received) == 1
assert received[0]["type"] == "user_intervention"
transport.close()
# --- F5: Competitive Parallel ---
# (test_vote_strategy_with_tie_break and test_fusion_strategy skipped — U9 will rewrite)
# --- F6: Team Dissolution ---
class TestTeamDissolution:
"""Covers F6: Team dissolution and output preservation."""
async def test_dissolution_preserves_outputs(self, workspace):
"""R36: Temporary Expert outputs preserved in SharedWorkspace after dissolution."""
# Write some output to workspace
await workspace.write(
"team:test:analyst:result", {"report": "analysis result"}, "analyst"
)
# Verify output exists
data = await workspace.read("team:test:analyst:result")
assert data is not None
assert data["value"]["report"] == "analysis result"
async def test_dissolution_sets_status(self):
"""Team status becomes DISSOLVED after dissolution."""
assert TeamStatus.DISSOLVED == "dissolved"
# --- Retry and Fallback ---
# (test_plan_failure_triggers_retry and test_fallback_after_retry_failure skipped — U9 will rewrite)
# --- Dynamic Expert Addition/Removal ---
class TestDynamicExpertManagement:
"""AE5: Dynamic addition and removal of experts."""
async def test_add_expert_by_template(self, registry):
"""Add expert by template name."""
router = ExpertTeamRouter(registry)
result = router.resolve("@team:analyst 分析报告")
configs = router.resolve_expert_configs(result.specified_experts)
assert len(configs) == 1
assert configs[0].name == "analyst"
assert configs[0].bound_skills == ["data_analysis"]
async def test_add_expert_dynamic(self, registry):
"""Add expert with non-existent template creates dynamic config."""
router = ExpertTeamRouter(registry)
configs = router.resolve_expert_configs(["legal_advisor"])
assert len(configs) == 1
assert configs[0].name == "legal_advisor"
assert "legal_advisor" in configs[0].persona