"""U1: Lead 生成协作契约单元测试 测试覆盖: - _parse_phases 正确解析 LLM 返回的协作契约 - _parse_phases 对格式不正确的协作契约优雅降级 - Lead 分解任务时生成的 phases 包含协作契约(端到端 execute) - plan_update 事件包含协作契约信息 """ from __future__ import annotations import json from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.plan import CollaborationContract from agentkit.experts.team import ExpertTeam # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config( name: str = "test_expert", is_lead: bool = False, llm: dict | None = None, ) -> ExpertConfig: """创建测试用 ExpertConfig""" return ExpertConfig( name=name, agent_type="expert", persona=f"{name}的角色", thinking_style="逻辑推理", bound_skills=["skill_a"], is_lead=is_lead, task_mode="llm_generate", prompt={"identity": "测试"}, llm=llm, ) def _make_mock_expert( name: str = "test_expert", is_lead: bool = False, is_active: bool = True, gateway: MagicMock | None = None, ) -> MagicMock: """创建 mock Expert""" config = _make_expert_config(name=name, is_lead=is_lead) expert = MagicMock(spec=Expert) expert.config = config expert.is_active = is_active expert.team_id = None expert.get_capabilities_summary.return_value = { "name": name, "persona": config.persona, "thinking_style": config.thinking_style, "bound_skills": config.bound_skills, "is_lead": is_lead, } mock_agent = MagicMock() mock_agent._llm_gateway = gateway # 默认 agent.execute 返回成功结果 mock_agent.execute = AsyncMock( return_value=TaskResult( task_id="test", agent_name=name, status=TaskStatus.COMPLETED.value, output_data={"content": f"Result from {name}"}, error_message=None, started_at=None, completed_at=None, ) ) expert.agent = mock_agent return expert def _make_team_with_experts( expert_names: list[str] | None = None, lead_name: str = "lead", gateway: MagicMock | None = None, ) -> ExpertTeam: """创建包含 mock experts 的 ExpertTeam""" team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport if expert_names is None: expert_names = [lead_name, "backend", "frontend"] for name in expert_names: is_lead = name == lead_name expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) team._experts[name] = expert if is_lead: team._lead_expert_name = name return team def _make_mock_llm_gateway( phases: list[dict], synthesis_content: str = "综合结果", ) -> MagicMock: """创建 mock LLM gateway. 首次 chat 返回 phases 的 JSON(用于任务分解),后续调用返回 synthesis_content。 """ gateway = AsyncMock() phases_json = json.dumps(phases) decomp_response = MagicMock() decomp_response.content = phases_json synth_response = MagicMock() synth_response.content = synthesis_content call_count = [0] async def chat_side_effect(messages, model=None, **kwargs): call_count[0] += 1 if call_count[0] == 1: return decomp_response return synth_response gateway.chat = AsyncMock(side_effect=chat_side_effect) return gateway # ── _parse_phases 协作契约解析测试 ───────────────────────── class TestParsePhasesContracts: """_parse_phases 协作契约解析测试""" def test_parse_phases_with_contracts(self): """_parse_phases 正确解析协作契约""" content = json.dumps( [ { "name": "规划", "assigned_expert": "lead", "task_description": "设计架构", "depends_on": [], "collaboration_contracts": [], }, { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": ["规划"], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") assert len(phases) == 2 # 规划阶段无契约 assert phases[0].collaboration_contracts == [] # 后端阶段有 1 个契约 assert len(phases[1].collaboration_contracts) == 1 contract = phases[1].collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "frontend" assert contract.content_description == "API 定义" assert contract.status == "pending" def test_parse_phases_multiple_contracts(self): """_parse_phases 解析多个协作契约""" content = json.dumps( [ { "name": "集成", "assigned_expert": "lead", "task_description": "集成前后端", "depends_on": [], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", }, { "from_expert": "frontend", "to_expert": "backend", "content_description": "前端调用约定", "status": "delivered", }, ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") assert len(phases) == 1 assert len(phases[0].collaboration_contracts) == 2 assert phases[0].collaboration_contracts[0].from_expert == "backend" assert phases[0].collaboration_contracts[1].from_expert == "frontend" assert phases[0].collaboration_contracts[1].status == "delivered" def test_parse_phases_malformed_contracts_not_list(self): """LLM 返回的协作契约不是列表时优雅降级为空""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": "not a list", }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 assert phases[0].collaboration_contracts == [] def test_parse_phases_malformed_contracts_item_not_dict(self): """LLM 返回的协作契约元素不是字典时降级为默认契约""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": ["not a dict", 42, None], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 # 非字典元素降级为默认 CollaborationContract assert len(phases[0].collaboration_contracts) == 3 for contract in phases[0].collaboration_contracts: assert isinstance(contract, CollaborationContract) assert contract.status == "pending" def test_parse_phases_missing_contracts_field(self): """LLM 返回的阶段缺少 collaboration_contracts 字段时默认为空""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") assert len(phases) == 1 assert phases[0].collaboration_contracts == [] def test_parse_phases_contract_partial_fields(self): """协作契约部分字段缺失时使用默认值""" content = json.dumps( [ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], "collaboration_contracts": [ {"from_expert": "backend"}, # 缺少其他字段 ], }, ] ) phases = TeamOrchestrator._parse_phases(content, ["lead", "backend"], "lead") assert len(phases) == 1 contract = phases[0].collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "" assert contract.content_description == "" assert contract.status == "pending" # ── Lead 分解生成契约端到端测试 ──────────────────────────── class TestDecomposeGeneratesContracts: """Lead 分解任务生成协作契约的端到端测试""" @pytest.mark.asyncio async def test_decompose_generates_contracts(self): """Lead 分解任务时生成的 phases 包含协作契约""" gateway = _make_mock_llm_gateway( phases=[ { "name": "规划", "assigned_expert": "lead", "task_description": "设计架构", "depends_on": [], "collaboration_contracts": [], }, { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": ["规划"], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, { "name": "前端", "assigned_expert": "frontend", "task_description": "实现UI", "depends_on": ["后端"], "collaboration_contracts": [], }, ] ) team = _make_team_with_experts( expert_names=["lead", "backend", "frontend"], gateway=gateway ) orchestrator = TeamOrchestrator(team) result = await orchestrator.execute("开发功能") assert result["status"] == "completed" plan = result["plan"] assert len(plan.phases) == 3 # 后端阶段应包含协作契约 backend_phase = next(p for p in plan.phases if p.name == "后端") assert len(backend_phase.collaboration_contracts) == 1 contract = backend_phase.collaboration_contracts[0] assert contract.from_expert == "backend" assert contract.to_expert == "frontend" assert contract.content_description == "API 定义" # 规划和前端阶段无契约 planning_phase = next(p for p in plan.phases if p.name == "规划") assert planning_phase.collaboration_contracts == [] frontend_phase = next(p for p in plan.phases if p.name == "前端") assert frontend_phase.collaboration_contracts == [] @pytest.mark.asyncio async def test_plan_update_includes_contracts(self): """plan_update 事件包含协作契约信息""" gateway = _make_mock_llm_gateway( phases=[ { "name": "后端", "assigned_expert": "backend", "task_description": "实现API", "depends_on": [], "collaboration_contracts": [ { "from_expert": "backend", "to_expert": "frontend", "content_description": "API 定义", "status": "pending", } ], }, ] ) team = _make_team_with_experts( expert_names=["lead", "backend", "frontend"], gateway=gateway ) orchestrator = TeamOrchestrator(team) await orchestrator.execute("开发功能") calls = team._handoff_transport.send.call_args_list plan_updates = [c[0][1] for c in calls if c[0][1].get("type") == "plan_update"] assert len(plan_updates) >= 1 # plan_update 的 plan_phases 应包含 collaboration_contracts 字段 first_update = plan_updates[0] assert "plan_phases" in first_update phases_data = first_update["plan_phases"] assert len(phases_data) == 1 backend_phase_data = phases_data[0] assert "collaboration_contracts" in backend_phase_data assert len(backend_phase_data["collaboration_contracts"]) == 1 contract_data = backend_phase_data["collaboration_contracts"][0] assert contract_data["from_expert"] == "backend" assert contract_data["to_expert"] == "frontend" assert contract_data["content_description"] == "API 定义" assert contract_data["status"] == "pending" @pytest.mark.asyncio async def test_decompose_without_contracts_field_still_works(self): """LLM 未返回 collaboration_contracts 字段时仍正常工作(向后兼容)""" gateway = _make_mock_llm_gateway( phases=[ { "name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": [], }, { "name": "B", "assigned_expert": "backend", "task_description": "任务B", "depends_on": ["A"], }, ] ) team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) orchestrator = TeamOrchestrator(team) result = await orchestrator.execute("测试任务") assert result["status"] == "completed" plan = result["plan"] assert len(plan.phases) == 2 # 所有阶段的协作契约都应为空列表 for ph in plan.phases: assert ph.collaboration_contracts == []