"""ExpertTeam 用户干预通道 + TeamOrchestrator 干预处理单元测试 (U4) 测试覆盖: - ExpertTeam 干预队列 * add_user_intervention → consume_user_interventions 返回消息 * 多条干预消息累积,一次性消费 * consume 后队列清空,再次 consume 返回空 * broadcast_user_message 同时入队干预队列 - TeamOrchestrator._process_interventions * /stop → 返回 True(终止执行)+ 广播 plan_update * /debate → 插入 DEBATE phase + 广播 plan_update * /debate 受 MAX_DEBATES 限制 * /debate 无 topic 时忽略 * 普通文本 → 累积到 _user_context * 空干预队列 → 返回 False,无副作用 - 集成: _user_context 影响 synthesis prompt """ from __future__ import annotations from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.experts.config import ExpertConfig from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan from agentkit.experts.team import ExpertTeam # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config(name: str = "test_expert", is_lead: bool = False) -> ExpertConfig: return ExpertConfig( name=name, agent_type="expert", persona=f"{name}的角色描述", thinking_style="逻辑推理", speaking_style="简洁直接", decision_framework="数据驱动决策", bound_skills=["skill_a"], is_lead=is_lead, task_mode="llm_generate", prompt={"identity": "测试"}, ) def _make_mock_expert( name: str = "test_expert", is_lead: bool = False, is_active: bool = True, gateway: MagicMock | None = None, ) -> MagicMock: config = _make_expert_config(name=name, is_lead=is_lead) expert = MagicMock() expert.config = config expert.is_active = is_active expert.team_id = None expert.get_capabilities_summary.return_value = { "name": name, "persona": config.persona, "thinking_style": config.thinking_style, "bound_skills": config.bound_skills, "is_lead": is_lead, } mock_agent = MagicMock() mock_agent._llm_gateway = gateway expert.agent = mock_agent return expert def _make_team_with_experts( expert_names: list[str] | None = None, lead_name: str = "lead", gateway: MagicMock | None = None, ) -> ExpertTeam: team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport if expert_names is None: expert_names = [lead_name, "member1", "member2"] for name in expert_names: is_lead = name == lead_name expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) team._experts[name] = expert if is_lead: team._lead_expert_name = name return team def _make_execution_phase( phase_id: str = "phase_1", name: str = "阶段一", assigned_expert: str = "member1", depends_on: list[str] | None = None, status: PhaseStatus = PhaseStatus.PENDING, result: dict | None = None, ) -> PlanPhase: return PlanPhase( id=phase_id, name=name, assigned_expert=assigned_expert, task_description=f"{name}的任务描述", depends_on=depends_on or [], phase_type=PhaseType.EXECUTION, status=status, result=result, ) def _make_plan( phases: list[PlanPhase], task: str = "测试任务", lead_expert: str = "lead", ) -> TeamPlan: return TeamPlan( id="test_plan", task=task, phases=phases, lead_expert=lead_expert, ) # ── ExpertTeam 干预队列测试 ────────────────────────────── class TestExpertTeamInterventionQueue: """ExpertTeam 干预队列基础功能测试""" @pytest.mark.asyncio async def test_add_and_consume_intervention(self): """add_user_intervention → consume_user_interventions 返回消息""" team = ExpertTeam() team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport) await team.add_user_intervention("/debate 前端框架选型") interventions = team.consume_user_interventions() assert interventions == ["/debate 前端框架选型"] @pytest.mark.asyncio async def test_multiple_interventions_accumulate(self): """多条干预消息累积,一次性消费""" team = ExpertTeam() team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport) await team.add_user_intervention("第一条") await team.add_user_intervention("第二条") await team.add_user_intervention("第三条") interventions = team.consume_user_interventions() assert len(interventions) == 3 assert interventions[0] == "第一条" assert interventions[1] == "第二条" assert interventions[2] == "第三条" @pytest.mark.asyncio async def test_consume_clears_queue(self): """consume 后队列清空,再次 consume 返回空""" team = ExpertTeam() team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport) await team.add_user_intervention("消息") first = team.consume_user_interventions() assert len(first) == 1 second = team.consume_user_interventions() assert second == [] def test_consume_empty_queue_returns_empty_list(self): """空队列 consume 返回空列表""" team = ExpertTeam() interventions = team.consume_user_interventions() assert interventions == [] @pytest.mark.asyncio async def test_broadcast_user_message_enqueues_intervention(self): """broadcast_user_message 同时入队干预队列""" team = ExpertTeam() team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport) await team.broadcast_user_message("测试消息") interventions = team.consume_user_interventions() assert interventions == ["测试消息"] @pytest.mark.asyncio async def test_add_user_intervention_broadcasts_to_channel(self): """add_user_intervention 广播到 team channel""" team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport await team.add_user_intervention("/stop") assert transport.send.called call_args = transport.send.call_args channel = call_args[0][0] message = call_args[0][1] assert channel == team.team_channel assert message["type"] == "user_intervention" assert message["content"] == "/stop" # ── TeamOrchestrator._process_interventions 测试 ──────── class TestProcessInterventionsStop: """_process_interventions /stop 处理测试""" @pytest.mark.asyncio async def test_stop_returns_true(self): """/stop → 返回 True(终止执行)""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("/stop") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is True @pytest.mark.asyncio async def test_stop_broadcasts_plan_update(self): """/stop → 广播 plan_update with stopped_by_user""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("/stop") await orchestrator._process_interventions(team.lead_expert, plan) transport = team._handoff_transport assert transport.send.called last_call = transport.send.call_args_list[-1] event_data = last_call[0][1] assert event_data["type"] == "plan_update" assert event_data["stopped_by_user"] is True @pytest.mark.asyncio async def test_stop_chinese_alias_works(self): """中文停止命令 '停止' 也能终止""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("停止") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is True class TestProcessInterventionsDebate: """_process_interventions /debate 处理测试""" @pytest.mark.asyncio async def test_debate_inserts_debate_phase(self): """/debate → 插入 DEBATE phase""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) # 需要一个已完成的 phase 作为 anchor completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) pending = _make_execution_phase(phase_id="p2", depends_on=["p1"]) plan = _make_plan(phases=[completed, pending]) await team.add_user_intervention("/debate 前端框架选型") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False # 不终止 assert orchestrator._debate_count == 1 # 应该新增一个 DEBATE phase debate_phases = [p for p in plan.phases if p.phase_type == PhaseType.DEBATE] assert len(debate_phases) == 1 assert "前端框架选型" in debate_phases[0].debate_config["topic"] @pytest.mark.asyncio async def test_debate_broadcasts_plan_update(self): """/debate → 广播 plan_update with debate_inserted""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) plan = _make_plan(phases=[completed]) await team.add_user_intervention("/debate 测试话题") await orchestrator._process_interventions(team.lead_expert, plan) transport = team._handoff_transport last_call = transport.send.call_args_list[-1] event_data = last_call[0][1] assert event_data["type"] == "plan_update" assert "debate_inserted" in event_data @pytest.mark.asyncio async def test_debate_respects_max_debates(self): """/debate 受 MAX_DEBATES 限制""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) orchestrator._debate_count = orchestrator.MAX_DEBATES completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) plan = _make_plan(phases=[completed]) await team.add_user_intervention("/debate 话题") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False assert orchestrator._debate_count == orchestrator.MAX_DEBATES # 不应该新增 DEBATE phase debate_phases = [p for p in plan.phases if p.phase_type == PhaseType.DEBATE] assert len(debate_phases) == 0 @pytest.mark.asyncio async def test_debate_without_topic_ignored(self): """/debate 无 topic 时忽略""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("/debate") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False assert orchestrator._debate_count == 0 @pytest.mark.asyncio async def test_debate_without_members_ignored(self): """/debate 无其他成员时忽略(只有 lead)""" team = _make_team_with_experts(expert_names=["lead"]) orchestrator = TeamOrchestrator(team) completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) plan = _make_plan(phases=[completed]) await team.add_user_intervention("/debate 话题") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False assert orchestrator._debate_count == 0 class TestProcessInterventionsPlainText: """_process_interventions 普通文本处理测试""" @pytest.mark.asyncio async def test_plain_text_accumulates_to_user_context(self): """普通文本 → 累积到 _user_context""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("请关注性能优化") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False assert "请关注性能优化" in orchestrator._user_context @pytest.mark.asyncio async def test_multiple_plain_texts_accumulate(self): """多条普通文本都累积""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) await team.add_user_intervention("第一条建议") await team.add_user_intervention("第二条建议") await orchestrator._process_interventions(team.lead_expert, plan) assert len(orchestrator._user_context) == 2 assert "第一条建议" in orchestrator._user_context assert "第二条建议" in orchestrator._user_context @pytest.mark.asyncio async def test_user_context_influences_synthesis_prompt(self): """_user_context 被追加到 synthesis prompt""" # 用一个能捕获 prompt 的 gateway captured_prompt = [] async def chat_side_effect(messages, model=None, **kwargs): captured_prompt.append(messages[0]["content"]) response = MagicMock() response.content = "综合结果" return response gateway = AsyncMock() gateway.chat = AsyncMock(side_effect=chat_side_effect) team = _make_team_with_experts(gateway=gateway) orchestrator = TeamOrchestrator(team) orchestrator._user_context.append("请重点关注安全性") phases = [ _make_execution_phase( phase_id="p1", name="阶段A", status=PhaseStatus.COMPLETED, result={"content": "结果A"}, ), _make_execution_phase( phase_id="p2", name="阶段B", status=PhaseStatus.COMPLETED, result={"content": "结果B"}, ), ] await orchestrator._synthesize_results(team.lead_expert, "任务", phases) assert len(captured_prompt) == 1 assert "请重点关注安全性" in captured_prompt[0] assert "用户在执行期间补充的指导意见" in captured_prompt[0] class TestProcessInterventionsEmpty: """_process_interventions 空队列测试""" @pytest.mark.asyncio async def test_empty_interventions_returns_false(self): """空干预队列 → 返回 False,无副作用""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) plan = _make_plan(phases=[_make_execution_phase()]) result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False assert orchestrator._debate_count == 0 assert orchestrator._user_context == [] class TestProcessInterventionsMixed: """_process_interventions 混合消息测试""" @pytest.mark.asyncio async def test_mixed_messages_processed_in_order(self): """混合消息按顺序处理:文本 + debate + 文本""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) plan = _make_plan(phases=[completed]) await team.add_user_intervention("先补充个上下文") await team.add_user_intervention("/debate 架构选型") await team.add_user_intervention("再补充一条") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is False # debate 插入了 assert orchestrator._debate_count == 1 # 两条普通文本都累积了 assert len(orchestrator._user_context) == 2 assert "先补充个上下文" in orchestrator._user_context assert "再补充一条" in orchestrator._user_context @pytest.mark.asyncio async def test_stop_terminates_even_with_other_messages(self): """混合消息中 /stop 终止执行(即使前面有其他消息)""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) completed = _make_execution_phase( phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"} ) plan = _make_plan(phases=[completed]) await team.add_user_intervention("/debate 话题") await team.add_user_intervention("/stop") result = await orchestrator._process_interventions(team.lead_expert, plan) assert result is True # debate 在 stop 之前处理了 assert orchestrator._debate_count == 1