"""U1: Lead 生成协作契约单元测试 测试覆盖: - _parse_phases 正确解析 LLM 返回的协作契约 - _parse_phases 对格式不正确的协作契约优雅降级 - Lead 分解任务时生成的 phases 包含协作契约(端到端 execute) - plan_update 事件包含协作契约信息 """ from __future__ import annotations import json from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import CollaborationContract, PhaseStatus, PlanPhase, TeamPlan from agentkit.experts.team import ExpertTeam # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config( name: str = "test_expert", is_lead: bool = False, llm: dict | None = None, ) -> ExpertConfig: """创建测试用 ExpertConfig""" return ExpertConfig( name=name, agent_type="expert", persona=f"{name}的角色", thinking_style="逻辑推理", bound_skills=["skill_a"], is_lead=is_lead, task_mode="llm_generate", prompt={"identity": "测试"}, llm=llm, ) def _make_mock_expert( name: str = "test_expert", is_lead: bool = False, is_active: bool = True, gateway: MagicMock | None = None, ) -> MagicMock: """创建 mock Expert""" config = _make_expert_config(name=name, is_lead=is_lead) expert = MagicMock(spec=Expert) expert.config = config expert.is_active = is_active expert.team_id = None expert.get_capabilities_summary.return_value = { "name": name, "persona": config.persona, "thinking_style": config.thinking_style, "bound_skills": config.bound_skills, "is_lead": is_lead, } mock_agent = MagicMock() mock_agent._llm_gateway = gateway # 默认 agent.execute 返回成功结果 mock_agent.execute = AsyncMock( return_value=TaskResult( task_id="test", agent_name=name, status=TaskStatus.COMPLETED.value, output_data={"content": f"Result from {name}"}, error_message=None, started_at=None, completed_at=None, ) ) expert.agent = mock_agent return expert def _make_team_with_experts( expert_names: list[str] | None = None, lead_name: str = "lead", gateway: MagicMock | None = None, ) -> ExpertTeam: """创建包含 mock experts 的 ExpertTeam""" team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport if expert_names is None: expert_names = [lead_name, "backend", "frontend"] for name in expert_names: is_lead = name == lead_name expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) team._experts[name] = expert if is_lead: team._lead_expert_name = name return team def _make_mock_llm_gateway( phases: list[dict], synthesis_content: str = "综合结果", ) -> MagicMock: """创建 mock LLM gateway. 首次 chat 返回 phases 的 JSON(用于任务分解),后续调用返回 synthesis_content。 """ gateway = AsyncMock() phases_json = json.dumps(phases) decomp_response = MagicMock() decomp_response.content = phases_json synth_response = MagicMock() synth_response.content = synthesis_content call_count = [0] async def chat_side_effect(messages, model=None, **kwargs): call_count[0] += 1 if call_count[0] == 1: return decomp_response return synth_response gateway.chat = AsyncMock(side_effect=chat_side_effect) return gateway def _make_review_gateway(review_results: list[tuple[bool, str]]) -> MagicMock: """创建 mock LLM gateway 用于验收。 review_results: (passed, feedback) 列表,按顺序返回。 若调用次数超过列表长度,重复返回最后一个结果。 """ gateway = AsyncMock() responses = [] for passed, feedback in review_results: resp = MagicMock() resp.content = json.dumps({"passed": passed, "feedback": feedback}) responses.append(resp) call_count = [0] async def chat_side_effect(messages, model=None, **kwargs): idx = min(call_count[0], len(responses) - 1) call_count[0] += 1 return responses[idx] gateway.chat = AsyncMock(side_effect=chat_side_effect) return gateway # ── _parse_phases 协作契约解析测试 ───────────────────────── class TestParsePhasesContracts: """_parse_phases 协作契约解析测试""" def test_parse_phases_with_contracts(self): """_parse_phases 正确解析协作契约""" content = json.dumps( [ { "name": "规划", "assigned_expert": "lead", "task_description": "设计架构", "depends_on": [], "collaboration_contracts": [], }, { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": ["规划"], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") assert len(phases) == 2 # 规划阶段无契约 assert phases[0].collaboration_contracts == [] # 后端阶段有 1 个契约 assert len(phases[1].collaboration_contracts) == 1 contract = phases[1].collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "frontend" assert contract.content_description == "API 定义" assert contract.status == "pending" def test_parse_phases_multiple_contracts(self): """_parse_phases 解析多个协作契约""" content = json.dumps( [ { "name": "集成", "assigned_expert": "lead", "task_description": "集成前后端", "depends_on": [], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", }, { "from_expert": "frontend", "to_expert": "backend", "content_description": "前端调用约定", "status": "delivered", }, ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") assert len(phases) == 1 assert len(phases[0].collaboration_contracts) == 2 assert phases[0].collaboration_contracts[0].from_expert == "backend" assert phases[0].collaboration_contracts[1].from_expert == "frontend" assert phases[0].collaboration_contracts[1].status == "delivered" def test_parse_phases_malformed_contracts_not_list(self): """LLM 返回的协作契约不是列表时优雅降级为空""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": "not a list", }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 assert phases[0].collaboration_contracts == [] def test_parse_phases_malformed_contracts_item_not_dict(self): """LLM 返回的协作契约元素不是字典时降级为默认契约""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": ["not a dict", 42, None], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 # 非字典元素降级为默认 CollaborationContract assert len(phases[0].collaboration_contracts) == 3 for contract in phases[0].collaboration_contracts: assert isinstance(contract, CollaborationContract) assert contract.status == "pending" def test_parse_phases_missing_contracts_field(self): """LLM 返回的阶段缺少 collaboration_contracts 字段时默认为空""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 assert phases[0].collaboration_contracts == [] def test_parse_phases_contract_partial_fields(self): """协作契约部分字段缺失时使用默认值""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": [ {"from_expert": "backend"}, # 缺少其他字段 ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend"], "lead") assert len(phases) == 1 contract = phases[0].collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "" assert contract.content_description == "" assert contract.status == "pending" # ── Lead 分解生成契约端到端测试 ──────────────────────────── class TestDecomposeGeneratesContracts: """Lead 分解任务生成协作契约的端到端测试""" @pytest.mark.asyncio async def test_decompose_generates_contracts(self): """Lead 分解任务时生成的 phases 包含协作契约""" gateway = _make_mock_llm_gateway( phases=[ { "name": "规划", "assigned_expert": "lead", "task_description": "设计架构", "depends_on": [], "collaboration_contracts": [], }, { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": ["规划"], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, { "name": "前端", "assigned_expert": "frontend", "task_description": "实现UI", "depends_on": ["后端"], "collaboration_contracts": [], }, ] ) team = _make_team_with_experts( expert_names=["lead", "backend", "frontend"], gateway=gateway ) orchestrator = TeamOrchestrator(team) result = await orchestrator.execute("开发功能") assert result["status"] == "completed" plan = result["plan"] assert len(plan.phases) == 3 # 后端阶段应包含协作契约 backend_phase = next(p for p in plan.phases if p.name == "后端") assert len(backend_phase.collaboration_contracts) == 1 contract = backend_phase.collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "frontend" assert contract.content_description == "API 定义" # 规划和前端阶段无契约 planning_phase = next(p for p in plan.phases if p.name == "规划") assert planning_phase.collaboration_contracts == [] frontend_phase = next(p for p in plan.phases if p.name == "前端") assert frontend_phase.collaboration_contracts == [] @pytest.mark.asyncio async def test_plan_update_includes_contracts(self): """plan_update 事件包含协作契约信息""" gateway = _make_mock_llm_gateway( phases=[ { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": [], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, ] ) team = _make_team_with_experts( expert_names=["lead", "backend", "frontend"], gateway=gateway ) orchestrator = TeamOrchestrator(team) await orchestrator.execute("开发功能") calls = team._handoff_transport.send.call_args_list plan_updates = [c[0][1] for c in calls if c[0][1].get("type") == "plan_update"] assert len(plan_updates) >= 1 # plan_update 的 plan_phases 应包含 collaboration_contracts 字段 first_update = plan_updates[0] assert "plan_phases" in first_update phases_data = first_update["plan_phases"] assert len(phases_data) == 1 backend_phase_data = phases_data[0] assert "collaboration_contracts" in backend_phase_data assert len(backend_phase_data["collaboration_contracts"]) == 1 contract_data = backend_phase_data["collaboration_contracts"][0] assert contract_data["from_expert"] == "backend" assert contract_data["to_expert"] == "frontend" assert contract_data["content_description"] == "API 定义" assert contract_data["status"] == "pending" @pytest.mark.asyncio async def test_decompose_without_contracts_field_still_works(self): """LLM 未返回 collaboration_contracts 字段时仍正常工作(向后兼容)""" gateway = _make_mock_llm_gateway( phases=[ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], }, { "name": "B", "assigned_expert": "backend", "task_description": "任务B", "depends_on": ["A"], }, ] ) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) result = await orchestrator.execute("测试任务") assert result["status"] == "completed" plan = result["plan"] assert len(plan.phases) == 2 # 所有阶段的协作契约都应为空列表 for ph in plan.phases: assert ph.collaboration_contracts == [] # ── U2: 协作契约执行测试 ────────────────────────────────── class TestCollaborationExecution: """U2: 协作契约执行 — 专家可见 + 主动通知测试""" @pytest.mark.asyncio async def test_expert_reads_collaboration_outputs(self): """专家执行时能读到协作契约中 from_expert 的输出""" team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) orchestrator = TeamOrchestrator(team) # 创建计划:backend 阶段已完成,frontend 阶段有待执行的协作契约 plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", depends_on=[], status=PhaseStatus.COMPLETED, result={"content": "API definition: GET /users"}, ) frontend_phase = PlanPhase( id="phase-frontend", name="前端", assigned_expert="frontend", task_description="实现UI", depends_on=["phase-backend"], status=PhaseStatus.PENDING, collaboration_contracts=[ CollaborationContract( from_expert="backend", to_expert="frontend", content_description="API 定义", status="delivered", # 已交付,触发读取 ) ], ) plan.phases = [backend_phase, frontend_phase] await orchestrator._execute_execution_phase(frontend_phase, plan) # 验证 frontend 专家的 agent.execute 收到了 collaboration_outputs frontend_expert = team.get_expert("frontend") task_msg = frontend_expert.agent.execute.call_args.args[0] assert "collaboration_outputs" in task_msg.input_data assert "backend" in task_msg.input_data["collaboration_outputs"] assert "API definition" in task_msg.input_data["collaboration_outputs"]["backend"] # 验证 context 中包含协作专家输出 assert "协作专家输出" in task_msg.input_data["context"] @pytest.mark.asyncio async def test_expert_notifies_collaborators(self): """专家完成后,协作契约中的 to_expert 收到 collaboration_notice 事件""" team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", collaboration_contracts=[ CollaborationContract( from_expert="backend", to_expert="frontend", content_description="API 定义", status="pending", ) ], ) plan.phases = [backend_phase] await orchestrator._notify_collaborators(backend_phase, plan) calls = team._handoff_transport.send.call_args_list notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] assert len(notices) == 1 assert notices[0]["to_expert"] == "frontend" @pytest.mark.asyncio async def test_contract_status_updated_to_delivered(self): """契约状态从 pending 更新为 delivered""" team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") contract = CollaborationContract( from_expert="backend", to_expert="frontend", content_description="API 定义", status="pending", ) backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", collaboration_contracts=[contract], ) plan.phases = [backend_phase] await orchestrator._notify_collaborators(backend_phase, plan) assert contract.status == "delivered" @pytest.mark.asyncio async def test_no_collaboration_contracts_backward_compatible(self): """协作契约为空时,行为与当前一致(向后兼容)""" team = _make_team_with_experts(expert_names=["lead", "backend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", status=PhaseStatus.PENDING, collaboration_contracts=[], ) plan.phases = [backend_phase] result = await orchestrator._execute_execution_phase(backend_phase, plan) # 验证正常执行 assert result is not None # 验证 input_data 中没有 collaboration_outputs backend_expert = team.get_expert("backend") task_msg = backend_expert.agent.execute.call_args.args[0] assert "collaboration_outputs" not in task_msg.input_data # 验证没有 collaboration_notice 事件 calls = team._handoff_transport.send.call_args_list notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] assert len(notices) == 0 @pytest.mark.asyncio async def test_collaboration_notice_event_content(self): """collaboration_notice 事件包含正确的 from_expert, to_expert, content_description""" team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", collaboration_contracts=[ CollaborationContract( from_expert="backend", to_expert="frontend", content_description="API 定义", status="pending", ) ], ) plan.phases = [backend_phase] await orchestrator._notify_collaborators(backend_phase, plan) calls = team._handoff_transport.send.call_args_list notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] assert len(notices) == 1 notice = notices[0] assert notice["from_expert"] == "backend" assert notice["to_expert"] == "frontend" assert notice["content_description"] == "API 定义" assert notice["phase_id"] == "phase-backend" assert notice["phase_name"] == "后端" assert "output_key" in notice assert "expert_color" in notice @pytest.mark.asyncio async def test_notify_skips_empty_to_expert(self): """to_expert 为空时跳过通知""" team = _make_team_with_experts(expert_names=["lead", "backend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", collaboration_contracts=[ CollaborationContract( from_expert="backend", to_expert="", # 空的 to_expert content_description="API 定义", status="pending", ) ], ) plan.phases = [backend_phase] await orchestrator._notify_collaborators(backend_phase, plan) calls = team._handoff_transport.send.call_args_list notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] assert len(notices) == 0 @pytest.mark.asyncio async def test_notify_skips_already_delivered(self): """契约状态已为 delivered 时跳过通知""" team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") backend_phase = PlanPhase( id="phase-backend", name="后端", assigned_expert="backend", task_description="实现API", collaboration_contracts=[ CollaborationContract( from_expert="backend", to_expert="frontend", content_description="API 定义", status="delivered", # 已交付 ) ], ) plan.phases = [backend_phase] await orchestrator._notify_collaborators(backend_phase, plan) calls = team._handoff_transport.send.call_args_list notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] assert len(notices) == 0 # ── U3: Lead 验收环节 + 返工机制测试 ────────────────────── class TestPhaseReview: """U3: Lead 验收环节 + 返工机制测试""" @pytest.mark.asyncio async def test_review_passed(self): """验收合格时,阶段标记 COMPLETED,发出 review_result(passed)事件""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] result = await orchestrator._execute_execution_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result is not None # 验证 review_result 事件 calls = team._handoff_transport.send.call_args_list reviews = [c[0][1] for c in calls if c[0][1].get("type") == "review_result"] assert len(reviews) == 1 assert reviews[0]["passed"] is True @pytest.mark.asyncio async def test_review_failed_rework(self): """验收不合格时返工,附 feedback,重新执行后通过""" # 第一次验收不合格,第二次验收通过 gateway = _make_review_gateway([(False, "需要增加错误处理"), (True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] result = await orchestrator._execute_execution_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert phase.rework_count == 1 assert phase.review_feedback == "需要增加错误处理" assert result is not None # 验证 task_description 被附加了返工反馈 assert "[返工要求]" in phase.task_description assert "需要增加错误处理" in phase.task_description # 验证 review_result 事件:第一次 rework,第二次 passed calls = team._handoff_transport.send.call_args_list reviews = [c[0][1] for c in calls if c[0][1].get("type") == "review_result"] assert len(reviews) == 2 assert reviews[0]["passed"] is False assert reviews[0]["final_status"] == "rework" assert reviews[1]["passed"] is True @pytest.mark.asyncio async def test_review_max_reworks_exceeded(self): """返工次数达到 MAX_REWORKS 仍不合格,标记 FAILED 并抛 RuntimeError 让调用方级联""" # 始终验收不合格 gateway = _make_review_gateway([(False, "不合格")] * 10) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] # P1: 超过返工上限时抛 RuntimeError,让 _execute_pipeline 的 gather(return_exceptions=True) 检测并级联 with pytest.raises(RuntimeError, match="phase-1 failed after"): await orchestrator._execute_execution_phase(phase, plan) assert phase.status == PhaseStatus.FAILED assert phase.rework_count == TeamOrchestrator.MAX_REWORKS + 1 # 验证 phase_failed 事件 calls = team._handoff_transport.send.call_args_list failures = [c[0][1] for c in calls if c[0][1].get("type") == "phase_failed"] assert len(failures) == 1 # 验证最后一个 review_result 事件是 failed reviews = [c[0][1] for c in calls if c[0][1].get("type") == "review_result"] assert reviews[-1]["final_status"] == "failed" @pytest.mark.asyncio async def test_review_no_llm_gateway_skips(self): """Lead LLM 不可用时,跳过验收直接标记 COMPLETED(优雅降级)""" # 不传 gateway,所有专家的 _llm_gateway 为 None team = _make_team_with_experts(expert_names=["lead", "backend"]) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] result = await orchestrator._execute_execution_phase(phase, plan) assert phase.status == PhaseStatus.COMPLETED assert result is not None # 验证没有发生返工 assert phase.rework_count == 0 # 验证只执行了一次(没有返工) calls = team._handoff_transport.send.call_args_list steps = [c[0][1] for c in calls if c[0][1].get("type") == "expert_step"] assert len(steps) == 1 @pytest.mark.asyncio async def test_review_result_event_content(self): """review_result 事件包含正确的 passed/feedback/expert 字段""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] await orchestrator._execute_execution_phase(phase, plan) calls = team._handoff_transport.send.call_args_list reviews = [c[0][1] for c in calls if c[0][1].get("type") == "review_result"] assert len(reviews) == 1 review = reviews[0] assert review["phase_id"] == "phase-1" assert review["phase_name"] == "后端" assert review["passed"] is True assert review["feedback"] == "" assert review["expert"] == "backend" @pytest.mark.asyncio async def test_rework_feedback_appended_to_task(self): """返工时 feedback 被附加到 task_description""" gateway = _make_review_gateway([(False, "请增加单元测试"), (True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) original_task = "实现API" plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description=original_task, ) plan.phases = [phase] await orchestrator._execute_execution_phase(phase, plan) # 验证 task_description 被附加了返工反馈 assert original_task in phase.task_description assert "[返工要求]: 请增加单元测试" in phase.task_description # 验证第二次执行的 task_msg 包含返工反馈 backend_expert = team.get_expert("backend") # agent.execute 被调用了 2 次(1 次初始 + 1 次返工) assert backend_expert.agent.execute.call_count == 2 # 验证第二次执行的 task_msg 应包含返工反馈 second_call_args = backend_expert.agent.execute.call_args_list[1] second_task_msg = second_call_args.args[0] assert "[返工要求]" in second_task_msg.input_data["task"] # ── U4: 专家风险标记测试 ────────────────────────────────── class TestRiskFlagging: """U4: 专家风险标记 — _parse_risk_flags 解析 + risk_flagged 事件发出测试""" def test_parse_risk_flags_single(self): """单个 [RISK: ...] 标记被正确解析""" content = "实现完成。[RISK: API 可能存在性能问题] 请关注。" risks = TeamOrchestrator._parse_risk_flags(content) assert len(risks) == 1 assert risks[0] == "API 可能存在性能问题" def test_parse_risk_flags_multiple(self): """多个 [RISK: ...] 标记都被解析""" content = "[RISK: 数据库连接池可能不足] 实现完成。 [RISK: 缺少单元测试覆盖]" risks = TeamOrchestrator._parse_risk_flags(content) assert len(risks) == 2 assert risks[0] == "数据库连接池可能不足" assert risks[1] == "缺少单元测试覆盖" def test_parse_risk_flags_none(self): """无风险标记时返回空列表""" content = "实现完成,没有风险。" risks = TeamOrchestrator._parse_risk_flags(content) assert risks == [] def test_parse_risk_flags_malformed(self): """格式不正确的标记被忽略""" content = ( "RISK: 不是标记] " # 缺少左括号 "[RISK 也不是标记] " # 缺少冒号 "[RISK:正常风险] " # 这个是正常的 ) risks = TeamOrchestrator._parse_risk_flags(content) # 只有 "正常风险" 被解析,其他格式不正确的被忽略 assert risks == ["正常风险"] @pytest.mark.asyncio async def test_risk_flagged_event_emitted(self): """专家输出包含 [RISK: ...] 时,risk_flagged 事件被发出""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) # 覆盖 backend 专家的输出,包含风险标记 backend_expert = team.get_expert("backend") backend_expert.agent.execute = AsyncMock( return_value=TaskResult( task_id="test", agent_name="backend", status=TaskStatus.COMPLETED.value, output_data={"content": "API 实现完成 [RISK: 接口响应时间可能超标]"}, error_message=None, started_at=None, completed_at=None, ) ) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] await orchestrator._execute_execution_phase(phase, plan) calls = team._handoff_transport.send.call_args_list risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] assert len(risk_events) == 1 assert risk_events[0]["risk_description"] == "接口响应时间可能超标" @pytest.mark.asyncio async def test_risk_flagged_event_content(self): """risk_flagged 事件包含正确的 expert, risk_description, phase_id 字段""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) backend_expert = team.get_expert("backend") backend_expert.agent.execute = AsyncMock( return_value=TaskResult( task_id="test", agent_name="backend", status=TaskStatus.COMPLETED.value, output_data={"content": "完成 [RISK: 安全漏洞风险]"}, error_message=None, started_at=None, completed_at=None, ) ) orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-risk-1", name="安全审计", assigned_expert="backend", task_description="审计代码安全", ) plan.phases = [phase] await orchestrator._execute_execution_phase(phase, plan) calls = team._handoff_transport.send.call_args_list risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] assert len(risk_events) == 1 event = risk_events[0] assert event["expert"] == "backend" assert event["expert_name"] == "backend" assert event["risk_description"] == "安全漏洞风险" assert event["phase_id"] == "phase-risk-1" assert event["phase_name"] == "安全审计" @pytest.mark.asyncio async def test_no_risk_flagged_when_clean(self): """专家输出不包含风险标记时,无 risk_flagged 事件""" gateway = _make_review_gateway([(True, "")]) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) # backend 专家输出无风险标记(使用默认输出 "Result from backend") orchestrator = TeamOrchestrator(team) plan = TeamPlan(task="开发功能", lead_expert="lead") phase = PlanPhase( id="phase-1", name="后端", assigned_expert="backend", task_description="实现API", ) plan.phases = [phase] await orchestrator._execute_execution_phase(phase, plan) calls = team._handoff_transport.send.call_args_list risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] assert len(risk_events) == 0