feat(experts): U3 分歧检测 + 方案评审辩论自动触发

在 TeamOrchestrator 中新增 4 个方法实现自动辩论触发:

- _maybe_add_plan_review_debate: 任务分解后可选插入方案评审 DEBATE
  phase(phases > 2 且 LLM 判断需要时),所有执行阶段依赖它
- _detect_divergence: 每层执行后用 LLM 判断已完成阶段的产出是否与其他
  阶段存在分歧;宁可漏报(false negative)也不误报
- _insert_debate_phase: 动态插入 DEBATE phase 并重新连接(rewire)依赖
  (原先依赖 trigger 的 phase 改为依赖 DEBATE)
- _check_divergence_and_insert_debates: 每层完成后的协调入口,
  受 MAX_DEBATES=3 上限保护

主循环从 `for layer in layers` 改为 `while True` + 重新计算
topological_sort(),以支持动态插入 DEBATE phase 后的依赖分层。

测试:tests/unit/experts/test_divergence_detection.py(21 个测试),
覆盖 happy path / 边界 / 错误路径 / 集成分层。同步修复
test_team_orchestrator.py 的 mock gateway,以适配 U3 的额外 LLM 调用。

全部 398 个测试通过。
This commit is contained in:
chiguyong 2026-06-24 11:09:53 +08:00
parent fbe08cb1e2
commit ac26d417b3
3 changed files with 1016 additions and 7 deletions

View File

@ -55,6 +55,7 @@ class TeamOrchestrator:
MAX_PHASES = 10 # Maximum phases Lead Expert can decompose
MAX_RETRIES = 1 # Retry once on phase failure before marking failed
MAX_DEBATE_ROUNDS = 4 # Hard cap on debate rounds per phase
MAX_DEBATES = 3 # Hard cap on auto-inserted debate phases per execution
STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"})
def __init__(self, team: ExpertTeam) -> None:
@ -62,6 +63,8 @@ class TeamOrchestrator:
# Track temporary agent names created for context isolation (KTD3)
# Maps phase_id -> temp_agent_name for cleanup
self._temp_agents: dict[str, str] = {}
# Count of auto-inserted debate phases (bounded by MAX_DEBATES)
self._debate_count = 0
async def execute(self, task: str) -> dict[str, Any]:
"""Execute a task in pipeline mode.
@ -135,6 +138,9 @@ class TeamOrchestrator:
plan.phases = phases[: self.MAX_PHASES]
# U3: Optionally add plan review debate before execution
await self._maybe_add_plan_review_debate(lead, plan, task)
# 3. Emit plan_update with phase list
await self._broadcast_event(
"plan_update",
@ -149,13 +155,22 @@ class TeamOrchestrator:
phase_results: dict[str, dict[str, Any]] = {}
try:
# Topological sort phases into execution layers
# Execute layers sequentially, phases within layer in parallel.
# U3: while-loop re-computes topological_sort each iteration so
# dynamically inserted DEBATE phases (from divergence detection)
# are picked up correctly.
while True:
layers = plan.topological_sort()
# Execute layers sequentially, phases within layer in parallel
# Find the next layer that still has PENDING phases
current_layer: list[PlanPhase] | None = None
for layer in layers:
# Filter out already-failed phases (from dependency failures)
ready = [ph for ph in layer if ph.status == PhaseStatus.PENDING]
if any(ph.status == PhaseStatus.PENDING for ph in layer):
current_layer = layer
break
if current_layer is None:
break # No more pending phases — done
ready = [ph for ph in current_layer if ph.status == PhaseStatus.PENDING]
if not ready:
continue
@ -186,6 +201,17 @@ class TeamOrchestrator:
else:
phase_results[ph.id] = result
# U3: Divergence detection — check completed phases for conflicts
# and dynamically insert DEBATE phases if needed
if self._debate_count < self.MAX_DEBATES:
completed_now = [
ph for ph in ready if ph.status == PhaseStatus.COMPLETED
]
if completed_now:
await self._check_divergence_and_insert_debates(
lead, plan, completed_now
)
# 5. Check if all phases failed
completed = plan.completed_phases
if not completed:
@ -946,6 +972,223 @@ class TeamOrchestrator:
return True
return False
# ── U3: Divergence detection + dynamic debate insertion ────────────
async def _maybe_add_plan_review_debate(
    self, lead: Expert, plan: TeamPlan, task: str
) -> None:
    """Prepend a plan-review DEBATE phase when the lead's LLM asks for one.

    The plan is left untouched when it has at most two phases, when the
    auto-debate budget (``MAX_DEBATES``) is already spent, when no LLM
    gateway is available for the lead, when the team has no active expert
    besides the lead, when the LLM call raises, or when the reply does not
    start with ``true``.

    When a review is added, every pre-existing phase gains a dependency on
    the new debate phase, the debate phase is placed at index 0 of
    ``plan.phases`` and ``self._debate_count`` is incremented.
    """
    if len(plan.phases) <= 2 or self._debate_count >= self.MAX_DEBATES:
        # Simple plans and an exhausted debate budget both skip the review.
        return

    gateway = self._get_llm_gateway(lead)
    if not gateway:
        return

    # Debate participants are all active experts other than the lead.
    reviewers = [
        expert.config.name
        for expert in self._team.active_experts
        if expert.config.name != lead.config.name
    ]
    if not reviewers:
        return

    phase_names = ", ".join(phase.name for phase in plan.phases)
    reviewer_names = ", ".join(reviewers)
    question = "".join(
        (
            f"你是团队 Lead {lead.config.name},需要判断以下任务是否需要方案评审辩论。\n\n",
            f"任务:{task}\n",
            f"分解的阶段:{phase_names}\n",
            f"团队成员:{reviewer_names}\n\n",
            "以下情况需要方案评审:\n",
            "1) 任务复杂,涉及多个技术方向\n",
            "2) 方案选择影响重大,值得先讨论再执行\n",
            "3) 团队成员可能有不同观点\n",
            "简单任务不需要评审。\n\n",
            "只回答 true 或 false。",
        )
    )

    try:
        reply = await gateway.chat(
            messages=[{"role": "user", "content": question}],
            model=self._get_model(lead),
        )
        wants_review = reply.content.strip().lower().startswith("true")
    except Exception as exc:
        # Best effort: a failed judgment simply means no review is added.
        logger.warning(f"Plan review judgment failed: {exc}")
        return
    if not wants_review:
        return

    review_topic = f"方案评审:{task}"
    debate_phase = PlanPhase(
        name="方案评审",
        assigned_expert=lead.config.name,
        task_description=review_topic,
        depends_on=[],
        phase_type=PhaseType.DEBATE,
        debate_config={
            "topic": review_topic,
            "participants": reviewers,
            "max_rounds": 2,
        },
    )

    # Gate every existing phase on the review before putting it at the head.
    for existing in plan.phases:
        existing.depends_on.append(debate_phase.id)
    plan.phases.insert(0, debate_phase)

    self._debate_count += 1
    logger.info(f"Added plan review debate phase {debate_phase.id}")
async def _detect_divergence(
    self, lead: Expert, completed_phase: PlanPhase, plan: TeamPlan
) -> bool:
    """Ask the lead's LLM whether a finished phase conflicts with other results.

    Args:
        lead: Expert whose LLM gateway and model are used for the judgment.
        completed_phase: The phase that has just finished.
        plan: Plan providing the original task and the other completed phases.

    Returns:
        ``True`` only when the LLM reply starts with ``true``. ``False`` when
        no gateway is available, when there is no other completed phase with
        a result to compare against, or when the LLM call fails. The method
        prefers false negatives over false positives.
    """

    def _excerpt(result: object, limit: int) -> str:
        """Return at most ``limit`` characters of text taken from a phase result."""
        if not result:
            return ""
        content = result.get("content") if isinstance(result, dict) else None
        if content is None:
            # Missing or null "content": fall back to the whole result.
            content = result
        # str() keeps non-string payloads from breaking the slice below.
        return str(content)[:limit]

    gateway = self._get_llm_gateway(lead)
    if not gateway:
        return False

    # Need other completed phases to compare against
    other_completed = [
        ph
        for ph in plan.completed_phases
        if ph.id != completed_phase.id and ph.result
    ]
    if not other_completed:
        return False

    other_outputs = [
        f"[{ph.name}]:\n{_excerpt(ph.result, 300)}" for ph in other_completed
    ]
    current_output = _excerpt(completed_phase.result, 500)

    prompt = (
        f"你是团队 Lead {lead.config.name},需要判断刚完成的阶段产出是否与其他阶段存在分歧。\n\n"
        f"原始任务:{plan.task}\n\n"
        f"刚完成的阶段:{completed_phase.name}\n"
        f"产出:{current_output}\n\n"
        f"其他已完成阶段的产出:\n"
        + "\n---\n".join(other_outputs)
        + "\n\n"
        "请判断是否值得发起辩论。以下情况值得辩论:\n"
        "1) 两个阶段产出存在矛盾或冲突\n"
        "2) 阶段产出与原始任务约束冲突\n"
        "3) 存在多个合理方案需要抉择\n"
        "其他情况不值得辩论。\n\n"
        "只回答 true 或 false不要其他文字。"
    )

    try:
        response = await gateway.chat(
            messages=[{"role": "user", "content": prompt}],
            model=self._get_model(lead),
        )
        return response.content.strip().lower().startswith("true")
    except Exception as e:
        # Detection is best effort; a failure must never abort execution.
        logger.warning(f"Divergence detection failed: {e}")
        return False
def _insert_debate_phase(
    self,
    plan: TeamPlan,
    trigger_phase: PlanPhase,
    topic: str,
    participants: list[str],
) -> PlanPhase | None:
    """Append a DEBATE phase that follows ``trigger_phase`` and rewire dependents.

    Every phase currently depending on ``trigger_phase`` is switched to
    depend on the new debate phase instead, so it waits for the debate.
    The debate is assigned to the team's lead expert, or to the trigger
    phase's expert when the team has no lead.

    Returns:
        The created debate phase, or ``None`` (with nothing changed) when
        ``participants`` is empty.
    """
    if not participants:
        return None

    team_lead = self._team.lead_expert
    if team_lead:
        moderator = team_lead.config.name
    else:
        moderator = trigger_phase.assigned_expert

    debate_settings = {
        "topic": topic,
        "participants": participants,
        "max_rounds": 2,
    }
    debate_phase = PlanPhase(
        name=f"辩论: {topic[:20]}",
        assigned_expert=moderator,
        task_description=topic,
        depends_on=[trigger_phase.id],
        phase_type=PhaseType.DEBATE,
        debate_config=debate_settings,
    )

    # Redirect dependents before the debate joins the plan, so the debate's
    # own dependency on the trigger is left alone.
    for candidate in plan.phases:
        if trigger_phase.id not in candidate.depends_on:
            continue
        candidate.depends_on.remove(trigger_phase.id)
        candidate.depends_on.append(debate_phase.id)

    plan.phases.append(debate_phase)
    self._debate_count += 1
    logger.info(f"Inserted debate phase {debate_phase.id} after {trigger_phase.id}")
    return debate_phase
async def _check_divergence_and_insert_debates(
    self,
    lead: Expert,
    plan: TeamPlan,
    completed_in_layer: list[PlanPhase],
) -> None:
    """Check newly completed phases for divergence and insert debates.

    Called after each layer completes. For every COMPLETED phase in
    ``completed_in_layer`` the lead's LLM is asked whether its output
    diverges; on divergence a DEBATE phase is inserted and a
    ``plan_update`` event is broadcast. Stops early once ``MAX_DEBATES``
    is reached.

    Args:
        lead: Lead expert used for divergence detection.
        plan: Plan that may receive new debate phases.
        completed_in_layer: Phases executed in the layer that just finished.
    """
    # Participants are all active experts except the lead. The list does not
    # depend on the phase being checked, so it is built once.
    participants = [
        e.config.name
        for e in self._team.active_experts
        if e.config.name != lead.config.name
    ]
    if not participants:
        # _insert_debate_phase refuses an empty participant list, so an LLM
        # divergence check could never lead to a debate here.
        return

    for ph in completed_in_layer:
        if ph.status != PhaseStatus.COMPLETED:
            continue
        if self._debate_count >= self.MAX_DEBATES:
            logger.info(
                f"Max debates ({self.MAX_DEBATES}) reached, skipping divergence detection"
            )
            return

        has_divergence = await self._detect_divergence(lead, ph, plan)
        if not has_divergence:
            continue

        topic = f"阶段 '{ph.name}' 产出分歧"
        debate = self._insert_debate_phase(plan, ph, topic, participants)
        if debate:
            await self._broadcast_event(
                "plan_update",
                {
                    "plan_id": plan.id,
                    "plan_phases": [p.to_dict() for p in plan.phases],
                    "debate_inserted": debate.id,
                },
            )
# ── U3 end ─────────────────────────────────────────────────────────
async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> ConfigDrivenAgent:
"""Get an isolated ConfigDrivenAgent instance for the phase.

View File

@ -0,0 +1,756 @@
"""TeamOrchestrator 分歧检测 + 方案评审辩论单元测试 (U3)

测试覆盖:
- 方案评审辩论 (_maybe_add_plan_review_debate)
  * Happy path: LLM 判断需要评审 → 插入 DEBATE phase,所有原 phase 依赖它
  * 边界: phases <= 2 时跳过
  * 边界: MAX_DEBATES 已达上限时跳过
  * 边界: 无其他成员时跳过
  * 错误路径: LLM 不可用时跳过
  * 错误路径: LLM 抛异常时跳过
- 分歧检测 (_detect_divergence)
  * Happy path: LLM 判断有分歧 → 返回 True
  * Happy path: LLM 判断无分歧 → 返回 False
  * 边界: 无其他已完成阶段时返回 False
  * 错误路径: LLM 不可用时返回 False
  * 错误路径: LLM 抛异常时返回 False
- 动态插入辩论 (_insert_debate_phase)
  * Happy path: 插入 DEBATE,依赖重新连接
  * 边界: participants 为空时返回 None
- 协调入口 (_check_divergence_and_insert_debates)
  * Happy path: 检测到分歧 → 插入辩论 + 广播 plan_update
  * Happy path: 无分歧 → 不插入
  * 边界: MAX_DEBATES 达上限时跳过
- 集成: 插入的 DEBATE phase 在 topological_sort 中正确分层
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.experts.config import ExpertConfig
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(
    name: str = "test_expert",
    is_lead: bool = False,
) -> ExpertConfig:
    """Build an ``ExpertConfig`` with fixed test values for the given name and lead flag."""
    settings = {
        "name": name,
        "agent_type": "expert",
        "persona": f"{name}的角色描述",
        "thinking_style": "逻辑推理",
        "speaking_style": "简洁直接",
        "decision_framework": "数据驱动决策",
        "bound_skills": ["skill_a"],
        "is_lead": is_lead,
        "task_mode": "llm_generate",
        "prompt": {"identity": "测试"},
    }
    return ExpertConfig(**settings)
def _make_mock_expert(
    name: str = "test_expert",
    is_lead: bool = False,
    is_active: bool = True,
    gateway: MagicMock | None = None,
) -> MagicMock:
    """Return a ``MagicMock`` expert carrying a real config and a mock agent.

    The mock agent exposes ``gateway`` as its ``_llm_gateway`` attribute.
    """
    config = _make_expert_config(name=name, is_lead=is_lead)

    # Agent stub holding the (possibly missing) LLM gateway.
    agent_stub = MagicMock()
    agent_stub._llm_gateway = gateway

    summary = {
        "name": name,
        "persona": config.persona,
        "thinking_style": config.thinking_style,
        "bound_skills": config.bound_skills,
        "is_lead": is_lead,
    }

    expert = MagicMock()
    expert.config = config
    expert.is_active = is_active
    expert.team_id = None
    expert.get_capabilities_summary.return_value = summary
    expert.agent = agent_stub
    return expert
def _make_team_with_experts(
    expert_names: list[str] | None = None,
    lead_name: str = "lead",
    gateway: MagicMock | None = None,
) -> ExpertTeam:
    """Create an ``ExpertTeam`` populated with mock experts and a mock transport.

    Without ``expert_names`` the team holds the lead plus ``member1`` and
    ``member2``. The expert named ``lead_name`` is registered as the lead.
    """
    team = ExpertTeam()
    team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)

    names = [lead_name, "member1", "member2"] if expert_names is None else expert_names
    for expert_name in names:
        leads_team = expert_name == lead_name
        team._experts[expert_name] = _make_mock_expert(
            name=expert_name, is_lead=leads_team, gateway=gateway
        )
        if leads_team:
            team._lead_expert_name = expert_name
    return team
def _make_execution_phase(
    phase_id: str = "phase_1",
    name: str = "阶段一",
    assigned_expert: str = "member1",
    depends_on: list[str] | None = None,
    status: PhaseStatus = PhaseStatus.PENDING,
    result: dict | None = None,
) -> PlanPhase:
    """Create an EXECUTION ``PlanPhase`` for tests."""
    # A missing or empty dependency list becomes a fresh empty list.
    dependencies = depends_on if depends_on else []
    description = f"{name}的任务描述"
    return PlanPhase(
        id=phase_id,
        name=name,
        assigned_expert=assigned_expert,
        task_description=description,
        depends_on=dependencies,
        phase_type=PhaseType.EXECUTION,
        status=status,
        result=result,
    )
def _make_plan(
    phases: list[PlanPhase],
    task: str = "测试任务",
    lead_expert: str = "lead",
) -> TeamPlan:
    """Wrap ``phases`` in a ``TeamPlan`` with the fixed id ``test_plan``."""
    plan_fields = {
        "id": "test_plan",
        "task": task,
        "phases": phases,
        "lead_expert": lead_expert,
    }
    return TeamPlan(**plan_fields)
def _make_bool_gateway(
responses: list[bool],
) -> AsyncMock:
"""创建返回 true/false 字符串的 mock LLM gateway
Args:
responses: 按调用顺序返回的布尔值列表
"""
queue = list(responses)
async def chat_side_effect(messages, model=None, **kwargs):
if not queue:
# Default to false if exhausted
response = MagicMock()
response.content = "false"
return response
val = queue.pop(0)
response = MagicMock()
response.content = "true" if val else "false"
return response
gateway = AsyncMock()
gateway.chat = AsyncMock(side_effect=chat_side_effect)
return gateway
def _make_error_gateway() -> AsyncMock:
"""创建总是抛异常的 mock LLM gateway"""
async def chat_side_effect(messages, model=None, **kwargs):
raise RuntimeError("LLM unavailable")
gateway = AsyncMock()
gateway.chat = AsyncMock(side_effect=chat_side_effect)
return gateway
# ── 方案评审辩论测试 ─────────────────────────────────────
class TestMaybeAddPlanReviewDebate:
    """Tests for ``TeamOrchestrator._maybe_add_plan_review_debate``."""

    @staticmethod
    def _default_phases(count: int = 3) -> list[PlanPhase]:
        """Return ``count`` default execution phases with ids ``p1`` .. ``pN``."""
        return [
            _make_execution_phase(phase_id=f"p{index}")
            for index in range(1, count + 1)
        ]

    @pytest.mark.asyncio
    async def test_adds_plan_review_debate_when_llm_says_yes(self):
        """LLM asks for a review: a DEBATE phase is prepended and all original phases depend on it."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        # Three execution phases: a review is only considered above two.
        labelled = (("p1", "阶段一"), ("p2", "阶段二"), ("p3", "阶段三"))
        plan = _make_plan(
            phases=[
                _make_execution_phase(phase_id=pid, name=label)
                for pid, label in labelled
            ],
            task="复杂任务",
        )

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "复杂任务"
        )

        # The debate phase is inserted at the head of the plan.
        assert len(plan.phases) == 4
        review_phase = plan.phases[0]
        assert review_phase.phase_type == PhaseType.DEBATE
        assert review_phase.name == "方案评审"
        assert review_phase.assigned_expert == "lead"
        assert review_phase.debate_config is not None
        assert review_phase.debate_config["participants"] == ["member1", "member2"]
        assert review_phase.debate_config["max_rounds"] == 2
        # Every original phase must now depend on the review phase.
        for original in plan.phases[1:]:
            assert review_phase.id in original.depends_on
        # The debate counter goes up by one.
        assert orchestrator._debate_count == 1

    @pytest.mark.asyncio
    async def test_skips_when_llm_says_no(self):
        """LLM says no review is needed: nothing is inserted."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([False]))
        orchestrator = TeamOrchestrator(team)
        plan = _make_plan(phases=self._default_phases())

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "简单任务"
        )

        assert len(plan.phases) == 3
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_phases_le_two(self):
        """Plans with two phases or fewer are treated as simple and skipped."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        plan = _make_plan(phases=self._default_phases(2))

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "任务"
        )

        assert len(plan.phases) == 2
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_max_debates_reached(self):
        """Nothing is inserted once MAX_DEBATES has already been reached."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        orchestrator._debate_count = orchestrator.MAX_DEBATES
        plan = _make_plan(phases=self._default_phases())

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "任务"
        )

        assert len(plan.phases) == 3
        assert orchestrator._debate_count == orchestrator.MAX_DEBATES

    @pytest.mark.asyncio
    async def test_skips_when_no_other_members(self):
        """A team consisting of the lead alone gets no review debate."""
        team = _make_team_with_experts(
            expert_names=["lead"], gateway=_make_bool_gateway([True])
        )
        orchestrator = TeamOrchestrator(team)
        plan = _make_plan(phases=self._default_phases())

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "任务"
        )

        assert len(plan.phases) == 3
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_llm_unavailable(self):
        """A missing LLM gateway (``None``) means the review is skipped."""
        team = _make_team_with_experts(gateway=None)
        orchestrator = TeamOrchestrator(team)
        plan = _make_plan(phases=self._default_phases())

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "任务"
        )

        assert len(plan.phases) == 3
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_llm_raises_exception(self):
        """An LLM error is swallowed: no review is added and nothing is raised."""
        team = _make_team_with_experts(gateway=_make_error_gateway())
        orchestrator = TeamOrchestrator(team)
        plan = _make_plan(phases=self._default_phases())

        # Must not raise.
        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "任务"
        )

        assert len(plan.phases) == 3
        assert orchestrator._debate_count == 0
# ── 分歧检测测试 ─────────────────────────────────────────
class TestDetectDivergence:
    """Tests for ``TeamOrchestrator._detect_divergence``."""

    @staticmethod
    def _finished(phase_id: str, content: str, **extra) -> PlanPhase:
        """Return a COMPLETED execution phase whose result holds ``content``."""
        return _make_execution_phase(
            phase_id=phase_id,
            status=PhaseStatus.COMPLETED,
            result={"content": content},
            **extra,
        )

    @pytest.mark.asyncio
    async def test_returns_true_when_llm_detects_divergence(self):
        """LLM reports divergence: the method returns True."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        # Two completed phases with conflicting outputs.
        phase_a = self._finished("a", "采用 React", name="阶段A")
        phase_b = self._finished("b", "采用 Vue", name="阶段B")
        plan = _make_plan(phases=[phase_a, phase_b])

        verdict = await orchestrator._detect_divergence(
            team.lead_expert, phase_a, plan
        )

        assert verdict is True

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_says_no_divergence(self):
        """LLM reports no divergence: the method returns False."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([False]))
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "结果A")
        phase_b = self._finished("b", "结果B")
        plan = _make_plan(phases=[phase_a, phase_b])

        verdict = await orchestrator._detect_divergence(
            team.lead_expert, phase_a, plan
        )

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_no_other_completed_phases(self):
        """With no other completed phase there is nothing to compare: returns False."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "结果A")
        # The other phase is still PENDING.
        phase_b = _make_execution_phase(phase_id="b", status=PhaseStatus.PENDING)
        plan = _make_plan(phases=[phase_a, phase_b])

        verdict = await orchestrator._detect_divergence(
            team.lead_expert, phase_a, plan
        )

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_unavailable(self):
        """A missing LLM gateway (``None``) yields False."""
        team = _make_team_with_experts(gateway=None)
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "结果A")
        phase_b = self._finished("b", "结果B")
        plan = _make_plan(phases=[phase_a, phase_b])

        verdict = await orchestrator._detect_divergence(
            team.lead_expert, phase_a, plan
        )

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_raises_exception(self):
        """An LLM error yields False and is not propagated."""
        team = _make_team_with_experts(gateway=_make_error_gateway())
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "结果A")
        phase_b = self._finished("b", "结果B")
        plan = _make_plan(phases=[phase_a, phase_b])

        verdict = await orchestrator._detect_divergence(
            team.lead_expert, phase_a, plan
        )

        assert verdict is False
# ── 动态插入辩论测试 ─────────────────────────────────────
class TestInsertDebatePhase:
    """Tests for ``TeamOrchestrator._insert_debate_phase``."""

    @staticmethod
    def _new_orchestrator() -> TeamOrchestrator:
        """Return an orchestrator over the default team with an empty bool gateway."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([]))
        return TeamOrchestrator(team)

    def test_inserts_debate_and_rewires_dependencies(self):
        """The DEBATE phase is inserted and former dependents of the trigger now depend on it."""
        orchestrator = self._new_orchestrator()
        trigger = _make_execution_phase(phase_id="trigger", name="触发阶段")
        dependent = _make_execution_phase(
            phase_id="dependent",
            name="依赖阶段",
            depends_on=["trigger"],
        )
        plan = _make_plan(phases=[trigger, dependent])

        debate = orchestrator._insert_debate_phase(
            plan, trigger, "产出分歧", ["member1", "member2"]
        )

        assert debate is not None
        assert debate.phase_type == PhaseType.DEBATE
        assert debate.depends_on == ["trigger"]
        config = debate.debate_config
        assert config["topic"] == "产出分歧"
        assert config["participants"] == ["member1", "member2"]
        assert config["max_rounds"] == 2
        # The dependent now waits for the debate, no longer for the trigger.
        assert debate.id in dependent.depends_on
        assert "trigger" not in dependent.depends_on
        # The debate has been added to the plan.
        assert debate in plan.phases
        assert orchestrator._debate_count == 1

    def test_returns_none_when_no_participants(self):
        """An empty participant list returns None and leaves the counter at zero."""
        orchestrator = self._new_orchestrator()
        trigger = _make_execution_phase(phase_id="trigger")
        plan = _make_plan(phases=[trigger])

        debate = orchestrator._insert_debate_phase(plan, trigger, "产出分歧", [])

        assert debate is None
        assert orchestrator._debate_count == 0

    def test_debate_assigned_to_lead(self):
        """The DEBATE phase is assigned to the lead expert."""
        orchestrator = self._new_orchestrator()
        trigger = _make_execution_phase(phase_id="trigger")
        plan = _make_plan(phases=[trigger])

        debate = orchestrator._insert_debate_phase(
            plan, trigger, "分歧", ["member1"]
        )

        assert debate is not None
        assert debate.assigned_expert == "lead"
# ── 协调入口测试 ─────────────────────────────────────────
class TestCheckDivergenceAndInsertDebates:
    """Tests for ``TeamOrchestrator._check_divergence_and_insert_debates``."""

    @staticmethod
    def _finished(phase_id: str, content: str, **extra) -> PlanPhase:
        """Return a COMPLETED execution phase whose result holds ``content``."""
        return _make_execution_phase(
            phase_id=phase_id,
            status=PhaseStatus.COMPLETED,
            result={"content": content},
            **extra,
        )

    @pytest.mark.asyncio
    async def test_inserts_debate_when_divergence_detected(self):
        """Detected divergence inserts a debate and broadcasts ``plan_update``."""
        team = _make_team_with_experts(
            gateway=_make_bool_gateway([True])  # divergence detected
        )
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "采用 React", name="阶段A")
        phase_b = self._finished("b", "采用 Vue", name="阶段B")
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        # Exactly one DEBATE phase was appended.
        assert len(plan.phases) == 3
        assert plan.phases[-1].phase_type == PhaseType.DEBATE
        assert orchestrator._debate_count == 1
        # A plan_update event must have been sent; it is the latest send call.
        transport = team._handoff_transport
        assert transport.send.called
        latest_send = transport.send.call_args_list[-1]
        event_data = latest_send.args[1]  # second positional argument: data dict
        assert event_data["type"] == "plan_update"
        assert "debate_inserted" in event_data

    @pytest.mark.asyncio
    async def test_no_debate_when_no_divergence(self):
        """Without divergence no debate is inserted."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([False]))
        orchestrator = TeamOrchestrator(team)
        phase_a = self._finished("a", "结果A")
        phase_b = self._finished("b", "结果B")
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        assert len(plan.phases) == 2
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_max_debates_reached(self):
        """Detection is skipped once MAX_DEBATES has been reached."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        orchestrator._debate_count = orchestrator.MAX_DEBATES
        phase_a = self._finished("a", "结果A")
        phase_b = self._finished("b", "结果B")
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        assert len(plan.phases) == 2
        assert orchestrator._debate_count == orchestrator.MAX_DEBATES

    @pytest.mark.asyncio
    async def test_skips_non_completed_phases(self):
        """Phases that are not COMPLETED are skipped."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        # A PENDING phase is passed in and must not be checked.
        phase_pending = _make_execution_phase(
            phase_id="pending", status=PhaseStatus.PENDING
        )
        phase_completed = self._finished("completed", "结果")
        plan = _make_plan(phases=[phase_pending, phase_completed])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_pending, phase_completed]
        )

        # phase_pending is skipped; phase_completed has no other completed
        # phase to compare against, so no divergence is found.
        assert orchestrator._debate_count == 0
# ── 集成测试 ─────────────────────────────────────────────
class TestInsertedDebateLayering:
    """Inserted DEBATE phases land in the right ``topological_sort`` layer."""

    def test_inserted_debate_blocks_dependents(self):
        """An inserted DEBATE phase sorts after its trigger and before the dependent."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([]))
        orchestrator = TeamOrchestrator(team)
        trigger = _make_execution_phase(
            phase_id="trigger",
            name="触发阶段",
            status=PhaseStatus.COMPLETED,
            result={"content": "触发结果"},
        )
        dependent = _make_execution_phase(
            phase_id="dependent",
            name="依赖阶段",
            depends_on=["trigger"],
        )
        plan = _make_plan(phases=[trigger, dependent])

        debate = orchestrator._insert_debate_phase(
            plan, trigger, "分歧", ["member1", "member2"]
        )
        assert debate is not None

        # Map every phase id to the index of the layer it was sorted into.
        layer_of = {
            phase.id: index
            for index, layer in enumerate(plan.topological_sort())
            for phase in layer
        }
        trigger_layer = layer_of.get("trigger")
        debate_layer = layer_of.get(debate.id)
        dependent_layer = layer_of.get("dependent")

        assert trigger_layer is not None
        assert debate_layer is not None
        assert dependent_layer is not None
        # Required order: trigger < debate < dependent.
        assert trigger_layer < debate_layer
        assert debate_layer < dependent_layer

    @pytest.mark.asyncio
    async def test_plan_review_debate_runs_first(self):
        """The plan-review DEBATE is alone in layer 0; execution phases follow."""
        team = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orchestrator = TeamOrchestrator(team)
        labelled = (("p1", "阶段一"), ("p2", "阶段二"), ("p3", "阶段三"))
        plan = _make_plan(
            phases=[
                _make_execution_phase(phase_id=pid, name=label)
                for pid, label in labelled
            ],
            task="复杂任务",
        )

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "复杂任务"
        )

        first_layer, *later_layers = plan.topological_sort()
        # Layer 0 holds only the plan-review DEBATE phase.
        assert len(first_layer) == 1
        assert first_layer[0].phase_type == PhaseType.DEBATE
        assert first_layer[0].name == "方案评审"
        # Every phase in the following layers is an execution phase.
        for layer in later_layers:
            for phase in layer:
                assert phase.phase_type == PhaseType.EXECUTION

View File

@ -130,7 +130,17 @@ def _make_mock_llm_gateway(
decomp_response.content = phases_json
synth_response = MagicMock()
synth_response.content = synthesis_content
gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response, synth_response])
# U3: 分歧检测会在 decomposition 与 synthesis 之间插入额外的 LLM 调用,
# 因此用函数式 side_effect:首次返回 decomposition,其余一律返回 synthesis。
call_count = [0]
async def chat_side_effect(messages, model=None, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
return decomp_response
return synth_response
gateway.chat = AsyncMock(side_effect=chat_side_effect)
else:
response = MagicMock()
response.content = synthesis_content