"""TeamOrchestrator 辩论阶段执行器单元测试 (U2) 测试覆盖: - Happy path: 2 轮辩论,2 个专家参与,Lead 裁决产出结论 - 边界: max_rounds=1 时只辩论一轮就裁决 - 边界: participants 为空时,Lead 直接给出结论(无辩论) - 用户停止: 辩论中收到 /stop,提前结束并裁决 - 逃生舱: debate_config.skip=true 时直接跳过 - 错误路径: LLM 不可用时,Lead 用模板文本裁决,不抛异常 - 集成: 辩论结论写入 SharedWorkspace - 事件广播: debate_started / expert_argument / debate_round_summary / debate_resolved - 干预通道: _consume_team_interventions getattr 回退(U4 兼容) """ from __future__ import annotations import json from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan from agentkit.experts.team import ExpertTeam # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config( name: str = "test_expert", is_lead: bool = False, llm: dict | None = None, ) -> ExpertConfig: """创建测试用 ExpertConfig(含辩论 prompt 所需的角色字段)""" return ExpertConfig( name=name, agent_type="expert", persona=f"{name}的角色描述", thinking_style="逻辑推理", speaking_style="简洁直接", decision_framework="数据驱动决策", bound_skills=["skill_a"], is_lead=is_lead, task_mode="llm_generate", prompt={"identity": "测试"}, llm=llm, ) def _make_mock_expert( name: str = "test_expert", is_lead: bool = False, is_active: bool = True, llm: dict | None = None, gateway: MagicMock | None = None, ) -> MagicMock: """创建 mock Expert Args: gateway: 如果提供,设置到 expert.agent._llm_gateway 上 """ config = _make_expert_config(name=name, is_lead=is_lead, llm=llm) expert = MagicMock(spec=Expert) expert.config = config expert.is_active = is_active expert.team_id = None expert.get_capabilities_summary.return_value = { "name": name, "persona": config.persona, "thinking_style": config.thinking_style, "bound_skills": config.bound_skills, "is_lead": is_lead, } mock_agent = MagicMock() mock_agent._llm_gateway = gateway expert.agent = mock_agent return expert def _make_team_with_experts( expert_names: list[str] | None = None, lead_name: str = "lead", gateway: MagicMock | None = None, ) -> ExpertTeam: """创建包含 mock experts 的 ExpertTeam Args: gateway: 如果提供,设置到所有 expert 的 agent._llm_gateway 上 """ team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport if expert_names is None: expert_names = [lead_name, "member1", "member2"] for name in expert_names: is_lead = name == lead_name expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) team._experts[name] = expert if is_lead: team._lead_expert_name = name return team def _make_smart_llm_gateway( opening: str = "开场:我们需要讨论这个分歧点。", argument_template: str = "[{expert}] 我认为应该采用这个方案。", summary: str = "本轮小结:双方各有道理。", verdict: dict | None = None, ) -> AsyncMock: """创建智能 mock LLM gateway,根据 prompt 内容返回不同响应 通过 prompt 关键词区分:开场 / 论点 / 小结 / 裁决 避免依赖并行调用顺序。 """ if verdict is None: verdict = { "decision": "adopt", "rationale": "甲方论据更充分", "conclusion": "采纳甲方方案,按此执行。", } verdict_json = json.dumps(verdict, ensure_ascii=False) async def chat_side_effect(messages, model=None, **kwargs): prompt = messages[0]["content"] if messages else "" response = MagicMock() # Order matters: check most specific first — verdict/summary prompts # contain debate history which includes opening/argument text. if "最终裁决" in prompt: response.content = f"```json\n{verdict_json}\n```" elif "小结本轮辩论" in prompt: response.content = summary elif "发表你的论点" in prompt: # Extract expert name from prompt: "你是 {name},正在参加" import re name_match = re.search(r"你是 (\w+),正在参加", prompt) expert_name = name_match.group(1) if name_match else "expert" response.content = argument_template.format(expert=expert_name) elif "主持人开场" in prompt: response.content = opening else: response.content = "默认响应" return response gateway = AsyncMock() gateway.chat = AsyncMock(side_effect=chat_side_effect) return gateway def _make_debate_phase( phase_id: str = "debate_1", name: str = "架构辩论", topic: str = "前端框架选型:React vs Vue", participants: list[str] | None = None, max_rounds: int = 2, skip: bool = False, depends_on: list[str] | None = None, assigned_expert: str = "lead", ) -> PlanPhase: """创建测试用 DEBATE 阶段""" if participants is None: participants = ["member1", "member2"] debate_config: dict = { "topic": topic, "participants": participants, "max_rounds": max_rounds, } if skip: debate_config["skip"] = True return PlanPhase( id=phase_id, name=name, assigned_expert=assigned_expert, task_description=topic, depends_on=depends_on or [], phase_type=PhaseType.DEBATE, debate_config=debate_config, ) def _make_plan_with_debate_phase(phase: PlanPhase) -> TeamPlan: """创建包含单个 DEBATE 阶段的 TeamPlan""" return TeamPlan( id="test_plan", task="测试辩论任务", phases=[phase], lead_expert="lead", ) # ── Happy Path 测试 ─────────────────────────────────────── class TestDebatePhaseHappyPath: """辩论阶段 happy path 测试""" @pytest.mark.asyncio async def test_two_rounds_two_experts_completes(self): """2 轮辩论,2 个专家参与,phase 状态变为 COMPLETED""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result["content"] == "采纳甲方方案,按此执行。" assert result["decision"] == "adopt" assert "verdict" in result assert result["verdict"]["decision"] == "adopt" @pytest.mark.asyncio async def test_debate_produces_verdict_with_required_fields(self): """辩论裁决包含 decision / rationale / conclusion 三个字段""" gateway = _make_smart_llm_gateway( verdict={ "decision": "compromise", "rationale": "双方各有优势", "conclusion": "采用折中方案。", } ) team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert result["verdict"]["decision"] == "compromise" assert result["verdict"]["rationale"] == "双方各有优势" assert result["verdict"]["conclusion"] == "采用折中方案。" @pytest.mark.asyncio async def test_debate_emits_debate_started_event(self): """辩论开始时广播 debate_started 事件""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list event_types = [c[0][1]["type"] for c in calls] assert "debate_started" in event_types @pytest.mark.asyncio async def test_debate_emits_expert_argument_events(self): """每个专家发言时广播 expert_argument 事件""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list argument_events = [c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"] # 2 experts × 1 round = 2 argument events assert len(argument_events) == 2 expert_ids = {e["expert_id"] for e in argument_events} assert expert_ids == {"member1", "member2"} @pytest.mark.asyncio async def test_debate_emits_round_summary_events(self): """每轮辩论结束时广播 debate_round_summary 事件""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list summary_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" ] assert len(summary_events) == 2 # 2 rounds # Round 1 summary should have continue=True, round 2 continue=False assert summary_events[0]["round"] == 1 assert summary_events[0]["continue"] is True assert summary_events[1]["round"] == 2 assert summary_events[1]["continue"] is False @pytest.mark.asyncio async def test_debate_emits_debate_resolved_event(self): """辩论裁决时广播 debate_resolved 事件""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list resolved_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved" ] assert len(resolved_events) == 1 assert resolved_events[0]["decision"] == "adopt" assert "conclusion" in resolved_events[0] @pytest.mark.asyncio async def test_debate_emits_phase_completed_event(self): """辩论阶段完成时广播 phase_completed 事件(与 EXECUTION 阶段一致)""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list completed_events = [ c[0][1] for c in calls if c[0][1].get("type") == "phase_completed" ] assert len(completed_events) == 1 assert completed_events[0]["phase_id"] == phase.id # ── 边界测试 ────────────────────────────────────────────── class TestDebatePhaseMaxRounds: """max_rounds 边界测试""" @pytest.mark.asyncio async def test_max_rounds_one_single_round(self): """max_rounds=1 时只辩论一轮就裁决""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) # Count expert_argument events: 2 experts × 1 round = 2 calls = team._handoff_transport.send.call_args_list argument_events = [ c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" ] assert len(argument_events) == 2 # Count summary events: 1 round = 1 summary summary_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" ] assert len(summary_events) == 1 @pytest.mark.asyncio async def test_max_rounds_capped_at_max_debate_rounds(self): """max_rounds 超过 MAX_DEBATE_ROUNDS 时被截断""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) # Request 10 rounds, should be capped to MAX_DEBATE_ROUNDS (4) phase = _make_debate_phase(max_rounds=10, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list summary_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" ] assert len(summary_events) == TeamOrchestrator.MAX_DEBATE_ROUNDS class TestDebatePhaseEmptyParticipants: """participants 为空时的边界测试""" @pytest.mark.asyncio async def test_empty_participants_lead_directly_adjudicates(self): """participants 为空时,Lead 直接给出结论(无辩论轮次)""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(participants=[], max_rounds=3) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED # Should still have a conclusion from Lead verdict assert "content" in result # No expert_argument events should be emitted calls = team._handoff_transport.send.call_args_list argument_events = [ c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" ] assert len(argument_events) == 0 # No round summary events summary_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" ] assert len(summary_events) == 0 @pytest.mark.asyncio async def test_empty_participants_still_emits_debate_started(self): """participants 为空时仍广播 debate_started(含空 participants 列表)""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(participants=[], max_rounds=2) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list started_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_started" ] assert len(started_events) == 1 assert started_events[0]["participants"] == [] # ── 用户停止测试 ────────────────────────────────────────── class TestDebatePhaseUserStop: """用户 /stop 干预测试""" @pytest.mark.asyncio async def test_stop_command_ends_debate_early(self): """辩论中收到 /stop,提前结束并裁决""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) # Mock intervention queue: return /stop on first check (round 1) team.consume_user_interventions = MagicMock(return_value=["/stop"]) phase = _make_debate_phase(max_rounds=3, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED # Should still produce a verdict assert "content" in result # No expert_argument events — stopped before round 1 arguments calls = team._handoff_transport.send.call_args_list argument_events = [ c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" ] assert len(argument_events) == 0 @pytest.mark.asyncio async def test_chinese_stop_command_ends_debate(self): """中文 '停止' 命令也能结束辩论""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) team.consume_user_interventions = MagicMock(return_value=["停止"]) phase = _make_debate_phase(max_rounds=3, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED calls = team._handoff_transport.send.call_args_list argument_events = [ c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" ] assert len(argument_events) == 0 @pytest.mark.asyncio async def test_non_stop_intervention_does_not_end_debate(self): """非停止命令的干预不会结束辩论""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) # Non-stop intervention should not end the debate team.consume_user_interventions = MagicMock(return_value=["继续讨论"]) phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) # Debate should proceed normally — arguments emitted calls = team._handoff_transport.send.call_args_list argument_events = [ c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" ] assert len(argument_events) == 2 # 2 experts × 1 round # ── 逃生舱测试 ──────────────────────────────────────────── class TestDebatePhaseSkipEscapeHatch: """skip=True 逃生舱测试""" @pytest.mark.asyncio async def test_skip_true_short_circuits_debate(self): """debate_config.skip=true 时直接跳过,phase 状态 COMPLETED""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(skip=True, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result["content"] == "无需辩论" assert result["skipped"] is True @pytest.mark.asyncio async def test_skip_true_does_not_call_llm(self): """skip=true 时不调用 LLM""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(skip=True) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) # LLM should not be called at all gateway.chat.assert_not_awaited() @pytest.mark.asyncio async def test_skip_true_emits_debate_resolved_with_skipped_decision(self): """skip=true 时广播 debate_resolved 事件,decision='skipped'""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(skip=True) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list resolved_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved" ] assert len(resolved_events) == 1 assert resolved_events[0]["decision"] == "skipped" assert resolved_events[0]["conclusion"] == "无需辩论" @pytest.mark.asyncio async def test_skip_true_no_debate_started_event(self): """skip=true 时不广播 debate_started 事件""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(skip=True) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list event_types = [c[0][1]["type"] for c in calls] assert "debate_started" not in event_types assert "expert_argument" not in event_types # ── LLM 不可用错误路径测试 ──────────────────────────────── class TestDebatePhaseLLMUnavailable: """LLM 不可用时的错误路径测试""" @pytest.mark.asyncio async def test_no_llm_gateway_uses_template_verdict(self): """LLM 不可用时,Lead 用模板文本裁决,不抛异常""" # No gateway provided — all experts have _llm_gateway=None team = _make_team_with_experts(gateway=None) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED # Should have a template conclusion (not raise) assert "content" in result assert result["decision"] == "inconclusive" @pytest.mark.asyncio async def test_no_llm_gateway_opening_uses_template(self): """LLM 不可用时,开场使用模板文本""" team = _make_team_with_experts(gateway=None) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) calls = team._handoff_transport.send.call_args_list started_events = [ c[0][1] for c in calls if c[0][1].get("type") == "debate_started" ] assert len(started_events) == 1 # Opening should contain the topic (template text) assert "前端框架选型" in started_events[0]["opening"] @pytest.mark.asyncio async def test_llm_gateway_exception_does_not_crash(self): """LLM gateway 抛异常时不崩溃,用模板裁决""" gateway = AsyncMock() gateway.chat = AsyncMock(side_effect=RuntimeError("LLM service down")) team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result["decision"] == "inconclusive" @pytest.mark.asyncio async def test_verdict_json_parse_failure_returns_inconclusive(self): """裁决 JSON 解析失败时返回 inconclusive""" gateway = AsyncMock() # Return non-JSON for all calls response = MagicMock() response.content = "这不是JSON格式" gateway.chat = AsyncMock(return_value=response) team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) result = await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result["decision"] == "inconclusive" # Conclusion should fall back to raw content assert "content" in result # ── SharedWorkspace 集成测试 ────────────────────────────── class TestDebatePhaseSharedWorkspace: """辩论结论写入 SharedWorkspace 测试""" @pytest.mark.asyncio async def test_conclusion_written_to_workspace(self): """辩论结论写入 SharedWorkspace""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) # Verify workspace has the debate output workspace = team.workspace output_key = f"{plan.id}/phase/{phase.id}/output" data = await workspace.read(output_key) assert data is not None assert data["value"] == "采纳甲方方案,按此执行。" assert data["agent_id"] == "lead" @pytest.mark.asyncio async def test_phase_result_stored_on_phase_object(self): """辩论结果存储在 phase.result 上""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_debate_phase(phase, plan) assert phase.result is not None assert phase.result["content"] == "采纳甲方方案,按此执行。" assert phase.result["decision"] == "adopt" assert "verdict" in phase.result # ── 干预通道兼容性测试 ──────────────────────────────────── class TestInterventionChannelCompatibility: """干预通道兼容性测试(U4 已实现干预队列)""" @pytest.mark.asyncio async def test_empty_interventions_returns_empty(self): """干预队列为空时返回空列表,辩论正常执行""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) # U4: ExpertTeam now has consume_user_interventions; empty queue returns [] assert hasattr(team, "consume_user_interventions") assert team.consume_user_interventions() == [] phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) # Should not raise — empty interventions, debate proceeds normally await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED @pytest.mark.asyncio async def test_intervention_method_exception_returns_empty(self): """consume_user_interventions 抛异常时返回空列表""" gateway = _make_smart_llm_gateway() team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) # Set a broken intervention method team.consume_user_interventions = MagicMock(side_effect=RuntimeError("broken")) phase = _make_debate_phase(max_rounds=1, participants=["member1"]) plan = _make_plan_with_debate_phase(phase) # Should not raise — exception caught, returns empty list await orchestrator._execute_debate_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED # ── Phase 分发测试 ──────────────────────────────────────── class TestPhaseDispatch: """_execute_phase 分发器测试""" @pytest.mark.asyncio async def test_execution_phase_dispatches_to_execution_method(self): """EXECUTION 类型阶段分发到 _execute_execution_phase""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) # Mock both execution methods to track dispatch orchestrator._execute_execution_phase = AsyncMock( return_value={"content": "execution result"} ) orchestrator._execute_debate_phase = AsyncMock( return_value={"content": "debate result"} ) phase = PlanPhase(name="执行阶段", assigned_expert="lead", task_description="任务") plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_phase(phase, plan) orchestrator._execute_execution_phase.assert_awaited_once_with(phase, plan) orchestrator._execute_debate_phase.assert_not_awaited() @pytest.mark.asyncio async def test_debate_phase_dispatches_to_debate_method(self): """DEBATE 类型阶段分发到 _execute_debate_phase""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) orchestrator._execute_execution_phase = AsyncMock( return_value={"content": "execution result"} ) orchestrator._execute_debate_phase = AsyncMock( return_value={"content": "debate result"} ) phase = _make_debate_phase() plan = _make_plan_with_debate_phase(phase) await orchestrator._execute_phase(phase, plan) orchestrator._execute_debate_phase.assert_awaited_once_with(phase, plan) orchestrator._execute_execution_phase.assert_not_awaited() # ── 辅助方法单元测试 ────────────────────────────────────── class TestHelperMethods: """辅助方法单元测试""" def test_has_stop_command_detects_stop_commands(self): """_has_stop_command 检测停止命令""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) assert orchestrator._has_stop_command(["/stop"]) is True assert orchestrator._has_stop_command(["停止"]) is True assert orchestrator._has_stop_command(["stop"]) is True assert orchestrator._has_stop_command(["结束"]) is True def test_has_stop_command_ignores_non_stop(self): """_has_stop_command 忽略非停止命令""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) assert orchestrator._has_stop_command(["继续"]) is False assert orchestrator._has_stop_command(["/continue"]) is False assert orchestrator._has_stop_command([]) is False def test_has_stop_command_case_insensitive(self): """_has_stop_command 大小写不敏感""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) assert orchestrator._has_stop_command(["STOP"]) is True assert orchestrator._has_stop_command([" /stop "]) is True def test_format_debate_history_empty(self): """_format_debate_history 空历史返回空字符串""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) assert orchestrator._format_debate_history([]) == "" def test_format_debate_history_with_entries(self): """_format_debate_history 格式化历史条目""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) history = [ {"expert": "lead", "content": "开场白", "round": 0, "role": "moderator"}, {"expert": "member1", "content": "我的论点", "round": 1, "role": "expert"}, ] result = orchestrator._format_debate_history(history) assert "开场白" in result assert "我的论点" in result assert "主持人" in result assert "专家" in result assert "[开场]" in result assert "[第1轮]" in result def test_build_dependency_context_no_deps(self): """_build_dependency_context 无依赖时返回空字符串""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) phase = _make_debate_phase(depends_on=[]) plan = _make_plan_with_debate_phase(phase) assert orchestrator._build_dependency_context(phase, plan) == "" def test_build_dependency_context_with_completed_dep(self): """_build_dependency_context 包含已完成依赖的输出""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) # Create a dependency phase that's completed dep_phase = PlanPhase( id="dep_1", name="前置阶段", assigned_expert="lead", task_description="前置任务", depends_on=[], ) dep_phase.status = PhaseStatus.COMPLETED dep_phase.result = {"content": "前置阶段输出内容"} debate_phase = _make_debate_phase(depends_on=["dep_1"]) plan = TeamPlan( id="test_plan", task="测试", phases=[dep_phase, debate_phase], lead_expert="lead", ) context = orchestrator._build_dependency_context(debate_phase, plan) assert "前置阶段" in context assert "前置阶段输出内容" in context def test_build_dependency_context_ignores_incomplete_dep(self): """_build_dependency_context 忽略未完成的依赖""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) # Dependency phase is still PENDING dep_phase = PlanPhase( id="dep_1", name="前置阶段", assigned_expert="lead", task_description="前置任务", ) debate_phase = _make_debate_phase(depends_on=["dep_1"]) plan = TeamPlan( id="test_plan", task="测试", phases=[dep_phase, debate_phase], lead_expert="lead", ) context = orchestrator._build_dependency_context(debate_phase, plan) assert context == ""