# fischer-agentkit/tests/unit/experts/test_team_intervention.py
#
# NOTE: this file intentionally contains non-ASCII (Chinese) text in its
# string literals; editors may flag those characters as ambiguous Unicode.

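"""Unit tests for the ExpertTeam user-intervention channel and TeamOrchestrator intervention handling (U4).

Coverage:
- ExpertTeam intervention queue
  * add_user_intervention → consume_user_interventions returns the message
  * multiple intervention messages accumulate and are consumed in one call
  * the queue is empty after consume; a second consume returns an empty list
  * broadcast_user_message also enqueues onto the intervention queue
- TeamOrchestrator._process_interventions
  * /stop → returns True (terminates execution) + broadcasts plan_update
  * /debate <topic> → inserts a DEBATE phase + broadcasts plan_update
  * /debate is limited by MAX_DEBATES
  * /debate without a topic is ignored
  * plain text → accumulated into _user_context
  * empty intervention queue → returns False (no side effects)
- Integration: _user_context influences the synthesis prompt
"""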
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.experts.config import ExpertConfig
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── Helper functions ──────────────────────────────────────
def _make_expert_config(name: str = "test_expert", is_lead: bool = False) -> ExpertConfig:
    """Build a minimal ``ExpertConfig`` for use in tests.

    Args:
        name: Expert name; also interpolated into the persona text.
        is_lead: Value forwarded to the config's ``is_lead`` field.

    Returns:
        An ``ExpertConfig`` whose remaining fields are fixed placeholders.
    """
    settings = {
        "name": name,
        "agent_type": "expert",
        "persona": f"{name}的角色描述",
        "thinking_style": "逻辑推理",
        "speaking_style": "简洁直接",
        "decision_framework": "数据驱动决策",
        "bound_skills": ["skill_a"],
        "is_lead": is_lead,
        "task_mode": "llm_generate",
        "prompt": {"identity": "测试"},
    }
    return ExpertConfig(**settings)
def _make_mock_expert(
    name: str = "test_expert",
    is_lead: bool = False,
    is_active: bool = True,
    gateway: MagicMock | None = None,
) -> MagicMock:
    """Create a ``MagicMock`` standing in for an expert.

    Args:
        name: Expert name used for the config and the capabilities summary.
        is_lead: Lead flag stored in the config and the capabilities summary.
        is_active: Value assigned to the mock's ``is_active`` attribute.
        gateway: Object exposed as ``expert.agent._llm_gateway`` (may be None).

    Returns:
        The configured mock expert.
    """
    cfg = _make_expert_config(name=name, is_lead=is_lead)

    # The inner agent only needs to expose the LLM gateway.
    inner_agent = MagicMock()
    inner_agent._llm_gateway = gateway

    summary = {
        "name": name,
        "persona": cfg.persona,
        "thinking_style": cfg.thinking_style,
        "bound_skills": cfg.bound_skills,
        "is_lead": is_lead,
    }

    mock_expert = MagicMock()
    mock_expert.config = cfg
    mock_expert.is_active = is_active
    mock_expert.team_id = None
    mock_expert.get_capabilities_summary.return_value = summary
    mock_expert.agent = inner_agent
    return mock_expert
def _make_team_with_experts(
    expert_names: list[str] | None = None,
    lead_name: str = "lead",
    gateway: MagicMock | None = None,
) -> ExpertTeam:
    """Assemble an ``ExpertTeam`` filled with mock experts and a mocked transport.

    Args:
        expert_names: Names of the experts to register; when None the roster is
            ``[lead_name, "member1", "member2"]``.
        lead_name: The expert with this name is flagged as lead and recorded in
            ``team._lead_expert_name``.
        gateway: Gateway object handed to every mock expert.

    Returns:
        The populated team.
    """
    roster = [lead_name, "member1", "member2"] if expert_names is None else expert_names

    squad = ExpertTeam()
    squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)

    for member_name in roster:
        leads_team = member_name == lead_name
        squad._experts[member_name] = _make_mock_expert(
            name=member_name, is_lead=leads_team, gateway=gateway
        )
        if leads_team:
            squad._lead_expert_name = member_name
    return squad
def _make_execution_phase(
    phase_id: str = "phase_1",
    name: str = "阶段一",
    assigned_expert: str = "member1",
    depends_on: list[str] | None = None,
    status: PhaseStatus = PhaseStatus.PENDING,
    result: dict | None = None,
) -> PlanPhase:
    """Build a ``PlanPhase`` of type ``PhaseType.EXECUTION``.

    Args:
        phase_id: Identifier stored in the phase's ``id`` field.
        name: Phase name; also interpolated into the task description.
        assigned_expert: Name of the expert the phase is assigned to.
        depends_on: Ids of prerequisite phases; a falsy value becomes ``[]``.
        status: Initial phase status.
        result: Optional result payload attached to the phase.

    Returns:
        The constructed phase.
    """
    prerequisites = depends_on if depends_on else []
    description = f"{name}的任务描述"
    return PlanPhase(
        id=phase_id,
        name=name,
        assigned_expert=assigned_expert,
        task_description=description,
        depends_on=prerequisites,
        phase_type=PhaseType.EXECUTION,
        status=status,
        result=result,
    )
def _make_plan(
    phases: list[PlanPhase],
    task: str = "测试任务",
    lead_expert: str = "lead",
) -> TeamPlan:
    """Wrap ``phases`` in a ``TeamPlan`` with the fixed id ``"test_plan"``.

    Args:
        phases: Phases the plan consists of.
        task: Task text stored on the plan.
        lead_expert: Name of the plan's lead expert.

    Returns:
        The constructed plan.
    """
    plan_fields = dict(id="test_plan", task=task, phases=phases, lead_expert=lead_expert)
    return TeamPlan(**plan_fields)
# ── ExpertTeam intervention-queue tests ──────────────────
class TestExpertTeamInterventionQueue:
    """Basic behaviour of the ExpertTeam user-intervention queue."""

    @pytest.mark.asyncio
    async def test_add_and_consume_intervention(self):
        """A message added via add_user_intervention is returned by consume_user_interventions."""
        squad = ExpertTeam()
        squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)

        await squad.add_user_intervention("/debate 前端框架选型")

        assert squad.consume_user_interventions() == ["/debate 前端框架选型"]

    @pytest.mark.asyncio
    async def test_multiple_interventions_accumulate(self):
        """Several interventions accumulate and are consumed in one call, in insertion order."""
        squad = ExpertTeam()
        squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
        sent = ("第一条", "第二条", "第三条")
        for text in sent:
            await squad.add_user_intervention(text)

        drained = squad.consume_user_interventions()

        assert len(drained) == 3
        for position, expected in enumerate(sent):
            assert drained[position] == expected

    @pytest.mark.asyncio
    async def test_consume_clears_queue(self):
        """Consuming empties the queue; a second consume returns an empty list."""
        squad = ExpertTeam()
        squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
        await squad.add_user_intervention("消息")

        initial_batch = squad.consume_user_interventions()
        assert len(initial_batch) == 1

        followup_batch = squad.consume_user_interventions()
        assert followup_batch == []

    def test_consume_empty_queue_returns_empty_list(self):
        """Consuming from a freshly created team yields an empty list."""
        assert ExpertTeam().consume_user_interventions() == []

    @pytest.mark.asyncio
    async def test_broadcast_user_message_enqueues_intervention(self):
        """broadcast_user_message also places the message on the intervention queue."""
        squad = ExpertTeam()
        squad._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)

        await squad.broadcast_user_message("测试消息")

        assert squad.consume_user_interventions() == ["测试消息"]

    @pytest.mark.asyncio
    async def test_add_user_intervention_broadcasts_to_channel(self):
        """add_user_intervention sends a user_intervention message on the team channel."""
        squad = ExpertTeam()
        fake_transport = AsyncMock(spec=InProcessHandoffTransport)
        squad._handoff_transport = fake_transport

        await squad.add_user_intervention("/stop")

        assert fake_transport.send.called
        # Positional arguments of the most recent send(): (channel, message, ...).
        positional = fake_transport.send.call_args.args
        target_channel, payload = positional[0], positional[1]
        assert target_channel == squad.team_channel
        assert payload["type"] == "user_intervention"
        assert payload["content"] == "/stop"
# ── TeamOrchestrator._process_interventions tests ────────
class TestProcessInterventionsStop:
    """How _process_interventions reacts to stop commands."""

    @pytest.mark.asyncio
    async def test_stop_returns_true(self):
        """'/stop' makes _process_interventions return True (terminate execution)."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        await squad.add_user_intervention("/stop")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is True

    @pytest.mark.asyncio
    async def test_stop_broadcasts_plan_update(self):
        """'/stop' ends with a plan_update broadcast flagged stopped_by_user."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        await squad.add_user_intervention("/stop")

        await conductor._process_interventions(squad.lead_expert, plan)

        send_mock = squad._handoff_transport.send
        assert send_mock.called
        # The second positional argument of the final send() is the event payload.
        final_event = send_mock.call_args_list[-1].args[1]
        assert final_event["type"] == "plan_update"
        assert final_event["stopped_by_user"] is True

    @pytest.mark.asyncio
    async def test_stop_chinese_alias_works(self):
        """The Chinese stop command '停止' also terminates execution."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        await squad.add_user_intervention("停止")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is True
class TestProcessInterventionsDebate:
    """How _process_interventions reacts to '/debate' commands."""

    @staticmethod
    def _finished_anchor() -> PlanPhase:
        """Return a COMPLETED execution phase 'p1' carrying a result payload."""
        return _make_execution_phase(
            phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
        )

    @pytest.mark.asyncio
    async def test_debate_inserts_debate_phase(self):
        """'/debate <topic>' inserts a DEBATE phase carrying the topic."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        # A completed phase is supplied to act as the anchor for the debate.
        anchor = self._finished_anchor()
        waiting = _make_execution_phase(phase_id="p2", depends_on=["p1"])
        plan = _make_plan(phases=[anchor, waiting])
        await squad.add_user_intervention("/debate 前端框架选型")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False  # execution is not terminated
        assert conductor._debate_count == 1
        # Exactly one DEBATE phase must now be part of the plan.
        inserted = [phase for phase in plan.phases if phase.phase_type == PhaseType.DEBATE]
        assert len(inserted) == 1
        assert "前端框架选型" in inserted[0].debate_config["topic"]

    @pytest.mark.asyncio
    async def test_debate_broadcasts_plan_update(self):
        """'/debate' ends with a plan_update broadcast containing debate_inserted."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[self._finished_anchor()])
        await squad.add_user_intervention("/debate 测试话题")

        await conductor._process_interventions(squad.lead_expert, plan)

        # The second positional argument of the final send() is the event payload.
        final_event = squad._handoff_transport.send.call_args_list[-1].args[1]
        assert final_event["type"] == "plan_update"
        assert "debate_inserted" in final_event

    @pytest.mark.asyncio
    async def test_debate_respects_max_debates(self):
        """'/debate' is a no-op once the debate counter has reached MAX_DEBATES."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        conductor._debate_count = conductor.MAX_DEBATES
        plan = _make_plan(phases=[self._finished_anchor()])
        await squad.add_user_intervention("/debate 话题")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        assert conductor._debate_count == conductor.MAX_DEBATES
        # No DEBATE phase may have been added.
        inserted = [phase for phase in plan.phases if phase.phase_type == PhaseType.DEBATE]
        assert len(inserted) == 0

    @pytest.mark.asyncio
    async def test_debate_without_topic_ignored(self):
        """'/debate' without a topic is ignored."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        await squad.add_user_intervention("/debate")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        assert conductor._debate_count == 0

    @pytest.mark.asyncio
    async def test_debate_without_members_ignored(self):
        """'/debate' is ignored when the team has no members besides the lead."""
        squad = _make_team_with_experts(expert_names=["lead"])
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[self._finished_anchor()])
        await squad.add_user_intervention("/debate 话题")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        assert conductor._debate_count == 0
class TestProcessInterventionsPlainText:
    """How _process_interventions handles plain-text interventions."""

    @pytest.mark.asyncio
    async def test_plain_text_accumulates_to_user_context(self):
        """Plain text is accumulated into _user_context without terminating."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        await squad.add_user_intervention("请关注性能优化")

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        assert "请关注性能优化" in conductor._user_context

    @pytest.mark.asyncio
    async def test_multiple_plain_texts_accumulate(self):
        """Every plain-text message ends up in _user_context."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])
        hints = ("第一条建议", "第二条建议")
        for hint in hints:
            await squad.add_user_intervention(hint)

        await conductor._process_interventions(squad.lead_expert, plan)

        assert len(conductor._user_context) == 2
        for hint in hints:
            assert hint in conductor._user_context

    @pytest.mark.asyncio
    async def test_user_context_influences_synthesis_prompt(self):
        """Entries in _user_context are appended to the synthesis prompt."""
        # Gateway stub that records the content of the first message of each chat call.
        seen_prompts = []

        async def record_chat(messages, model=None, **kwargs):
            seen_prompts.append(messages[0]["content"])
            reply = MagicMock()
            reply.content = "综合结果"
            return reply

        gateway = AsyncMock()
        gateway.chat = AsyncMock(side_effect=record_chat)
        squad = _make_team_with_experts(gateway=gateway)
        conductor = TeamOrchestrator(squad)
        conductor._user_context.append("请重点关注安全性")

        # Two completed phases whose results are to be synthesized.
        finished = [
            _make_execution_phase(
                phase_id=pid,
                name=label,
                status=PhaseStatus.COMPLETED,
                result={"content": outcome},
            )
            for pid, label, outcome in (("p1", "阶段A", "结果A"), ("p2", "阶段B", "结果B"))
        ]

        await conductor._synthesize_results(squad.lead_expert, "任务", finished)

        assert len(seen_prompts) == 1
        prompt_text = seen_prompts[0]
        assert "请重点关注安全性" in prompt_text
        assert "用户在执行期间补充的指导意见" in prompt_text
class TestProcessInterventionsEmpty:
    """_process_interventions with nothing queued."""

    @pytest.mark.asyncio
    async def test_empty_interventions_returns_false(self):
        """An empty queue returns False and leaves the debate count and user context untouched."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        plan = _make_plan(phases=[_make_execution_phase()])

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        assert conductor._debate_count == 0
        assert conductor._user_context == []
class TestProcessInterventionsMixed:
    """_process_interventions with several kinds of message queued together."""

    @pytest.mark.asyncio
    async def test_mixed_messages_processed_in_order(self):
        """A text + debate + text batch inserts one debate and keeps both texts."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        anchor = _make_execution_phase(
            phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
        )
        plan = _make_plan(phases=[anchor])
        for message in ("先补充个上下文", "/debate 架构选型", "再补充一条"):
            await squad.add_user_intervention(message)

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is False
        # The debate was inserted.
        assert conductor._debate_count == 1
        # Both plain-text messages were accumulated.
        assert len(conductor._user_context) == 2
        assert "先补充个上下文" in conductor._user_context
        assert "再补充一条" in conductor._user_context

    @pytest.mark.asyncio
    async def test_stop_terminates_even_with_other_messages(self):
        """'/stop' terminates execution even when other messages precede it."""
        squad = _make_team_with_experts()
        conductor = TeamOrchestrator(squad)
        anchor = _make_execution_phase(
            phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
        )
        plan = _make_plan(phases=[anchor])
        for message in ("/debate 话题", "/stop"):
            await squad.add_user_intervention(message)

        should_halt = await conductor._process_interventions(squad.lead_expert, plan)

        assert should_halt is True
        # The debate queued ahead of the stop was still handled.
        assert conductor._debate_count == 1