fischer-agentkit/tests/unit/experts/test_orchestrator_debate.py

924 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""TeamOrchestrator 辩论阶段执行器单元测试 (U2)
测试覆盖:
- Happy path: 2 轮辩论2 个专家参与Lead 裁决产出结论
- 边界: max_rounds=1 时只辩论一轮就裁决
- 边界: participants 为空时Lead 直接给出结论(无辩论)
- 用户停止: 辩论中收到 /stop提前结束并裁决
- 逃生舱: debate_config.skip=true 时直接跳过
- 错误路径: LLM 不可用时Lead 用模板文本裁决,不抛异常
- 集成: 辩论结论写入 SharedWorkspace
- 事件广播: debate_started / expert_argument / debate_round_summary / debate_resolved
- 干预通道: _consume_team_interventions getattr 回退U4 兼容)
"""
from __future__ import annotations
import json
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(
name: str = "test_expert",
is_lead: bool = False,
llm: dict | None = None,
) -> ExpertConfig:
"""创建测试用 ExpertConfig含辩论 prompt 所需的角色字段)"""
return ExpertConfig(
name=name,
agent_type="expert",
persona=f"{name}的角色描述",
thinking_style="逻辑推理",
speaking_style="简洁直接",
decision_framework="数据驱动决策",
bound_skills=["skill_a"],
is_lead=is_lead,
task_mode="llm_generate",
prompt={"identity": "测试"},
llm=llm,
)
def _make_mock_expert(
name: str = "test_expert",
is_lead: bool = False,
is_active: bool = True,
llm: dict | None = None,
gateway: MagicMock | None = None,
) -> MagicMock:
"""创建 mock Expert
Args:
gateway: 如果提供,设置到 expert.agent._llm_gateway 上
"""
config = _make_expert_config(name=name, is_lead=is_lead, llm=llm)
expert = MagicMock(spec=Expert)
expert.config = config
expert.is_active = is_active
expert.team_id = None
expert.get_capabilities_summary.return_value = {
"name": name,
"persona": config.persona,
"thinking_style": config.thinking_style,
"bound_skills": config.bound_skills,
"is_lead": is_lead,
}
mock_agent = MagicMock()
mock_agent._llm_gateway = gateway
expert.agent = mock_agent
return expert
def _make_team_with_experts(
expert_names: list[str] | None = None,
lead_name: str = "lead",
gateway: MagicMock | None = None,
) -> ExpertTeam:
"""创建包含 mock experts 的 ExpertTeam
Args:
gateway: 如果提供,设置到所有 expert 的 agent._llm_gateway 上
"""
team = ExpertTeam()
transport = AsyncMock(spec=InProcessHandoffTransport)
team._handoff_transport = transport
if expert_names is None:
expert_names = [lead_name, "member1", "member2"]
for name in expert_names:
is_lead = name == lead_name
expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway)
team._experts[name] = expert
if is_lead:
team._lead_expert_name = name
return team
def _make_smart_llm_gateway(
opening: str = "开场:我们需要讨论这个分歧点。",
argument_template: str = "[{expert}] 我认为应该采用这个方案。",
summary: str = "本轮小结:双方各有道理。",
verdict: dict | None = None,
) -> AsyncMock:
"""创建智能 mock LLM gateway根据 prompt 内容返回不同响应
通过 prompt 关键词区分:开场 / 论点 / 小结 / 裁决
避免依赖并行调用顺序。
"""
if verdict is None:
verdict = {
"decision": "adopt",
"rationale": "甲方论据更充分",
"conclusion": "采纳甲方方案,按此执行。",
}
verdict_json = json.dumps(verdict, ensure_ascii=False)
async def chat_side_effect(messages, model=None, **kwargs):
prompt = messages[0]["content"] if messages else ""
response = MagicMock()
# Order matters: check most specific first — verdict/summary prompts
# contain debate history which includes opening/argument text.
if "最终裁决" in prompt:
response.content = f"```json\n{verdict_json}\n```"
elif "小结本轮辩论" in prompt:
response.content = summary
elif "发表你的论点" in prompt:
# Extract expert name from prompt: "你是 {name},正在参加"
import re
name_match = re.search(r"你是 (\w+),正在参加", prompt)
expert_name = name_match.group(1) if name_match else "expert"
response.content = argument_template.format(expert=expert_name)
elif "主持人开场" in prompt:
response.content = opening
else:
response.content = "默认响应"
return response
gateway = AsyncMock()
gateway.chat = AsyncMock(side_effect=chat_side_effect)
return gateway
def _make_debate_phase(
phase_id: str = "debate_1",
name: str = "架构辩论",
topic: str = "前端框架选型React vs Vue",
participants: list[str] | None = None,
max_rounds: int = 2,
skip: bool = False,
depends_on: list[str] | None = None,
assigned_expert: str = "lead",
) -> PlanPhase:
"""创建测试用 DEBATE 阶段"""
if participants is None:
participants = ["member1", "member2"]
debate_config: dict = {
"topic": topic,
"participants": participants,
"max_rounds": max_rounds,
}
if skip:
debate_config["skip"] = True
return PlanPhase(
id=phase_id,
name=name,
assigned_expert=assigned_expert,
task_description=topic,
depends_on=depends_on or [],
phase_type=PhaseType.DEBATE,
debate_config=debate_config,
)
def _make_plan_with_debate_phase(phase: PlanPhase) -> TeamPlan:
"""创建包含单个 DEBATE 阶段的 TeamPlan"""
return TeamPlan(
id="test_plan",
task="测试辩论任务",
phases=[phase],
lead_expert="lead",
)
# ── Happy Path 测试 ───────────────────────────────────────
class TestDebatePhaseHappyPath:
"""辩论阶段 happy path 测试"""
@pytest.mark.asyncio
async def test_two_rounds_two_experts_completes(self):
"""2 轮辩论2 个专家参与phase 状态变为 COMPLETED"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
assert result["content"] == "采纳甲方方案,按此执行。"
assert result["decision"] == "adopt"
assert "verdict" in result
assert result["verdict"]["decision"] == "adopt"
@pytest.mark.asyncio
async def test_debate_produces_verdict_with_required_fields(self):
"""辩论裁决包含 decision / rationale / conclusion 三个字段"""
gateway = _make_smart_llm_gateway(
verdict={
"decision": "compromise",
"rationale": "双方各有优势",
"conclusion": "采用折中方案。",
}
)
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert result["verdict"]["decision"] == "compromise"
assert result["verdict"]["rationale"] == "双方各有优势"
assert result["verdict"]["conclusion"] == "采用折中方案。"
@pytest.mark.asyncio
async def test_debate_emits_debate_started_event(self):
"""辩论开始时广播 debate_started 事件"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
event_types = [c[0][1]["type"] for c in calls]
assert "debate_started" in event_types
@pytest.mark.asyncio
async def test_debate_emits_expert_argument_events(self):
"""每个专家发言时广播 expert_argument 事件"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
argument_events = [c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"]
# 2 experts × 1 round = 2 argument events
assert len(argument_events) == 2
expert_ids = {e["expert_id"] for e in argument_events}
assert expert_ids == {"member1", "member2"}
@pytest.mark.asyncio
async def test_debate_emits_round_summary_events(self):
"""每轮辩论结束时广播 debate_round_summary 事件"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
summary_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary"
]
assert len(summary_events) == 2 # 2 rounds
# Round 1 summary should have continue=True, round 2 continue=False
assert summary_events[0]["round"] == 1
assert summary_events[0]["continue"] is True
assert summary_events[1]["round"] == 2
assert summary_events[1]["continue"] is False
@pytest.mark.asyncio
async def test_debate_emits_debate_resolved_event(self):
"""辩论裁决时广播 debate_resolved 事件"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
resolved_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved"
]
assert len(resolved_events) == 1
assert resolved_events[0]["decision"] == "adopt"
assert "conclusion" in resolved_events[0]
@pytest.mark.asyncio
async def test_debate_emits_phase_completed_event(self):
"""辩论阶段完成时广播 phase_completed 事件(与 EXECUTION 阶段一致)"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
completed_events = [
c[0][1] for c in calls if c[0][1].get("type") == "phase_completed"
]
assert len(completed_events) == 1
assert completed_events[0]["phase_id"] == phase.id
# ── 边界测试 ──────────────────────────────────────────────
class TestDebatePhaseMaxRounds:
"""max_rounds 边界测试"""
@pytest.mark.asyncio
async def test_max_rounds_one_single_round(self):
"""max_rounds=1 时只辩论一轮就裁决"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
# Count expert_argument events: 2 experts × 1 round = 2
calls = team._handoff_transport.send.call_args_list
argument_events = [
c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"
]
assert len(argument_events) == 2
# Count summary events: 1 round = 1 summary
summary_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary"
]
assert len(summary_events) == 1
@pytest.mark.asyncio
async def test_max_rounds_capped_at_max_debate_rounds(self):
"""max_rounds 超过 MAX_DEBATE_ROUNDS 时被截断"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# Request 10 rounds, should be capped to MAX_DEBATE_ROUNDS (4)
phase = _make_debate_phase(max_rounds=10, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
summary_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary"
]
assert len(summary_events) == TeamOrchestrator.MAX_DEBATE_ROUNDS
class TestDebatePhaseEmptyParticipants:
"""participants 为空时的边界测试"""
@pytest.mark.asyncio
async def test_empty_participants_lead_directly_adjudicates(self):
"""participants 为空时Lead 直接给出结论(无辩论轮次)"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(participants=[], max_rounds=3)
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
# Should still have a conclusion from Lead verdict
assert "content" in result
# No expert_argument events should be emitted
calls = team._handoff_transport.send.call_args_list
argument_events = [
c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"
]
assert len(argument_events) == 0
# No round summary events
summary_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary"
]
assert len(summary_events) == 0
@pytest.mark.asyncio
async def test_empty_participants_still_emits_debate_started(self):
"""participants 为空时仍广播 debate_started含空 participants 列表)"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(participants=[], max_rounds=2)
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
started_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_started"
]
assert len(started_events) == 1
assert started_events[0]["participants"] == []
# ── 用户停止测试 ──────────────────────────────────────────
class TestDebatePhaseUserStop:
"""用户 /stop 干预测试"""
@pytest.mark.asyncio
async def test_stop_command_ends_debate_early(self):
"""辩论中收到 /stop提前结束并裁决"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# Mock intervention queue: return /stop on first check (round 1)
team.consume_user_interventions = MagicMock(return_value=["/stop"])
phase = _make_debate_phase(max_rounds=3, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
# Should still produce a verdict
assert "content" in result
# No expert_argument events — stopped before round 1 arguments
calls = team._handoff_transport.send.call_args_list
argument_events = [
c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"
]
assert len(argument_events) == 0
@pytest.mark.asyncio
async def test_chinese_stop_command_ends_debate(self):
"""中文 '停止' 命令也能结束辩论"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
team.consume_user_interventions = MagicMock(return_value=["停止"])
phase = _make_debate_phase(max_rounds=3, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
calls = team._handoff_transport.send.call_args_list
argument_events = [
c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"
]
assert len(argument_events) == 0
@pytest.mark.asyncio
async def test_non_stop_intervention_does_not_end_debate(self):
"""非停止命令的干预不会结束辩论"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# Non-stop intervention should not end the debate
team.consume_user_interventions = MagicMock(return_value=["继续讨论"])
phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
# Debate should proceed normally — arguments emitted
calls = team._handoff_transport.send.call_args_list
argument_events = [
c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"
]
assert len(argument_events) == 2 # 2 experts × 1 round
# ── 逃生舱测试 ────────────────────────────────────────────
class TestDebatePhaseSkipEscapeHatch:
"""skip=True 逃生舱测试"""
@pytest.mark.asyncio
async def test_skip_true_short_circuits_debate(self):
"""debate_config.skip=true 时直接跳过phase 状态 COMPLETED"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(skip=True, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
assert result["content"] == "无需辩论"
assert result["skipped"] is True
@pytest.mark.asyncio
async def test_skip_true_does_not_call_llm(self):
"""skip=true 时不调用 LLM"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(skip=True)
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
# LLM should not be called at all
gateway.chat.assert_not_awaited()
@pytest.mark.asyncio
async def test_skip_true_emits_debate_resolved_with_skipped_decision(self):
"""skip=true 时广播 debate_resolved 事件decision='skipped'"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(skip=True)
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
resolved_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved"
]
assert len(resolved_events) == 1
assert resolved_events[0]["decision"] == "skipped"
assert resolved_events[0]["conclusion"] == "无需辩论"
@pytest.mark.asyncio
async def test_skip_true_no_debate_started_event(self):
"""skip=true 时不广播 debate_started 事件"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(skip=True)
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
event_types = [c[0][1]["type"] for c in calls]
assert "debate_started" not in event_types
assert "expert_argument" not in event_types
# ── LLM 不可用错误路径测试 ────────────────────────────────
class TestDebatePhaseLLMUnavailable:
"""LLM 不可用时的错误路径测试"""
@pytest.mark.asyncio
async def test_no_llm_gateway_uses_template_verdict(self):
"""LLM 不可用时Lead 用模板文本裁决,不抛异常"""
# No gateway provided — all experts have _llm_gateway=None
team = _make_team_with_experts(gateway=None)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
# Should have a template conclusion (not raise)
assert "content" in result
assert result["decision"] == "inconclusive"
@pytest.mark.asyncio
async def test_no_llm_gateway_opening_uses_template(self):
"""LLM 不可用时,开场使用模板文本"""
team = _make_team_with_experts(gateway=None)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
started_events = [
c[0][1] for c in calls if c[0][1].get("type") == "debate_started"
]
assert len(started_events) == 1
# Opening should contain the topic (template text)
assert "前端框架选型" in started_events[0]["opening"]
@pytest.mark.asyncio
async def test_llm_gateway_exception_does_not_crash(self):
"""LLM gateway 抛异常时不崩溃,用模板裁决"""
gateway = AsyncMock()
gateway.chat = AsyncMock(side_effect=RuntimeError("LLM service down"))
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
assert result["decision"] == "inconclusive"
@pytest.mark.asyncio
async def test_verdict_json_parse_failure_returns_inconclusive(self):
"""裁决 JSON 解析失败时返回 inconclusive"""
gateway = AsyncMock()
# Return non-JSON for all calls
response = MagicMock()
response.content = "这不是JSON格式"
gateway.chat = AsyncMock(return_value=response)
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
result = await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
assert result["decision"] == "inconclusive"
# Conclusion should fall back to raw content
assert "content" in result
# ── SharedWorkspace 集成测试 ──────────────────────────────
class TestDebatePhaseSharedWorkspace:
"""辩论结论写入 SharedWorkspace 测试"""
@pytest.mark.asyncio
async def test_conclusion_written_to_workspace(self):
"""辩论结论写入 SharedWorkspace"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
# Verify workspace has the debate output
workspace = team.workspace
output_key = f"{plan.id}/phase/{phase.id}/output"
data = await workspace.read(output_key)
assert data is not None
assert data["value"] == "采纳甲方方案,按此执行。"
assert data["agent_id"] == "lead"
@pytest.mark.asyncio
async def test_phase_result_stored_on_phase_object(self):
"""辩论结果存储在 phase.result 上"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_debate_phase(phase, plan)
assert phase.result is not None
assert phase.result["content"] == "采纳甲方方案,按此执行。"
assert phase.result["decision"] == "adopt"
assert "verdict" in phase.result
# ── 干预通道兼容性测试 ────────────────────────────────────
class TestInterventionChannelCompatibility:
"""干预通道 getattr 回退测试U4 兼容)"""
@pytest.mark.asyncio
async def test_no_intervention_method_returns_empty(self):
"""team 没有 consume_user_interventions 方法时返回空列表"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# ExpertTeam doesn't have consume_user_interventions yet (U4 not implemented)
assert not hasattr(team, "consume_user_interventions")
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
# Should not raise — falls back to empty list
await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
@pytest.mark.asyncio
async def test_intervention_method_exception_returns_empty(self):
"""consume_user_interventions 抛异常时返回空列表"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# Set a broken intervention method
team.consume_user_interventions = MagicMock(side_effect=RuntimeError("broken"))
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
# Should not raise — exception caught, returns empty list
await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED
# ── Phase 分发测试 ────────────────────────────────────────
class TestPhaseDispatch:
"""_execute_phase 分发器测试"""
@pytest.mark.asyncio
async def test_execution_phase_dispatches_to_execution_method(self):
"""EXECUTION 类型阶段分发到 _execute_execution_phase"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Mock both execution methods to track dispatch
orchestrator._execute_execution_phase = AsyncMock(
return_value={"content": "execution result"}
)
orchestrator._execute_debate_phase = AsyncMock(
return_value={"content": "debate result"}
)
phase = PlanPhase(name="执行阶段", assigned_expert="lead", task_description="任务")
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_phase(phase, plan)
orchestrator._execute_execution_phase.assert_awaited_once_with(phase, plan)
orchestrator._execute_debate_phase.assert_not_awaited()
@pytest.mark.asyncio
async def test_debate_phase_dispatches_to_debate_method(self):
"""DEBATE 类型阶段分发到 _execute_debate_phase"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
orchestrator._execute_execution_phase = AsyncMock(
return_value={"content": "execution result"}
)
orchestrator._execute_debate_phase = AsyncMock(
return_value={"content": "debate result"}
)
phase = _make_debate_phase()
plan = _make_plan_with_debate_phase(phase)
await orchestrator._execute_phase(phase, plan)
orchestrator._execute_debate_phase.assert_awaited_once_with(phase, plan)
orchestrator._execute_execution_phase.assert_not_awaited()
# ── 辅助方法单元测试 ──────────────────────────────────────
class TestHelperMethods:
"""辅助方法单元测试"""
def test_has_stop_command_detects_stop_commands(self):
"""_has_stop_command 检测停止命令"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
assert orchestrator._has_stop_command(["/stop"]) is True
assert orchestrator._has_stop_command(["停止"]) is True
assert orchestrator._has_stop_command(["stop"]) is True
assert orchestrator._has_stop_command(["结束"]) is True
def test_has_stop_command_ignores_non_stop(self):
"""_has_stop_command 忽略非停止命令"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
assert orchestrator._has_stop_command(["继续"]) is False
assert orchestrator._has_stop_command(["/continue"]) is False
assert orchestrator._has_stop_command([]) is False
def test_has_stop_command_case_insensitive(self):
"""_has_stop_command 大小写不敏感"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
assert orchestrator._has_stop_command(["STOP"]) is True
assert orchestrator._has_stop_command([" /stop "]) is True
def test_format_debate_history_empty(self):
"""_format_debate_history 空历史返回空字符串"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
assert orchestrator._format_debate_history([]) == ""
def test_format_debate_history_with_entries(self):
"""_format_debate_history 格式化历史条目"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
history = [
{"expert": "lead", "content": "开场白", "round": 0, "role": "moderator"},
{"expert": "member1", "content": "我的论点", "round": 1, "role": "expert"},
]
result = orchestrator._format_debate_history(history)
assert "开场白" in result
assert "我的论点" in result
assert "主持人" in result
assert "专家" in result
assert "[开场]" in result
assert "[第1轮]" in result
def test_build_dependency_context_no_deps(self):
"""_build_dependency_context 无依赖时返回空字符串"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
phase = _make_debate_phase(depends_on=[])
plan = _make_plan_with_debate_phase(phase)
assert orchestrator._build_dependency_context(phase, plan) == ""
def test_build_dependency_context_with_completed_dep(self):
"""_build_dependency_context 包含已完成依赖的输出"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Create a dependency phase that's completed
dep_phase = PlanPhase(
id="dep_1",
name="前置阶段",
assigned_expert="lead",
task_description="前置任务",
depends_on=[],
)
dep_phase.status = PhaseStatus.COMPLETED
dep_phase.result = {"content": "前置阶段输出内容"}
debate_phase = _make_debate_phase(depends_on=["dep_1"])
plan = TeamPlan(
id="test_plan",
task="测试",
phases=[dep_phase, debate_phase],
lead_expert="lead",
)
context = orchestrator._build_dependency_context(debate_phase, plan)
assert "前置阶段" in context
assert "前置阶段输出内容" in context
def test_build_dependency_context_ignores_incomplete_dep(self):
"""_build_dependency_context 忽略未完成的依赖"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Dependency phase is still PENDING
dep_phase = PlanPhase(
id="dep_1",
name="前置阶段",
assigned_expert="lead",
task_description="前置任务",
)
debate_phase = _make_debate_phase(depends_on=["dep_1"])
plan = TeamPlan(
id="test_plan",
task="测试",
phases=[dep_phase, debate_phase],
lead_expert="lead",
)
context = orchestrator._build_dependency_context(debate_phase, plan)
assert context == ""