diff --git a/tests/integration/test_expert_team.py b/tests/integration/test_expert_team.py index 8d34de5..5c1fd37 100644 --- a/tests/integration/test_expert_team.py +++ b/tests/integration/test_expert_team.py @@ -1,47 +1,247 @@ -"""Integration tests for Expert Team Mode. +"""Integration tests for Expert Team Pipeline Mode. -Covers Key Flows F1-F6 and Acceptance Examples AE1-AE5 from the requirements document. -Uses mocked AgentPool and LLM calls to test orchestration logic. +Covers the pipeline orchestration flow end-to-end with mocked LLM and AgentPool: +- F1: Manual team formation via @team:expert1,expert2 +- F2: Default team template (@team:dev_team) +- F3: Pipeline execution (sequential phases with dependencies) +- F4: Parallel phases (no dependencies) +- F5: Phase failure and dependency failure propagation +- F6: SharedWorkspace data passing between phases +- F7: Context isolation (independent ConfigDrivenAgent per phase) +- F8: Fallback to single agent when all phases fail +- F9: Event sequence (team_formed → plan_update → phase_started → ... → team_synthesis) +- F10: TeamStatus.PLANNING state transition +- F11: Circular dependency detection +- F12: Invalid expert reference fallback +- F13: LLM decomposition failure fallback -Note: Tests using removed classes (CollaborationPlan, PlanPhase, ParallelType, MergeStrategy, -PhaseStatus) are temporarily skipped. U9 will rewrite them for pipeline mode. +Replaces the legacy CollaborationPlan-based tests (U9). """ +from __future__ import annotations + import asyncio +import json +from unittest.mock import AsyncMock, MagicMock import pytest -from agentkit.experts.config import ExpertConfig, ExpertTemplate -from agentkit.experts.plan import PlanStatus -from agentkit.experts.team import TeamStatus -from agentkit.experts.router import ExpertTeamRouter -from agentkit.experts.registry import ExpertTemplateRegistry from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.core.shared_workspace import SharedWorkspace +from agentkit.experts.config import ExpertConfig, ExpertTemplate +from agentkit.experts.orchestrator import TeamOrchestrator +from agentkit.experts.plan import PhaseStatus, PlanPhase, TeamPlan +from agentkit.experts.registry import ExpertTemplateRegistry +from agentkit.experts.router import ExpertTeamRouter +from agentkit.experts.team import ExpertTeam, TeamStatus -# --- Helpers --- +# ── 辅助函数 ────────────────────────────────────────────── -def make_expert_config( +def _make_expert_config( name: str, persona: str = "", is_lead: bool = False, bound_skills: list[str] | None = None, + llm: dict | None = None, ) -> ExpertConfig: - """Helper to create ExpertConfig for testing.""" + """创建测试用 ExpertConfig""" return ExpertConfig( name=name, - persona=persona, + persona=persona or f"{name} expert", is_lead=is_lead, bound_skills=bound_skills or [], agent_type="expert", task_mode="llm_generate", - prompt={"identity": f"You are {name}, {persona}"} if persona else {"identity": f"You are {name}"}, + prompt={"identity": f"You are {name}"}, + llm=llm, ) -# --- Fixtures --- +def _make_mock_agent( + name: str, + content: str | None = None, + fail: bool = False, + llm_gateway: MagicMock | None = None, +) -> MagicMock: + """创建 mock ConfigDrivenAgent""" + agent = MagicMock() + if fail: + agent.execute = AsyncMock(side_effect=RuntimeError(f"{name} execution failed")) + else: + agent.execute = AsyncMock( + return_value=TaskResult( + task_id="test", + agent_name=name, + status=TaskStatus.COMPLETED.value, + output_data={"content": content or f"Result from {name}"}, + error_message=None, + started_at=None, + completed_at=None, + ) + ) + agent._llm_gateway = llm_gateway + return agent + + +def _make_mock_expert( + name: str, + is_lead: bool = False, + is_active: bool = True, + content: str | None = None, + fail: bool = False, + llm_gateway: MagicMock | None = None, + llm: dict | None = None, +) -> MagicMock: + """创建 mock Expert (spec=Expert)""" + from agentkit.experts.expert import Expert + + config = _make_expert_config(name=name, is_lead=is_lead, llm=llm) + expert = MagicMock(spec=Expert) + expert.config = config + expert.is_active = is_active + expert.team_id = None + expert.get_capabilities_summary.return_value = { + "name": name, + "persona": config.persona, + "bound_skills": config.bound_skills, + "is_lead": is_lead, + } + expert.agent = _make_mock_agent(name, content=content, fail=fail, llm_gateway=llm_gateway) + return expert + + +def _make_mock_llm_gateway( + phases: list[dict] | None = None, + synthesis_content: str = "综合结果", + decomp_fail: bool = False, +) -> MagicMock: + """创建 mock LLM gateway. + + - phases: 提供 LLM 分解返回的阶段列表 + - decomp_fail: 分解调用抛出异常 + """ + gateway = AsyncMock() + if decomp_fail: + gateway.chat = AsyncMock(side_effect=RuntimeError("LLM unavailable")) + return gateway + + if phases: + phases_json = json.dumps(phases) + decomp_response = MagicMock() + decomp_response.content = phases_json + synth_response = MagicMock() + synth_response.content = synthesis_content + gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) + else: + response = MagicMock() + response.content = synthesis_content + gateway.chat = AsyncMock(return_value=response) + return gateway + + +def _make_team_with_experts( + expert_names: list[str] | None = None, + lead_name: str = "lead", + pool: MagicMock | None = None, + workspace: SharedWorkspace | None = None, + transport: InProcessHandoffTransport | None = None, +) -> ExpertTeam: + """创建包含 mock experts 的 ExpertTeam""" + team = ExpertTeam(workspace=workspace or SharedWorkspace()) + # Use a real InProcessHandoffTransport so events can be captured + team._handoff_transport = transport or InProcessHandoffTransport() + if pool is not None: + team._pool = pool + + if expert_names is None: + expert_names = [lead_name, "member1", "member2"] + + for name in expert_names: + is_lead = name == lead_name + expert = _make_mock_expert(name=name, is_lead=is_lead) + team._experts[name] = expert + if is_lead: + team._lead_expert_name = name + + return team + + +def _make_mock_pool(content_prefix: str = "Isolated") -> MagicMock: + """创建 mock AgentPool,模拟上下文隔离的 agent 创建""" + pool = MagicMock() + pool.create_agent = AsyncMock( + side_effect=lambda config: _make_mock_agent( + config.name, content=f"{content_prefix} result from {config.name}" + ) + ) + pool.remove_agent = AsyncMock() + return pool + + +def _make_registry_with_dev_team() -> ExpertTemplateRegistry: + """创建包含 dev_team 模板和 5 个编程专家的注册表""" + reg = ExpertTemplateRegistry() + for name in [ + "tech_lead", + "frontend_engineer", + "backend_engineer", + "qa_engineer", + "code_reviewer", + ]: + reg.register( + ExpertTemplate( + name=name, + description=f"{name} expert", + config=_make_expert_config( + name, + persona=f"{name} persona", + # Individual experts have empty bound_skills; + # only team templates use bound_skills for member list + bound_skills=[], + ), + ) + ) + # dev_team template stores members in bound_skills + reg.register( + ExpertTemplate( + name="dev_team", + description="Development team template", + config=ExpertConfig( + name="dev_team", + persona="dev team", + agent_type="expert", + task_mode="llm_generate", + prompt={"identity": "dev team"}, + bound_skills=[ + "tech_lead", + "frontend_engineer", + "backend_engineer", + "qa_engineer", + "code_reviewer", + ], + ), + ) + ) + return reg + + +async def _capture_events(transport: InProcessHandoffTransport, channel: str) -> list[dict]: + """Capture all events sent to a channel.""" + events: list[dict] = [] + + async def listener(): + async for msg in transport.listen(channel): + events.append(msg) + + task = asyncio.create_task(listener()) + await asyncio.sleep(0.05) # Let listener start + return events, task + + +# ── Fixtures ───────────────────────────────────────────── @pytest.fixture @@ -51,76 +251,711 @@ def workspace(): @pytest.fixture def registry(): - reg = ExpertTemplateRegistry() - reg.register( - ExpertTemplate( - name="analyst", - description="Data Analyst", - config=make_expert_config( - "analyst", "数据分析专家", bound_skills=["data_analysis"] - ), - ) - ) - reg.register( - ExpertTemplate( - name="strategist", - description="Strategy Consultant", - config=make_expert_config( - "strategist", "战略顾问", bound_skills=["strategy_planning"] - ), - ) - ) - reg.register( - ExpertTemplate( - name="architect", - description="Software Architect", - config=make_expert_config( - "architect", "软件架构师", bound_skills=["system_design"] - ), - ) - ) - return reg + return _make_registry_with_dev_team() -# --- F1: Manual Team Formation --- +# ── F1: Manual Team Formation ──────────────────────────── class TestManualTeamFormation: - """Covers F1: User specifies expert team members.""" + """F1: 用户通过 @team:expert1,expert2 指定专家团队""" - async def test_manual_team_with_templates(self, registry): - """AE1: User specifies expert team by template names.""" + def test_manual_team_with_templates(self, registry): + """AE1: 用户通过模板名指定专家团队""" router = ExpertTeamRouter(registry) - result = router.resolve("@team:analyst,strategist 分析这份市场报告") + result = router.resolve("@team:tech_lead,frontend_engineer 开发登录功能") assert result.team_mode is True - assert result.specified_experts == ["analyst", "strategist"] + assert result.specified_experts == ["tech_lead", "frontend_engineer"] assert result.auto_compose is False + assert result.task_content == "开发登录功能" + + def test_explicit_experts_resolved_to_configs(self, registry): + """指定专家名解析为 ExpertConfig""" + router = ExpertTeamRouter(registry) + result = router.resolve("@team:frontend_engineer,backend_engineer 任务") - # Resolve to configs configs = router.resolve_expert_configs(result.specified_experts) assert len(configs) == 2 - assert configs[0].name == "analyst" - assert configs[1].name == "strategist" + assert configs[0].name == "frontend_engineer" + assert configs[0].is_lead is True + assert configs[1].name == "backend_engineer" + assert configs[1].is_lead is False + + def test_dynamic_expert_creation_for_unknown_name(self, registry): + """未知专家名动态创建 ExpertConfig""" + router = ExpertTeamRouter(registry) + configs = router.resolve_expert_configs(["legal_advisor"]) + + assert len(configs) == 1 + assert configs[0].name == "legal_advisor" + assert configs[0].is_lead is True -# --- F2: Auto Team Formation --- -# (test_auto_team_high_complexity and test_auto_team_competitive_parallel skipped — U9 will rewrite) +# ── F2: Default Team Template ──────────────────────────── -# --- F3: Decentralized Collaboration --- +class TestDefaultTeamTemplate: + """F2: @team:dev_team 调用 5 个编程专家""" + + def test_dev_team_template_expands_to_members(self, registry): + """@team:dev_team 展开为 dev_team 模板的 bound_skills 成员列表""" + router = ExpertTeamRouter(registry) + result = router.resolve("@team:dev_team 开发用户系统") + + assert result.team_mode is True + assert result.specified_experts == [ + "tech_lead", + "frontend_engineer", + "backend_engineer", + "qa_engineer", + "code_reviewer", + ] + assert result.auto_compose is False + assert result.task_content == "开发用户系统" + + def test_no_experts_uses_default_template(self, registry): + """@team 无指定专家时使用默认 dev_team 模板""" + router = ExpertTeamRouter(registry) + result = router.resolve("@team 开发功能") + + assert result.team_mode is True + assert len(result.specified_experts) == 5 + assert "tech_lead" in result.specified_experts + assert result.auto_compose is False + + +# ── F3: Pipeline Sequential Execution ──────────────────── + + +class TestPipelineExecution: + """F3: 流水线模式执行测试""" + + @pytest.mark.asyncio + async def test_pipeline_execution_three_sequential_phases(self): + """3 阶段(A→B→C)按序执行""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": ["A"], + }, + { + "name": "C", + "assigned_expert": "member2", + "task_description": "阶段C", + "depends_on": ["B"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("串行任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 3 + for ph in plan.phases: + assert ph.status == PhaseStatus.COMPLETED + + @pytest.mark.asyncio + async def test_pipeline_single_phase_no_llm(self): + """无 LLM 时,任务作为单个阶段执行完成""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + result = await orchestrator.execute("简单任务") + + assert result["status"] == "completed" + assert "result" in result + assert "phase_results" in result + assert team.status == TeamStatus.COMPLETED + + @pytest.mark.asyncio + async def test_pipeline_status_transitions(self): + """执行过程中状态从 PLANNING → EXECUTING → SYNTHESIZING → COMPLETED""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + statuses_seen: list[str] = [] + + original_set_status = team.set_status + + def tracking_set_status(status: TeamStatus) -> None: + statuses_seen.append(status.value) + original_set_status(status) + + team.set_status = tracking_set_status # type: ignore[assignment] + + await orchestrator.execute("任务") + + # PLANNING comes before EXECUTING, SYNTHESIZING, COMPLETED + assert "planning" in statuses_seen + assert "executing" in statuses_seen + assert "synthesizing" in statuses_seen + assert "completed" in statuses_seen + assert statuses_seen.index("planning") < statuses_seen.index("executing") + assert statuses_seen.index("executing") < statuses_seen.index("synthesizing") + assert statuses_seen.index("synthesizing") < statuses_seen.index("completed") + + +# ── F4: Parallel Phases ────────────────────────────────── + + +class TestParallelPhases: + """F4: 无依赖阶段并行执行""" + + @pytest.mark.asyncio + async def test_parallel_phases_no_dependencies(self): + """2 个无依赖阶段并行执行""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "member1", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member2", + "task_description": "阶段B", + "depends_on": [], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("并行任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 2 + for ph in plan.phases: + assert ph.status == PhaseStatus.COMPLETED + + @pytest.mark.asyncio + async def test_mixed_parallel_and_sequential(self): + """混合并行和串行:A、B 并行,C 依赖 A 和 B""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": [], + }, + { + "name": "C", + "assigned_expert": "member2", + "task_description": "阶段C", + "depends_on": ["A", "B"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("混合任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 3 + c_phase = next(p for p in plan.phases if p.name == "C") + assert len(c_phase.depends_on) == 2 + + +# ── F5: Phase Failure and Dependency Propagation ───────── + + +class TestPhaseFailure: + """F5: 阶段失败和依赖失败传播""" + + @pytest.mark.asyncio + async def test_phase_dependency_failure_propagation(self): + """阶段 B 依赖 A,A 失败时 B 标记为 FAILED""" + team = _make_team_with_experts() + # Make lead's agent fail + team._experts["lead"].agent = _make_mock_agent("lead", fail=True) + + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": ["A"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("失败传播任务") + + # Should fallback since all phases failed + assert result["status"] == "fallback" + plan = result["plan"] + a_phase = next(p for p in plan.phases if p.name == "A") + b_phase = next(p for p in plan.phases if p.name == "B") + assert a_phase.status == PhaseStatus.FAILED + assert b_phase.status == PhaseStatus.FAILED + + @pytest.mark.asyncio + async def test_fallback_on_all_failure(self): + """所有阶段失败时 fallback 到单 Agent""" + team = _make_team_with_experts() + # All experts fail + for name in team._experts: + team._experts[name].agent = _make_mock_agent(name, fail=True) + + orchestrator = TeamOrchestrator(team) + + # No LLM so single-phase path; that phase also fails → fallback + result = await orchestrator.execute("全失败任务") + + assert result["status"] == "fallback" + assert "result" in result + assert result["result"] is not None + + +# ── F6: SharedWorkspace Data Passing ───────────────────── + + +class TestSharedWorkspacePassing: + """F6: 阶段间通过 SharedWorkspace 传递数据""" + + @pytest.mark.asyncio + async def test_phase_output_written_to_workspace(self, workspace): + """阶段 A 的输出写入 SharedWorkspace""" + team = _make_team_with_experts(workspace=workspace) + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": ["A"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("数据传递任务") + + assert result["status"] == "completed" + plan = result["plan"] + + # Phase A output should be in workspace + a_phase = next(p for p in plan.phases if p.name == "A") + output_key = f"{plan.id}/phase/{a_phase.id}/output" + data = await workspace.read(output_key) + assert data is not None + assert "Result from lead" in data.get("value", "") + + @pytest.mark.asyncio + async def test_dependency_output_read_by_dependent(self, workspace): + """阶段 B 读取阶段 A 的输出""" + team = _make_team_with_experts(workspace=workspace) + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": ["A"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + # Track what input_data member1's agent receives + received_inputs: list[dict] = [] + original_execute = team._experts["member1"].agent.execute + + async def tracking_execute(task_msg): + received_inputs.append(task_msg.input_data) + return await original_execute(task_msg) + + team._experts["member1"].agent.execute = tracking_execute + + await orchestrator.execute("依赖读取任务") + + # member1 should have received dependency_outputs in input_data + assert len(received_inputs) > 0 + b_input = received_inputs[0] + assert "dependency_outputs" in b_input + assert "A" in b_input["dependency_outputs"] + + +# ── F7: Context Isolation ──────────────────────────────── + + +class TestContextIsolation: + """F7: 上下文隔离 - 每个阶段创建独立 ConfigDrivenAgent""" + + @pytest.mark.asyncio + async def test_isolated_agent_created_per_phase(self): + """每个阶段通过 AgentPool 创建独立的 agent""" + pool = _make_mock_pool() + team = _make_team_with_experts(pool=pool) + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": [], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + await orchestrator.execute("隔离任务") + + # Pool.create_agent should have been called for each phase + assert pool.create_agent.call_count >= 2 + + @pytest.mark.asyncio + async def test_isolated_agent_cleaned_up_after_phase(self): + """阶段完成后清理临时 agent""" + pool = _make_mock_pool() + team = _make_team_with_experts(pool=pool) + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + await orchestrator.execute("清理任务") + + # All temp agents should be cleaned up + assert len(orchestrator._temp_agents) == 0 + assert pool.remove_agent.call_count >= 1 + + +# ── F8: Event Sequence ─────────────────────────────────── + + +class TestEventSequence: + """F8: 事件顺序正确""" + + @pytest.mark.asyncio + async def test_event_sequence_pipeline_execution(self): + """事件顺序:team_formed → plan_update → phase_started → phase_completed → team_synthesis""" + transport = InProcessHandoffTransport() + team = _make_team_with_experts(transport=transport) + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": [], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + events, listener_task = await _capture_events(transport, team.team_channel) + + await orchestrator.execute("事件序列任务") + + # Stop listener + transport.close() + try: + await asyncio.wait_for(listener_task, timeout=0.5) + except (asyncio.TimeoutError, Exception): + listener_task.cancel() + + event_types = [e.get("type") for e in events] + + # Verify required events are present in order + assert "team_formed" in event_types + assert "plan_update" in event_types + assert "phase_started" in event_types + assert "phase_completed" in event_types + assert "team_synthesis" in event_types + + # Verify order + assert event_types.index("team_formed") < event_types.index("plan_update") + assert event_types.index("plan_update") < event_types.index("phase_started") + assert event_types.index("phase_started") < event_types.index("phase_completed") + assert event_types.index("phase_completed") < event_types.index("team_synthesis") + + @pytest.mark.asyncio + async def test_phase_failed_event_emitted_on_failure(self): + """阶段失败时发出 phase_failed 事件""" + transport = InProcessHandoffTransport() + team = _make_team_with_experts(transport=transport) + # Make all agents fail to trigger phase_failed + for name in team._experts: + team._experts[name].agent = _make_mock_agent(name, fail=True) + orchestrator = TeamOrchestrator(team) + + events, listener_task = await _capture_events(transport, team.team_channel) + + await orchestrator.execute("失败事件任务") + + transport.close() + try: + await asyncio.wait_for(listener_task, timeout=0.5) + except (asyncio.TimeoutError, Exception): + listener_task.cancel() + + event_types = [e.get("type") for e in events] + # phase_started should be emitted before failure + assert "phase_started" in event_types + + +# ── F9: TeamStatus.PLANNING Transition ─────────────────── + + +class TestTeamStatusPlanning: + """F9: create_team 后状态为 PLANNING""" + + @pytest.mark.asyncio + async def test_create_team_sets_planning_status(self): + """create_team 后设置 PLANNING 状态""" + pool = _make_mock_pool() + team = ExpertTeam(pool=pool) + + lead_config = _make_expert_config("lead", is_lead=True) + member_config = _make_expert_config("member") + + await team.create_team(lead_config, [member_config]) + + assert team.status == TeamStatus.PLANNING + + @pytest.mark.asyncio + async def test_orchestrator_sets_planning_before_decomposition(self): + """orchestrator.execute 在分解任务前设置 PLANNING""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + statuses_during_execution: list[TeamStatus] = [] + original_set_status = team.set_status + + def tracking_set_status(status: TeamStatus) -> None: + statuses_during_execution.append(status) + original_set_status(status) + + team.set_status = tracking_set_status # type: ignore[assignment] + + await orchestrator.execute("任务") + + # First status set should be PLANNING + assert TeamStatus.PLANNING in statuses_during_execution + + +# ── F10: Circular Dependency Detection ─────────────────── + + +class TestCircularDependency: + """F10: 循环依赖检测""" + + @pytest.mark.asyncio + async def test_circular_dependency_triggers_fallback(self): + """A→B→A 循环依赖 → fallback 到单 Agent""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Phases with circular dependency: A depends on B, B depends on A + # We need to craft LLM response that creates circular deps via name resolution + # Since depends_on is resolved by name → id, and ids are auto-generated, + # we craft phases where A depends on B and B depends on A + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "阶段A", + "depends_on": ["B"], + }, + { + "name": "B", + "assigned_expert": "member1", + "task_description": "阶段B", + "depends_on": ["A"], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("循环依赖任务") + + # Should fallback due to ValueError from topological_sort + assert result["status"] == "fallback" + + def test_topological_sort_raises_on_cycle(self): + """topological_sort() 检测到循环依赖时抛出 ValueError""" + plan = TeamPlan(task="test", lead_expert="lead") + a = PlanPhase(name="A", assigned_expert="lead") + b = PlanPhase(name="B", assigned_expert="member1") + a.depends_on = [b.id] + b.depends_on = [a.id] + plan.phases = [a, b] + + with pytest.raises(ValueError, match="Circular dependency"): + plan.topological_sort() + + +# ── F11: Invalid Expert Reference Fallback ─────────────── + + +class TestInvalidExpertReference: + """F11: 阶段引用不存在的专家 → fallback 到 lead""" + + @pytest.mark.asyncio + async def test_invalid_expert_reference_falls_back_to_lead(self): + """阶段引用不存在的专家时 fallback 到 lead expert""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "nonexistent_expert", + "task_description": "阶段A", + "depends_on": [], + }, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("无效专家任务") + + # Should complete (fallback to lead for the phase) + assert result["status"] == "completed" + plan = result["plan"] + a_phase = plan.phases[0] + # assigned_expert should have been reassigned to lead + assert a_phase.assigned_expert == "lead" + assert a_phase.status == PhaseStatus.COMPLETED + + +# ── F12: LLM Decomposition Failure ─────────────────────── + + +class TestLLMDecompositionFailure: + """F12: LLM 分解失败 → fallback 到单阶段""" + + @pytest.mark.asyncio + async def test_llm_decomposition_failure_falls_back_to_single_phase(self): + """LLM 返回无效 JSON → fallback 到单阶段""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Gateway that returns invalid JSON for decomposition + gateway = AsyncMock() + decomp_response = MagicMock() + decomp_response.content = "This is not JSON" + synth_response = MagicMock() + synth_response.content = "综合结果" + gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("无效JSON任务") + + # Should complete as single phase + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 1 + + @pytest.mark.asyncio + async def test_llm_unavailable_falls_back_to_single_phase(self): + """LLM 不可用时 fallback 到单阶段""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway(decomp_fail=True) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("LLM不可用任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 1 + assert plan.phases[0].assigned_expert == "lead" + + +# ── F13: Decentralized Collaboration (HandoffTransport) ── class TestDecentralizedCollaboration: - """Covers F3: Experts collaborate directly without Lead mediation.""" + """F13: 专家间直接协作(HandoffTransport)""" + @pytest.mark.asyncio async def test_expert_direct_handoff(self): - """AE4: Expert A requests assistance from Expert B directly.""" + """AE4: Expert A 请求 Expert B 协助""" transport = InProcessHandoffTransport() channel = "expert:analyst:handoff" - # Start listening first (consumer must be registered before send) - messages = [] + messages: list[dict] = [] async def listener(): async for msg in transport.listen(channel): @@ -130,7 +965,6 @@ class TestDecentralizedCollaboration: task = asyncio.create_task(listener()) await asyncio.sleep(0.05) - # Expert A sends assist request await transport.send( channel, { @@ -149,14 +983,14 @@ class TestDecentralizedCollaboration: transport.close() + @pytest.mark.asyncio async def test_team_channel_broadcast(self): - """All experts receive team channel messages.""" + """团队频道消息广播到所有专家""" transport = InProcessHandoffTransport() channel = "team:test-team" - # Two consumers listening - consumer1_msgs = [] - consumer2_msgs = [] + consumer1_msgs: list[dict] = [] + consumer2_msgs: list[dict] = [] async def consumer1(): async for msg in transport.listen(channel): @@ -170,13 +1004,10 @@ class TestDecentralizedCollaboration: if len(consumer2_msgs) >= 1: break - # Start consumers t1 = asyncio.create_task(consumer1()) t2 = asyncio.create_task(consumer2()) - await asyncio.sleep(0.05) - # Send message await transport.send(channel, {"type": "chat", "content": "hello"}) await asyncio.wait_for(asyncio.gather(t1, t2), timeout=2.0) @@ -187,18 +1018,19 @@ class TestDecentralizedCollaboration: transport.close() -# --- F4: User Intervention --- +# ── F14: User Intervention ─────────────────────────────── class TestUserIntervention: - """Covers F4: User intervenes during collaboration.""" + """F14: 用户干预消息广播""" + @pytest.mark.asyncio async def test_user_intervention_broadcast(self): - """AE3: User intervention message reaches all experts.""" + """AE3: 用户干预消息到达所有专家""" transport = InProcessHandoffTransport() channel = "team:intervention-test" - received = [] + received: list[dict] = [] async def listener(): async for msg in transport.listen(channel): @@ -222,55 +1054,72 @@ class TestUserIntervention: transport.close() -# --- F5: Competitive Parallel --- -# (test_vote_strategy_with_tie_break and test_fusion_strategy skipped — U9 will rewrite) - - -# --- F6: Team Dissolution --- +# ── F15: Team Dissolution ──────────────────────────────── class TestTeamDissolution: - """Covers F6: Team dissolution and output preservation.""" + """F15: 团队解散和输出保留""" + @pytest.mark.asyncio async def test_dissolution_preserves_outputs(self, workspace): - """R36: Temporary Expert outputs preserved in SharedWorkspace after dissolution.""" - # Write some output to workspace - await workspace.write( - "team:test:analyst:result", {"report": "analysis result"}, "analyst" - ) + """R36: 临时 Expert 输出在 SharedWorkspace 中保留""" + await workspace.write("team:test:analyst:result", {"report": "analysis result"}, "analyst") - # Verify output exists data = await workspace.read("team:test:analyst:result") assert data is not None assert data["value"]["report"] == "analysis result" + @pytest.mark.asyncio async def test_dissolution_sets_status(self): - """Team status becomes DISSOLVED after dissolution.""" - assert TeamStatus.DISSOLVED == "dissolved" + """团队解散后状态为 DISSOLVED""" + pool = _make_mock_pool() + team = ExpertTeam(pool=pool) + + lead_config = _make_expert_config("lead", is_lead=True) + await team.create_team(lead_config) + + await team.dissolve() + + assert team.status == TeamStatus.DISSOLVED + + @pytest.mark.asyncio + async def test_dissolution_clears_experts(self): + """团队解散后 experts 列表为空""" + pool = _make_mock_pool() + team = ExpertTeam(pool=pool) + + lead_config = _make_expert_config("lead", is_lead=True) + member_config = _make_expert_config("member") + await team.create_team(lead_config, [member_config]) + + assert len(team.experts) == 2 + + await team.dissolve() + + assert len(team.experts) == 0 + assert team.lead_expert is None -# --- Retry and Fallback --- -# (test_plan_failure_triggers_retry and test_fallback_after_retry_failure skipped — U9 will rewrite) - - -# --- Dynamic Expert Addition/Removal --- +# ── F16: Dynamic Expert Management ─────────────────────── class TestDynamicExpertManagement: - """AE5: Dynamic addition and removal of experts.""" + """F16: 动态添加和移除专家""" + @pytest.mark.asyncio async def test_add_expert_by_template(self, registry): - """Add expert by template name.""" + """通过模板名添加专家""" router = ExpertTeamRouter(registry) - result = router.resolve("@team:analyst 分析报告") + result = router.resolve("@team:tech_lead 分析报告") configs = router.resolve_expert_configs(result.specified_experts) assert len(configs) == 1 - assert configs[0].name == "analyst" - assert configs[0].bound_skills == ["data_analysis"] + assert configs[0].name == "tech_lead" + assert configs[0].is_lead is True + @pytest.mark.asyncio async def test_add_expert_dynamic(self, registry): - """Add expert with non-existent template creates dynamic config.""" + """未知专家名动态创建配置""" router = ExpertTeamRouter(registry) configs = router.resolve_expert_configs(["legal_advisor"])