diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index 2bb80c4..a39cd4a 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -55,6 +55,7 @@ class TeamOrchestrator: MAX_PHASES = 10 # Maximum phases Lead Expert can decompose MAX_RETRIES = 1 # Retry once on phase failure before marking failed MAX_DEBATE_ROUNDS = 4 # Hard cap on debate rounds per phase + MAX_DEBATES = 3 # Hard cap on auto-inserted debate phases per execution STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"}) def __init__(self, team: ExpertTeam) -> None: @@ -62,6 +63,8 @@ class TeamOrchestrator: # Track temporary agent names created for context isolation (KTD3) # Maps phase_id -> temp_agent_name for cleanup self._temp_agents: dict[str, str] = {} + # Count of auto-inserted debate phases (bounded by MAX_DEBATES) + self._debate_count = 0 async def execute(self, task: str) -> dict[str, Any]: """Execute a task in pipeline mode. @@ -135,6 +138,9 @@ class TeamOrchestrator: plan.phases = phases[: self.MAX_PHASES] + # U3: Optionally add plan review debate before execution + await self._maybe_add_plan_review_debate(lead, plan, task) + # 3. Emit plan_update with phase list await self._broadcast_event( "plan_update", @@ -149,13 +155,22 @@ class TeamOrchestrator: phase_results: dict[str, dict[str, Any]] = {} try: - # Topological sort phases into execution layers - layers = plan.topological_sort() + # Execute layers sequentially, phases within layer in parallel. + # U3: while-loop re-computes topological_sort each iteration so + # dynamically inserted DEBATE phases (from divergence detection) + # are picked up correctly. + while True: + layers = plan.topological_sort() + # Find the next layer that still has PENDING phases + current_layer: list[PlanPhase] | None = None + for layer in layers: + if any(ph.status == PhaseStatus.PENDING for ph in layer): + current_layer = layer + break + if current_layer is None: + break # No more pending phases — done - # Execute layers sequentially, phases within layer in parallel - for layer in layers: - # Filter out already-failed phases (from dependency failures) - ready = [ph for ph in layer if ph.status == PhaseStatus.PENDING] + ready = [ph for ph in current_layer if ph.status == PhaseStatus.PENDING] if not ready: continue @@ -186,6 +201,17 @@ class TeamOrchestrator: else: phase_results[ph.id] = result + # U3: Divergence detection — check completed phases for conflicts + # and dynamically insert DEBATE phases if needed + if self._debate_count < self.MAX_DEBATES: + completed_now = [ + ph for ph in ready if ph.status == PhaseStatus.COMPLETED + ] + if completed_now: + await self._check_divergence_and_insert_debates( + lead, plan, completed_now + ) + # 5. Check if all phases failed completed = plan.completed_phases if not completed: @@ -946,6 +972,223 @@ class TeamOrchestrator: return True return False + # ── U3: Divergence detection + dynamic debate insertion ──────────── + + async def _maybe_add_plan_review_debate( + self, lead: Expert, plan: TeamPlan, task: str + ) -> None: + """Optionally add a plan review debate phase before execution. + + Skips for simple tasks (<= 2 phases) or when LLM judges it unnecessary. + When added, all existing phases depend on the debate phase so it runs first. + """ + if len(plan.phases) <= 2: + return # Simple task, skip plan review + + if self._debate_count >= self.MAX_DEBATES: + return + + gateway = self._get_llm_gateway(lead) + if not gateway: + return + + member_names = [ + e.config.name + for e in self._team.active_experts + if e.config.name != lead.config.name + ] + if not member_names: + return + + prompt = ( + f"你是团队 Lead {lead.config.name},需要判断以下任务是否需要方案评审辩论。\n\n" + f"任务:{task}\n" + f"分解的阶段:{', '.join(ph.name for ph in plan.phases)}\n" + f"团队成员:{', '.join(member_names)}\n\n" + "以下情况需要方案评审:\n" + "1) 任务复杂,涉及多个技术方向\n" + "2) 方案选择影响重大,值得先讨论再执行\n" + "3) 团队成员可能有不同观点\n" + "简单任务不需要评审。\n\n" + "只回答 true 或 false。" + ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(lead), + ) + if not response.content.strip().lower().startswith("true"): + return + except Exception as e: + logger.warning(f"Plan review judgment failed: {e}") + return + + # Insert plan review DEBATE phase at the head + debate_phase = PlanPhase( + name="方案评审", + assigned_expert=lead.config.name, + task_description=f"方案评审:{task}", + depends_on=[], + phase_type=PhaseType.DEBATE, + debate_config={ + "topic": f"方案评审:{task}", + "participants": member_names, + "max_rounds": 2, + }, + ) + + # All existing phases now depend on the debate phase + for ph in plan.phases: + ph.depends_on.append(debate_phase.id) + + plan.phases.insert(0, debate_phase) + self._debate_count += 1 + logger.info(f"Added plan review debate phase {debate_phase.id}") + + async def _detect_divergence( + self, lead: Expert, completed_phase: PlanPhase, plan: TeamPlan + ) -> bool: + """Use LLM to detect if a completed phase's output has divergence worth debating. + + Returns False if LLM unavailable, detection fails, or no other completed + phases to compare against. Prefers false negatives over false positives. + """ + gateway = self._get_llm_gateway(lead) + if not gateway: + return False + + # Need other completed phases to compare against + other_completed = [ + ph + for ph in plan.completed_phases + if ph.id != completed_phase.id and ph.result + ] + if not other_completed: + return False + + other_outputs = [] + for ph in other_completed: + content = ph.result.get("content", str(ph.result)) if ph.result else "" + other_outputs.append(f"[{ph.name}]:\n{content[:300]}") + + current_output = "" + if completed_phase.result: + current_output = completed_phase.result.get( + "content", str(completed_phase.result) + )[:500] + + prompt = ( + f"你是团队 Lead {lead.config.name},需要判断刚完成的阶段产出是否与其他阶段存在分歧。\n\n" + f"原始任务:{plan.task}\n\n" + f"刚完成的阶段:{completed_phase.name}\n" + f"产出:{current_output}\n\n" + f"其他已完成阶段的产出:\n" + + "\n---\n".join(other_outputs) + + "\n\n" + "请判断是否值得发起辩论。以下情况值得辩论:\n" + "1) 两个阶段产出存在矛盾或冲突\n" + "2) 阶段产出与原始任务约束冲突\n" + "3) 存在多个合理方案需要抉择\n" + "其他情况不值得辩论。\n\n" + "只回答 true 或 false,不要其他文字。" + ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(lead), + ) + return response.content.strip().lower().startswith("true") + except Exception as e: + logger.warning(f"Divergence detection failed: {e}") + return False + + def _insert_debate_phase( + self, + plan: TeamPlan, + trigger_phase: PlanPhase, + topic: str, + participants: list[str], + ) -> PlanPhase | None: + """Insert a DEBATE phase after the trigger phase, rewiring dependents. + + Phases that depended on trigger_phase now depend on the DEBATE phase, + so they wait for the debate conclusion before executing. + """ + if not participants: + return None + + lead = self._team.lead_expert + assigned = lead.config.name if lead else trigger_phase.assigned_expert + + debate_phase = PlanPhase( + name=f"辩论: {topic[:20]}", + assigned_expert=assigned, + task_description=topic, + depends_on=[trigger_phase.id], + phase_type=PhaseType.DEBATE, + debate_config={ + "topic": topic, + "participants": participants, + "max_rounds": 2, + }, + ) + + # Rewire: phases that depended on trigger_phase now depend on debate_phase + for ph in plan.phases: + if trigger_phase.id in ph.depends_on: + ph.depends_on.remove(trigger_phase.id) + ph.depends_on.append(debate_phase.id) + + plan.phases.append(debate_phase) + self._debate_count += 1 + logger.info(f"Inserted debate phase {debate_phase.id} after {trigger_phase.id}") + return debate_phase + + async def _check_divergence_and_insert_debates( + self, + lead: Expert, + plan: TeamPlan, + completed_in_layer: list[PlanPhase], + ) -> None: + """Check for divergence on newly completed phases and insert debates. + + Called after each layer completes. Stops early if MAX_DEBATES is reached. + """ + for ph in completed_in_layer: + if ph.status != PhaseStatus.COMPLETED: + continue + if self._debate_count >= self.MAX_DEBATES: + logger.info( + f"Max debates ({self.MAX_DEBATES}) reached, skipping divergence detection" + ) + return + + has_divergence = await self._detect_divergence(lead, ph, plan) + if not has_divergence: + continue + + # Determine participants: all active experts except lead + participants = [ + e.config.name + for e in self._team.active_experts + if e.config.name != lead.config.name + ] + topic = f"阶段 '{ph.name}' 产出分歧" + debate = self._insert_debate_phase(plan, ph, topic, participants) + if debate: + await self._broadcast_event( + "plan_update", + { + "plan_id": plan.id, + "plan_phases": [p.to_dict() for p in plan.phases], + "debate_inserted": debate.id, + }, + ) + + # ── U3 end ───────────────────────────────────────────────────────── + async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> ConfigDrivenAgent: """Get an isolated ConfigDrivenAgent instance for the phase. diff --git a/tests/unit/experts/test_divergence_detection.py b/tests/unit/experts/test_divergence_detection.py new file mode 100644 index 0000000..073ce21 --- /dev/null +++ b/tests/unit/experts/test_divergence_detection.py @@ -0,0 +1,756 @@ +"""TeamOrchestrator 分歧检测 + 方案评审辩论单元测试 (U3) + +测试覆盖: +- 方案评审辩论 (_maybe_add_plan_review_debate) + * Happy path: LLM 判断需要评审 → 插入 DEBATE phase,所有原 phase 依赖它 + * 边界: phases <= 2 时跳过 + * 边界: MAX_DEBATES 已达上限时跳过 + * 边界: 无其他成员时跳过 + * 错误路径: LLM 不可用时跳过 + * 错误路径: LLM 抛异常时跳过 +- 分歧检测 (_detect_divergence) + * Happy path: LLM 判断有分歧 → 返回 True + * Happy path: LLM 判断无分歧 → 返回 False + * 边界: 无其他已完成阶段时返回 False + * 错误路径: LLM 不可用时返回 False + * 错误路径: LLM 抛异常时返回 False +- 动态插入辩论 (_insert_debate_phase) + * Happy path: 插入 DEBATE,依赖重 wiring + * 边界: participants 为空时返回 None +- 协调入口 (_check_divergence_and_insert_debates) + * Happy path: 检测到分歧 → 插入辩论 + 广播 plan_update + * Happy path: 无分歧 → 不插入 + * 边界: MAX_DEBATES 达上限时跳过 +- 集成: 插入的 DEBATE phase 在 topological_sort 中正确分层 +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.experts.config import ExpertConfig +from agentkit.experts.orchestrator import TeamOrchestrator +from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan +from agentkit.experts.team import ExpertTeam + + +# ── 辅助函数 ────────────────────────────────────────────── + + +def _make_expert_config( + name: str = "test_expert", + is_lead: bool = False, +) -> ExpertConfig: + return ExpertConfig( + name=name, + agent_type="expert", + persona=f"{name}的角色描述", + thinking_style="逻辑推理", + speaking_style="简洁直接", + decision_framework="数据驱动决策", + bound_skills=["skill_a"], + is_lead=is_lead, + task_mode="llm_generate", + prompt={"identity": "测试"}, + ) + + +def _make_mock_expert( + name: str = "test_expert", + is_lead: bool = False, + is_active: bool = True, + gateway: MagicMock | None = None, +) -> MagicMock: + config = _make_expert_config(name=name, is_lead=is_lead) + expert = MagicMock() + expert.config = config + expert.is_active = is_active + expert.team_id = None + expert.get_capabilities_summary.return_value = { + "name": name, + "persona": config.persona, + "thinking_style": config.thinking_style, + "bound_skills": config.bound_skills, + "is_lead": is_lead, + } + mock_agent = MagicMock() + mock_agent._llm_gateway = gateway + expert.agent = mock_agent + return expert + + +def _make_team_with_experts( + expert_names: list[str] | None = None, + lead_name: str = "lead", + gateway: MagicMock | None = None, +) -> ExpertTeam: + team = ExpertTeam() + transport = AsyncMock(spec=InProcessHandoffTransport) + team._handoff_transport = transport + + if expert_names is None: + expert_names = [lead_name, "member1", "member2"] + + for name in expert_names: + is_lead = name == lead_name + expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) + team._experts[name] = expert + if is_lead: + team._lead_expert_name = name + + return team + + +def _make_execution_phase( + phase_id: str = "phase_1", + name: str = "阶段一", + assigned_expert: str = "member1", + depends_on: list[str] | None = None, + status: PhaseStatus = PhaseStatus.PENDING, + result: dict | None = None, +) -> PlanPhase: + """创建测试用 EXECUTION 阶段""" + return PlanPhase( + id=phase_id, + name=name, + assigned_expert=assigned_expert, + task_description=f"{name}的任务描述", + depends_on=depends_on or [], + phase_type=PhaseType.EXECUTION, + status=status, + result=result, + ) + + +def _make_plan( + phases: list[PlanPhase], + task: str = "测试任务", + lead_expert: str = "lead", +) -> TeamPlan: + return TeamPlan( + id="test_plan", + task=task, + phases=phases, + lead_expert=lead_expert, + ) + + +def _make_bool_gateway( + responses: list[bool], +) -> AsyncMock: + """创建返回 true/false 字符串的 mock LLM gateway + + Args: + responses: 按调用顺序返回的布尔值列表 + """ + queue = list(responses) + + async def chat_side_effect(messages, model=None, **kwargs): + if not queue: + # Default to false if exhausted + response = MagicMock() + response.content = "false" + return response + val = queue.pop(0) + response = MagicMock() + response.content = "true" if val else "false" + return response + + gateway = AsyncMock() + gateway.chat = AsyncMock(side_effect=chat_side_effect) + return gateway + + +def _make_error_gateway() -> AsyncMock: + """创建总是抛异常的 mock LLM gateway""" + + async def chat_side_effect(messages, model=None, **kwargs): + raise RuntimeError("LLM unavailable") + + gateway = AsyncMock() + gateway.chat = AsyncMock(side_effect=chat_side_effect) + return gateway + + +# ── 方案评审辩论测试 ───────────────────────────────────── + + +class TestMaybeAddPlanReviewDebate: + """_maybe_add_plan_review_debate 测试""" + + @pytest.mark.asyncio + async def test_adds_plan_review_debate_when_llm_says_yes(self): + """LLM 判断需要评审 → 插入 DEBATE phase,所有原 phase 依赖它""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # 3 个执行阶段(>2 才会考虑评审) + phases = [ + _make_execution_phase(phase_id="p1", name="阶段一"), + _make_execution_phase(phase_id="p2", name="阶段二"), + _make_execution_phase(phase_id="p3", name="阶段三"), + ] + plan = _make_plan(phases=phases, task="复杂任务") + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "复杂任务" + ) + + # 应该插入一个 DEBATE phase 在最前面 + assert len(plan.phases) == 4 + review_phase = plan.phases[0] + assert review_phase.phase_type == PhaseType.DEBATE + assert review_phase.name == "方案评审" + assert review_phase.assigned_expert == "lead" + assert review_phase.debate_config is not None + assert review_phase.debate_config["participants"] == ["member1", "member2"] + assert review_phase.debate_config["max_rounds"] == 2 + + # 所有原 phase 都应该依赖 review_phase + for ph in plan.phases[1:]: + assert review_phase.id in ph.depends_on + + # debate_count 应该 +1 + assert orchestrator._debate_count == 1 + + @pytest.mark.asyncio + async def test_skips_when_llm_says_no(self): + """LLM 判断不需要评审 → 不插入""" + gateway = _make_bool_gateway([False]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + _make_execution_phase(phase_id="p3"), + ] + plan = _make_plan(phases=phases) + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "简单任务" + ) + + assert len(plan.phases) == 3 + assert orchestrator._debate_count == 0 + + @pytest.mark.asyncio + async def test_skips_when_phases_le_two(self): + """phases <= 2 时跳过(简单任务)""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + ] + plan = _make_plan(phases=phases) + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "任务" + ) + + assert len(plan.phases) == 2 + assert orchestrator._debate_count == 0 + + @pytest.mark.asyncio + async def test_skips_when_max_debates_reached(self): + """MAX_DEBATES 已达上限时跳过""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + orchestrator._debate_count = orchestrator.MAX_DEBATES + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + _make_execution_phase(phase_id="p3"), + ] + plan = _make_plan(phases=phases) + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "任务" + ) + + assert len(plan.phases) == 3 + assert orchestrator._debate_count == orchestrator.MAX_DEBATES + + @pytest.mark.asyncio + async def test_skips_when_no_other_members(self): + """无其他成员时跳过(只有 lead)""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts( + expert_names=["lead"], gateway=gateway + ) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + _make_execution_phase(phase_id="p3"), + ] + plan = _make_plan(phases=phases) + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "任务" + ) + + assert len(plan.phases) == 3 + assert orchestrator._debate_count == 0 + + @pytest.mark.asyncio + async def test_skips_when_llm_unavailable(self): + """LLM gateway 为 None 时跳过""" + team = _make_team_with_experts(gateway=None) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + _make_execution_phase(phase_id="p3"), + ] + plan = _make_plan(phases=phases) + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "任务" + ) + + assert len(plan.phases) == 3 + assert orchestrator._debate_count == 0 + + @pytest.mark.asyncio + async def test_skips_when_llm_raises_exception(self): + """LLM 抛异常时跳过,不抛出""" + gateway = _make_error_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1"), + _make_execution_phase(phase_id="p2"), + _make_execution_phase(phase_id="p3"), + ] + plan = _make_plan(phases=phases) + + # 不应该抛异常 + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "任务" + ) + + assert len(plan.phases) == 3 + assert orchestrator._debate_count == 0 + + +# ── 分歧检测测试 ───────────────────────────────────────── + + +class TestDetectDivergence: + """_detect_divergence 测试""" + + @pytest.mark.asyncio + async def test_returns_true_when_llm_detects_divergence(self): + """LLM 判断有分歧 → 返回 True""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # 两个已完成的阶段,产出不同 + phase_a = _make_execution_phase( + phase_id="a", + name="阶段A", + status=PhaseStatus.COMPLETED, + result={"content": "采用 React"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + name="阶段B", + status=PhaseStatus.COMPLETED, + result={"content": "采用 Vue"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + result = await orchestrator._detect_divergence( + team.lead_expert, phase_a, plan + ) + + assert result is True + + @pytest.mark.asyncio + async def test_returns_false_when_llm_says_no_divergence(self): + """LLM 判断无分歧 → 返回 False""" + gateway = _make_bool_gateway([False]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + status=PhaseStatus.COMPLETED, + result={"content": "结果B"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + result = await orchestrator._detect_divergence( + team.lead_expert, phase_a, plan + ) + + assert result is False + + @pytest.mark.asyncio + async def test_returns_false_when_no_other_completed_phases(self): + """无其他已完成阶段时返回 False(无法比较)""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + # 另一个阶段还在 PENDING + phase_b = _make_execution_phase(phase_id="b", status=PhaseStatus.PENDING) + plan = _make_plan(phases=[phase_a, phase_b]) + + result = await orchestrator._detect_divergence( + team.lead_expert, phase_a, plan + ) + + assert result is False + + @pytest.mark.asyncio + async def test_returns_false_when_llm_unavailable(self): + """LLM gateway 为 None 时返回 False""" + team = _make_team_with_experts(gateway=None) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + status=PhaseStatus.COMPLETED, + result={"content": "结果B"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + result = await orchestrator._detect_divergence( + team.lead_expert, phase_a, plan + ) + + assert result is False + + @pytest.mark.asyncio + async def test_returns_false_when_llm_raises_exception(self): + """LLM 抛异常时返回 False,不抛出""" + gateway = _make_error_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + status=PhaseStatus.COMPLETED, + result={"content": "结果B"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + result = await orchestrator._detect_divergence( + team.lead_expert, phase_a, plan + ) + + assert result is False + + +# ── 动态插入辩论测试 ───────────────────────────────────── + + +class TestInsertDebatePhase: + """_insert_debate_phase 测试""" + + def test_inserts_debate_and_rewires_dependencies(self): + """插入 DEBATE phase,依赖重 wiring:原依赖 trigger 的 phase 现在依赖 DEBATE""" + gateway = _make_bool_gateway([]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + trigger = _make_execution_phase(phase_id="trigger", name="触发阶段") + dependent = _make_execution_phase( + phase_id="dependent", + name="依赖阶段", + depends_on=["trigger"], + ) + plan = _make_plan(phases=[trigger, dependent]) + + debate = orchestrator._insert_debate_phase( + plan, trigger, "产出分歧", ["member1", "member2"] + ) + + assert debate is not None + assert debate.phase_type == PhaseType.DEBATE + assert debate.depends_on == ["trigger"] + assert debate.debate_config["topic"] == "产出分歧" + assert debate.debate_config["participants"] == ["member1", "member2"] + assert debate.debate_config["max_rounds"] == 2 + + # dependent 现在依赖 debate,不再直接依赖 trigger + assert debate.id in dependent.depends_on + assert "trigger" not in dependent.depends_on + + # debate 被加入 plan + assert debate in plan.phases + assert orchestrator._debate_count == 1 + + def test_returns_none_when_no_participants(self): + """participants 为空时返回 None""" + gateway = _make_bool_gateway([]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + trigger = _make_execution_phase(phase_id="trigger") + plan = _make_plan(phases=[trigger]) + + debate = orchestrator._insert_debate_phase( + plan, trigger, "产出分歧", [] + ) + + assert debate is None + assert orchestrator._debate_count == 0 + + def test_debate_assigned_to_lead(self): + """DEBATE phase 的 assigned_expert 是 lead""" + gateway = _make_bool_gateway([]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + trigger = _make_execution_phase(phase_id="trigger") + plan = _make_plan(phases=[trigger]) + + debate = orchestrator._insert_debate_phase( + plan, trigger, "分歧", ["member1"] + ) + + assert debate is not None + assert debate.assigned_expert == "lead" + + +# ── 协调入口测试 ───────────────────────────────────────── + + +class TestCheckDivergenceAndInsertDebates: + """_check_divergence_and_insert_debates 测试""" + + @pytest.mark.asyncio + async def test_inserts_debate_when_divergence_detected(self): + """检测到分歧 → 插入辩论 + 广播 plan_update""" + gateway = _make_bool_gateway([True]) # 检测到分歧 + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + name="阶段A", + status=PhaseStatus.COMPLETED, + result={"content": "采用 React"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + name="阶段B", + status=PhaseStatus.COMPLETED, + result={"content": "采用 Vue"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + await orchestrator._check_divergence_and_insert_debates( + team.lead_expert, plan, [phase_a] + ) + + # 应该插入一个 DEBATE phase + assert len(plan.phases) == 3 + debate = plan.phases[-1] + assert debate.phase_type == PhaseType.DEBATE + assert orchestrator._debate_count == 1 + + # 应该广播 plan_update 事件 + transport = team._handoff_transport + assert transport.send.called + # 最后一次 send 应该是 plan_update + last_call = transport.send.call_args_list[-1] + event_data = last_call[0][1] # 第二个位置参数是 data dict + assert event_data["type"] == "plan_update" + assert "debate_inserted" in event_data + + @pytest.mark.asyncio + async def test_no_debate_when_no_divergence(self): + """无分歧 → 不插入辩论""" + gateway = _make_bool_gateway([False]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + status=PhaseStatus.COMPLETED, + result={"content": "结果B"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + await orchestrator._check_divergence_and_insert_debates( + team.lead_expert, plan, [phase_a] + ) + + assert len(plan.phases) == 2 + assert orchestrator._debate_count == 0 + + @pytest.mark.asyncio + async def test_skips_when_max_debates_reached(self): + """MAX_DEBATES 达上限时跳过检测""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + orchestrator._debate_count = orchestrator.MAX_DEBATES + + phase_a = _make_execution_phase( + phase_id="a", + status=PhaseStatus.COMPLETED, + result={"content": "结果A"}, + ) + phase_b = _make_execution_phase( + phase_id="b", + status=PhaseStatus.COMPLETED, + result={"content": "结果B"}, + ) + plan = _make_plan(phases=[phase_a, phase_b]) + + await orchestrator._check_divergence_and_insert_debates( + team.lead_expert, plan, [phase_a] + ) + + assert len(plan.phases) == 2 + assert orchestrator._debate_count == orchestrator.MAX_DEBATES + + @pytest.mark.asyncio + async def test_skips_non_completed_phases(self): + """非 COMPLETED 状态的 phase 被跳过""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # 传入一个 PENDING 的 phase(不应该被检测) + phase_pending = _make_execution_phase( + phase_id="pending", status=PhaseStatus.PENDING + ) + phase_completed = _make_execution_phase( + phase_id="completed", + status=PhaseStatus.COMPLETED, + result={"content": "结果"}, + ) + plan = _make_plan(phases=[phase_pending, phase_completed]) + + await orchestrator._check_divergence_and_insert_debates( + team.lead_expert, plan, [phase_pending, phase_completed] + ) + + # phase_pending 被跳过;phase_completed 无其他完成阶段可比较 → 无分歧 + assert orchestrator._debate_count == 0 + + +# ── 集成测试 ───────────────────────────────────────────── + + +class TestInsertedDebateLayering: + """插入的 DEBATE phase 在 topological_sort 中正确分层""" + + def test_inserted_debate_blocks_dependents(self): + """插入的 DEBATE phase 应该在 trigger 之后、dependent 之前""" + gateway = _make_bool_gateway([]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + trigger = _make_execution_phase( + phase_id="trigger", + name="触发阶段", + status=PhaseStatus.COMPLETED, + result={"content": "触发结果"}, + ) + dependent = _make_execution_phase( + phase_id="dependent", + name="依赖阶段", + depends_on=["trigger"], + ) + plan = _make_plan(phases=[trigger, dependent]) + + debate = orchestrator._insert_debate_phase( + plan, trigger, "分歧", ["member1", "member2"] + ) + + assert debate is not None + + layers = plan.topological_sort() + # 找到各 phase 所在的层 + trigger_layer = None + debate_layer = None + dependent_layer = None + for i, layer in enumerate(layers): + for ph in layer: + if ph.id == "trigger": + trigger_layer = i + elif ph.id == debate.id: + debate_layer = i + elif ph.id == "dependent": + dependent_layer = i + + assert trigger_layer is not None + assert debate_layer is not None + assert dependent_layer is not None + # trigger < debate < dependent + assert trigger_layer < debate_layer + assert debate_layer < dependent_layer + + @pytest.mark.asyncio + async def test_plan_review_debate_runs_first(self): + """方案评审 DEBATE 应该在第 0 层,所有执行阶段在后续层""" + gateway = _make_bool_gateway([True]) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phases = [ + _make_execution_phase(phase_id="p1", name="阶段一"), + _make_execution_phase(phase_id="p2", name="阶段二"), + _make_execution_phase(phase_id="p3", name="阶段三"), + ] + plan = _make_plan(phases=phases, task="复杂任务") + + await orchestrator._maybe_add_plan_review_debate( + team.lead_expert, plan, "复杂任务" + ) + + layers = plan.topological_sort() + # 第 0 层应该只有方案评审 DEBATE + assert len(layers[0]) == 1 + assert layers[0][0].phase_type == PhaseType.DEBATE + assert layers[0][0].name == "方案评审" + + # 所有执行阶段在后续层 + for layer in layers[1:]: + for ph in layer: + assert ph.phase_type == PhaseType.EXECUTION diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index 57c9db5..d884a0c 100644 --- a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -130,7 +130,17 @@ def _make_mock_llm_gateway( decomp_response.content = phases_json synth_response = MagicMock() synth_response.content = synthesis_content - gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response, synth_response]) + # U3: 分歧检测会在 decomposition 与 synthesis 之间插入额外的 LLM 调用, + # 因此用函数式 side_effect:首次返回 decomposition,其余一律返回 synthesis。 + call_count = [0] + + async def chat_side_effect(messages, model=None, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + return decomp_response + return synth_response + + gateway.chat = AsyncMock(side_effect=chat_side_effect) else: response = MagicMock() response.content = synthesis_content