fischer-agentkit/tests/unit/experts/test_divergence_detection.py

757 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""TeamOrchestrator 分歧检测 + 方案评审辩论单元测试 (U3)
测试覆盖:
- 方案评审辩论 (_maybe_add_plan_review_debate)
* Happy path: LLM 判断需要评审 → 插入 DEBATE phase所有原 phase 依赖它
* 边界: phases <= 2 时跳过
* 边界: MAX_DEBATES 已达上限时跳过
* 边界: 无其他成员时跳过
* 错误路径: LLM 不可用时跳过
* 错误路径: LLM 抛异常时跳过
- 分歧检测 (_detect_divergence)
* Happy path: LLM 判断有分歧 → 返回 True
* Happy path: LLM 判断无分歧 → 返回 False
* 边界: 无其他已完成阶段时返回 False
* 错误路径: LLM 不可用时返回 False
* 错误路径: LLM 抛异常时返回 False
- 动态插入辩论 (_insert_debate_phase)
* Happy path: 插入 DEBATE依赖重 wiring
* 边界: participants 为空时返回 None
- 协调入口 (_check_divergence_and_insert_debates)
* Happy path: 检测到分歧 → 插入辩论 + 广播 plan_update
* Happy path: 无分歧 → 不插入
* 边界: MAX_DEBATES 达上限时跳过
- 集成: 插入的 DEBATE phase 在 topological_sort 中正确分层
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.experts.config import ExpertConfig
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(
    name: str = "test_expert",
    is_lead: bool = False,
) -> ExpertConfig:
    """Build an ExpertConfig populated with fixed test values.

    Args:
        name: Expert name; also interpolated into the persona text.
        is_lead: Forwarded unchanged to the config's ``is_lead`` field.

    Returns:
        An ExpertConfig whose remaining fields are constant test data.
    """
    fields = {
        "name": name,
        "agent_type": "expert",
        "persona": f"{name}的角色描述",
        "thinking_style": "逻辑推理",
        "speaking_style": "简洁直接",
        "decision_framework": "数据驱动决策",
        "bound_skills": ["skill_a"],
        "is_lead": is_lead,
        "task_mode": "llm_generate",
        "prompt": {"identity": "测试"},
    }
    return ExpertConfig(**fields)
def _make_mock_expert(
    name: str = "test_expert",
    is_lead: bool = False,
    is_active: bool = True,
    gateway: MagicMock | None = None,
) -> MagicMock:
    """Build a MagicMock standing in for an expert.

    Args:
        name: Expert name used for the config and the capabilities summary.
        is_lead: Lead flag stored in the config and the capabilities summary.
        is_active: Value assigned to the mock's ``is_active`` attribute.
        gateway: Object stored as ``agent._llm_gateway``; may be None.

    Returns:
        A MagicMock exposing ``config``, ``is_active``, ``team_id``,
        ``get_capabilities_summary()`` and ``agent``.
    """
    cfg = _make_expert_config(name=name, is_lead=is_lead)

    # The agent mock only carries the LLM gateway handed in by the caller.
    agent_stub = MagicMock()
    agent_stub._llm_gateway = gateway

    capabilities = {
        "name": name,
        "persona": cfg.persona,
        "thinking_style": cfg.thinking_style,
        "bound_skills": cfg.bound_skills,
        "is_lead": is_lead,
    }

    mock_expert = MagicMock()
    mock_expert.config = cfg
    mock_expert.is_active = is_active
    mock_expert.team_id = None
    mock_expert.get_capabilities_summary.return_value = capabilities
    mock_expert.agent = agent_stub
    return mock_expert
def _make_team_with_experts(
    expert_names: list[str] | None = None,
    lead_name: str = "lead",
    gateway: MagicMock | None = None,
) -> ExpertTeam:
    """Build an ExpertTeam filled with mock experts and a mocked transport.

    Args:
        expert_names: Names to register; when None the roster is
            ``[lead_name, "member1", "member2"]``.
        lead_name: The name that is registered as the lead expert, if present
            in the roster.
        gateway: LLM gateway handed to every mock expert.

    Returns:
        The populated ExpertTeam.
    """
    roster = [lead_name, "member1", "member2"] if expert_names is None else expert_names

    squad = ExpertTeam()
    # Transport is an AsyncMock constrained to the in-process transport's API.
    squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)

    for member in roster:
        leads = member == lead_name
        squad._experts[member] = _make_mock_expert(
            name=member, is_lead=leads, gateway=gateway
        )
        if leads:
            squad._lead_expert_name = member
    return squad
def _make_execution_phase(
    phase_id: str = "phase_1",
    name: str = "阶段一",
    assigned_expert: str = "member1",
    depends_on: list[str] | None = None,
    status: PhaseStatus = PhaseStatus.PENDING,
    result: dict | None = None,
) -> PlanPhase:
    """Create an EXECUTION-type PlanPhase for tests.

    Args:
        phase_id: Identifier stored as the phase ``id``.
        name: Phase name; also interpolated into the task description.
        assigned_expert: Name of the expert the phase is assigned to.
        depends_on: Ids this phase depends on; None or empty becomes ``[]``.
        status: Initial phase status.
        result: Optional result payload.

    Returns:
        The constructed PlanPhase.
    """
    dependencies = depends_on if depends_on else []
    description = f"{name}的任务描述"
    return PlanPhase(
        id=phase_id,
        name=name,
        assigned_expert=assigned_expert,
        task_description=description,
        depends_on=dependencies,
        phase_type=PhaseType.EXECUTION,
        status=status,
        result=result,
    )
def _make_plan(
    phases: list[PlanPhase],
    task: str = "测试任务",
    lead_expert: str = "lead",
) -> TeamPlan:
    """Wrap the given phases in a TeamPlan with the fixed id ``"test_plan"``.

    Args:
        phases: Phases the plan should contain.
        task: Task text stored on the plan.
        lead_expert: Name stored as the plan's lead expert.

    Returns:
        The constructed TeamPlan.
    """
    plan_fields = {
        "id": "test_plan",
        "task": task,
        "phases": phases,
        "lead_expert": lead_expert,
    }
    return TeamPlan(**plan_fields)
def _make_bool_gateway(
    responses: list[bool],
) -> AsyncMock:
    """Create a mock LLM gateway whose ``chat`` answers "true" or "false".

    Args:
        responses: Booleans to answer with, one per call, in call order.

    Returns:
        An AsyncMock whose ``chat`` coroutine returns a MagicMock with
        ``content`` set to ``"true"`` or ``"false"``.
    """
    # Snapshot the script so later mutation of ``responses`` has no effect.
    scripted = iter(list(responses))

    async def _scripted_chat(messages, model=None, **kwargs):
        reply = MagicMock()
        # Once the script is exhausted every further call answers "false".
        reply.content = "true" if next(scripted, False) else "false"
        return reply

    return AsyncMock(chat=AsyncMock(side_effect=_scripted_chat))
def _make_error_gateway() -> AsyncMock:
    """Create a mock LLM gateway whose ``chat`` always raises RuntimeError."""

    async def _always_fail(messages, model=None, **kwargs):
        raise RuntimeError("LLM unavailable")

    return AsyncMock(chat=AsyncMock(side_effect=_always_fail))
# ── 方案评审辩论测试 ─────────────────────────────────────
class TestMaybeAddPlanReviewDebate:
    """Tests for TeamOrchestrator._maybe_add_plan_review_debate."""

    @staticmethod
    def _plan_with_phases(count: int) -> TeamPlan:
        """Return a plan holding ``count`` default execution phases p1..pN."""
        return _make_plan(
            phases=[
                _make_execution_phase(phase_id=f"p{idx}")
                for idx in range(1, count + 1)
            ]
        )

    @pytest.mark.asyncio
    async def test_adds_plan_review_debate_when_llm_says_yes(self):
        """LLM asks for a review: a DEBATE phase is inserted and all original phases depend on it."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orch = TeamOrchestrator(squad)
        # Three execution phases (a review is only considered for more than two).
        stages = [
            _make_execution_phase(phase_id=pid, name=label)
            for pid, label in (("p1", "阶段一"), ("p2", "阶段二"), ("p3", "阶段三"))
        ]
        plan = _make_plan(phases=stages, task="复杂任务")

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "复杂任务")

        # A DEBATE phase must have been inserted at the very front.
        assert len(plan.phases) == 4
        review = plan.phases[0]
        assert review.phase_type == PhaseType.DEBATE
        assert review.name == "方案评审"
        assert review.assigned_expert == "lead"
        config = review.debate_config
        assert config is not None
        assert config["participants"] == ["member1", "member2"]
        assert config["max_rounds"] == 2
        # Every original phase must now depend on the review phase.
        assert all(review.id in original.depends_on for original in plan.phases[1:])
        # The debate counter must have been incremented.
        assert orch._debate_count == 1

    @pytest.mark.asyncio
    async def test_skips_when_llm_says_no(self):
        """LLM says no review is needed: nothing is inserted."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([False]))
        orch = TeamOrchestrator(squad)
        plan = self._plan_with_phases(3)

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "简单任务")

        assert len(plan.phases) == 3
        assert orch._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_phases_le_two(self):
        """Skipped when phases <= 2 (simple task)."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orch = TeamOrchestrator(squad)
        plan = self._plan_with_phases(2)

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "任务")

        assert len(plan.phases) == 2
        assert orch._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_max_debates_reached(self):
        """Skipped when MAX_DEBATES has already been reached."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orch = TeamOrchestrator(squad)
        orch._debate_count = orch.MAX_DEBATES
        plan = self._plan_with_phases(3)

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "任务")

        assert len(plan.phases) == 3
        assert orch._debate_count == orch.MAX_DEBATES

    @pytest.mark.asyncio
    async def test_skips_when_no_other_members(self):
        """Skipped when there are no other members (only the lead)."""
        squad = _make_team_with_experts(
            expert_names=["lead"], gateway=_make_bool_gateway([True])
        )
        orch = TeamOrchestrator(squad)
        plan = self._plan_with_phases(3)

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "任务")

        assert len(plan.phases) == 3
        assert orch._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_llm_unavailable(self):
        """Skipped when the LLM gateway is None."""
        squad = _make_team_with_experts(gateway=None)
        orch = TeamOrchestrator(squad)
        plan = self._plan_with_phases(3)

        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "任务")

        assert len(plan.phases) == 3
        assert orch._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_llm_raises_exception(self):
        """Skipped, without propagating, when the LLM raises."""
        squad = _make_team_with_experts(gateway=_make_error_gateway())
        orch = TeamOrchestrator(squad)
        plan = self._plan_with_phases(3)

        # Must not raise.
        await orch._maybe_add_plan_review_debate(squad.lead_expert, plan, "任务")

        assert len(plan.phases) == 3
        assert orch._debate_count == 0
# ── 分歧检测测试 ─────────────────────────────────────────
class TestDetectDivergence:
    """Tests for TeamOrchestrator._detect_divergence."""

    @staticmethod
    def _completed(phase_id: str, content: str, **extra) -> PlanPhase:
        """Return a COMPLETED execution phase whose result holds ``content``."""
        return _make_execution_phase(
            phase_id=phase_id,
            status=PhaseStatus.COMPLETED,
            result={"content": content},
            **extra,
        )

    @pytest.mark.asyncio
    async def test_returns_true_when_llm_detects_divergence(self):
        """LLM reports divergence: returns True."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orch = TeamOrchestrator(squad)
        # Two completed phases with differing outputs.
        first = self._completed("a", "采用 React", name="阶段A")
        second = self._completed("b", "采用 Vue", name="阶段B")
        plan = _make_plan(phases=[first, second])

        verdict = await orch._detect_divergence(squad.lead_expert, first, plan)

        assert verdict is True

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_says_no_divergence(self):
        """LLM reports no divergence: returns False."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([False]))
        orch = TeamOrchestrator(squad)
        first = self._completed("a", "结果A")
        second = self._completed("b", "结果B")
        plan = _make_plan(phases=[first, second])

        verdict = await orch._detect_divergence(squad.lead_expert, first, plan)

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_no_other_completed_phases(self):
        """Returns False when no other phase has completed (nothing to compare)."""
        squad = _make_team_with_experts(gateway=_make_bool_gateway([True]))
        orch = TeamOrchestrator(squad)
        first = self._completed("a", "结果A")
        # The other phase is still PENDING.
        waiting = _make_execution_phase(phase_id="b", status=PhaseStatus.PENDING)
        plan = _make_plan(phases=[first, waiting])

        verdict = await orch._detect_divergence(squad.lead_expert, first, plan)

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_unavailable(self):
        """Returns False when the LLM gateway is None."""
        squad = _make_team_with_experts(gateway=None)
        orch = TeamOrchestrator(squad)
        first = self._completed("a", "结果A")
        second = self._completed("b", "结果B")
        plan = _make_plan(phases=[first, second])

        verdict = await orch._detect_divergence(squad.lead_expert, first, plan)

        assert verdict is False

    @pytest.mark.asyncio
    async def test_returns_false_when_llm_raises_exception(self):
        """Returns False, without propagating, when the LLM raises."""
        squad = _make_team_with_experts(gateway=_make_error_gateway())
        orch = TeamOrchestrator(squad)
        first = self._completed("a", "结果A")
        second = self._completed("b", "结果B")
        plan = _make_plan(phases=[first, second])

        verdict = await orch._detect_divergence(squad.lead_expert, first, plan)

        assert verdict is False
# ── 动态插入辩论测试 ─────────────────────────────────────
class TestInsertDebatePhase:
    """Tests for TeamOrchestrator._insert_debate_phase."""

    @staticmethod
    def _orchestrator() -> TeamOrchestrator:
        """Return an orchestrator over the default team with an empty-script gateway."""
        return TeamOrchestrator(
            _make_team_with_experts(gateway=_make_bool_gateway([]))
        )

    def test_inserts_debate_and_rewires_dependencies(self):
        """A DEBATE phase is inserted and phases that depended on the trigger now depend on it."""
        orch = self._orchestrator()
        trigger = _make_execution_phase(phase_id="trigger", name="触发阶段")
        downstream = _make_execution_phase(
            phase_id="dependent",
            name="依赖阶段",
            depends_on=["trigger"],
        )
        plan = _make_plan(phases=[trigger, downstream])

        debate = orch._insert_debate_phase(
            plan, trigger, "产出分歧", ["member1", "member2"]
        )

        assert debate is not None
        assert debate.phase_type == PhaseType.DEBATE
        assert debate.depends_on == ["trigger"]
        config = debate.debate_config
        assert config["topic"] == "产出分歧"
        assert config["participants"] == ["member1", "member2"]
        assert config["max_rounds"] == 2
        # The downstream phase now depends on the debate, no longer on the trigger.
        assert debate.id in downstream.depends_on
        assert "trigger" not in downstream.depends_on
        # The debate has been added to the plan.
        assert debate in plan.phases
        assert orch._debate_count == 1

    def test_returns_none_when_no_participants(self):
        """Returns None when participants is empty."""
        orch = self._orchestrator()
        trigger = _make_execution_phase(phase_id="trigger")
        plan = _make_plan(phases=[trigger])

        debate = orch._insert_debate_phase(plan, trigger, "产出分歧", [])

        assert debate is None
        assert orch._debate_count == 0

    def test_debate_assigned_to_lead(self):
        """The DEBATE phase is assigned to the lead expert."""
        orch = self._orchestrator()
        trigger = _make_execution_phase(phase_id="trigger")
        plan = _make_plan(phases=[trigger])

        debate = orch._insert_debate_phase(plan, trigger, "分歧", ["member1"])

        assert debate is not None
        assert debate.assigned_expert == "lead"
# ── 协调入口测试 ─────────────────────────────────────────
class TestCheckDivergenceAndInsertDebates:
    """Tests for TeamOrchestrator._check_divergence_and_insert_debates."""

    @pytest.mark.asyncio
    async def test_inserts_debate_when_divergence_detected(self):
        """Divergence detected: a debate is inserted and plan_update is broadcast."""
        gateway = _make_bool_gateway([True])  # divergence is reported
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        phase_a = _make_execution_phase(
            phase_id="a",
            name="阶段A",
            status=PhaseStatus.COMPLETED,
            result={"content": "采用 React"},
        )
        phase_b = _make_execution_phase(
            phase_id="b",
            name="阶段B",
            status=PhaseStatus.COMPLETED,
            result={"content": "采用 Vue"},
        )
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        # One DEBATE phase must have been appended.
        assert len(plan.phases) == 3
        debate = plan.phases[-1]
        assert debate.phase_type == PhaseType.DEBATE
        assert orchestrator._debate_count == 1
        # A plan_update event must have been broadcast.
        transport = team._handoff_transport
        assert transport.send.called
        # The last send is expected to be the plan_update; its second
        # positional argument is the event data dict.
        last_call = transport.send.call_args_list[-1]
        event_data = last_call.args[1]
        assert event_data["type"] == "plan_update"
        assert "debate_inserted" in event_data

    @pytest.mark.asyncio
    async def test_no_debate_when_no_divergence(self):
        """No divergence: no debate is inserted."""
        gateway = _make_bool_gateway([False])
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        phase_a = _make_execution_phase(
            phase_id="a",
            status=PhaseStatus.COMPLETED,
            result={"content": "结果A"},
        )
        phase_b = _make_execution_phase(
            phase_id="b",
            status=PhaseStatus.COMPLETED,
            result={"content": "结果B"},
        )
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        assert len(plan.phases) == 2
        assert orchestrator._debate_count == 0

    @pytest.mark.asyncio
    async def test_skips_when_max_debates_reached(self):
        """Detection is skipped once MAX_DEBATES has been reached."""
        gateway = _make_bool_gateway([True])
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        orchestrator._debate_count = orchestrator.MAX_DEBATES
        phase_a = _make_execution_phase(
            phase_id="a",
            status=PhaseStatus.COMPLETED,
            result={"content": "结果A"},
        )
        phase_b = _make_execution_phase(
            phase_id="b",
            status=PhaseStatus.COMPLETED,
            result={"content": "结果B"},
        )
        plan = _make_plan(phases=[phase_a, phase_b])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_a]
        )

        assert len(plan.phases) == 2
        assert orchestrator._debate_count == orchestrator.MAX_DEBATES

    @pytest.mark.asyncio
    async def test_skips_non_completed_phases(self):
        """Phases that are not COMPLETED are skipped."""
        gateway = _make_bool_gateway([True])
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        # A PENDING phase is passed in; it must not be examined.
        phase_pending = _make_execution_phase(
            phase_id="pending", status=PhaseStatus.PENDING
        )
        phase_completed = _make_execution_phase(
            phase_id="completed",
            status=PhaseStatus.COMPLETED,
            result={"content": "结果"},
        )
        plan = _make_plan(phases=[phase_pending, phase_completed])

        await orchestrator._check_divergence_and_insert_debates(
            team.lead_expert, plan, [phase_pending, phase_completed]
        )

        # phase_pending is skipped; phase_completed has no other completed
        # phase to compare against, so no divergence is found. Check the plan
        # itself as well as the counter, like the sibling "nothing inserted"
        # tests do, so an insertion that bypasses the counter is still caught.
        assert len(plan.phases) == 2
        assert orchestrator._debate_count == 0
# ── 集成测试 ─────────────────────────────────────────────
class TestInsertedDebateLayering:
    """Inserted DEBATE phases are layered correctly by topological_sort."""

    def test_inserted_debate_blocks_dependents(self):
        """An inserted DEBATE phase sorts after its trigger and before the dependent."""
        gateway = _make_bool_gateway([])
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        trigger = _make_execution_phase(
            phase_id="trigger",
            name="触发阶段",
            status=PhaseStatus.COMPLETED,
            result={"content": "触发结果"},
        )
        dependent = _make_execution_phase(
            phase_id="dependent",
            name="依赖阶段",
            depends_on=["trigger"],
        )
        plan = _make_plan(phases=[trigger, dependent])

        debate = orchestrator._insert_debate_phase(
            plan, trigger, "分歧", ["member1", "member2"]
        )
        assert debate is not None

        layers = plan.topological_sort()
        # Map every phase id to the index of the layer it was placed in.
        layer_of = {
            ph.id: index for index, layer in enumerate(layers) for ph in layer
        }

        assert "trigger" in layer_of
        assert debate.id in layer_of
        assert "dependent" in layer_of
        # trigger < debate < dependent
        assert layer_of["trigger"] < layer_of[debate.id]
        assert layer_of[debate.id] < layer_of["dependent"]

    @pytest.mark.asyncio
    async def test_plan_review_debate_runs_first(self):
        """The plan-review DEBATE sits alone in layer 0; execution phases follow."""
        gateway = _make_bool_gateway([True])
        team = _make_team_with_experts(gateway=gateway)
        orchestrator = TeamOrchestrator(team)
        phases = [
            _make_execution_phase(phase_id="p1", name="阶段一"),
            _make_execution_phase(phase_id="p2", name="阶段二"),
            _make_execution_phase(phase_id="p3", name="阶段三"),
        ]
        plan = _make_plan(phases=phases, task="复杂任务")

        await orchestrator._maybe_add_plan_review_debate(
            team.lead_expert, plan, "复杂任务"
        )

        layers = plan.topological_sort()
        # Layer 0 must contain only the plan-review DEBATE.
        assert len(layers[0]) == 1
        assert layers[0][0].phase_type == PhaseType.DEBATE
        assert layers[0][0].name == "方案评审"

        later_phases = [ph for layer in layers[1:] for ph in layer]
        # Without this check the type assertion below would pass vacuously
        # if the sort returned no layers after the first one.
        assert sorted(ph.id for ph in later_phases) == ["p1", "p2", "p3"]
        # Every phase in the later layers is an execution phase.
        assert all(ph.phase_type == PhaseType.EXECUTION for ph in later_phases)