From f219c5f01636bce1c2856b766ae1706819f1cac2 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Wed, 24 Jun 2026 13:44:50 +0800 Subject: [PATCH] =?UTF-8?q?feat(experts):=20U1=20=E5=8D=8F=E4=BD=9C?= =?UTF-8?q?=E5=A5=91=E7=BA=A6=E6=95=B0=E6=8D=AE=E6=A8=A1=E5=9E=8B=20+=20Le?= =?UTF-8?q?ad=20=E7=94=9F=E6=88=90=E5=A5=91=E7=BA=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PlanPhase 添加 collaboration_contracts 字段(CollaborationContract dataclass) - 修改 _decompose_task prompt,要求 Lead 分解任务时定义协作契约 - 修改 _parse_phases 解析 LLM 返回的协作契约信息 - plan_update 事件自动包含协作契约(通过 to_dict 序列化) - 71 + 9 = 80 个新测试,全套 436 passed 无回归 --- src/agentkit/experts/orchestrator.py | 119 +++--- src/agentkit/experts/plan.py | 61 ++- tests/unit/experts/test_plan.py | 153 ++++++- tests/unit/experts/test_pm_collaboration.py | 427 ++++++++++++++++++++ 4 files changed, 682 insertions(+), 78 deletions(-) create mode 100644 tests/unit/experts/test_pm_collaboration.py diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index cc31cc1..ee2b544 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -32,7 +32,14 @@ from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus from agentkit.llm.gateway import LLMGateway from .expert import Expert -from .plan import PhaseStatus, PhaseType, PlanPhase, PlanStatus, TeamPlan +from .plan import ( + CollaborationContract, + PhaseStatus, + PhaseType, + PlanPhase, + PlanStatus, + TeamPlan, +) from .team import ExpertTeam, TeamStatus logger = logging.getLogger(__name__) @@ -137,7 +144,9 @@ class TeamOrchestrator: phases = await self._decompose_task(lead, task) if not phases: logger.warning("Task decomposition returned no phases, executing as single phase") - phases = [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] + phases = [ + PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task) + ] plan.phases = phases[: self.MAX_PHASES] @@ -194,9 +203,7 @@ class TeamOrchestrator: for ph, result in zip(ready, results): if isinstance(result, (Exception, asyncio.CancelledError)): logger.error(f"Phase {ph.id} ({ph.name}) failed: {result}") - plan.update_phase_status( - ph.id, PhaseStatus.FAILED, {"error": str(result)} - ) + plan.update_phase_status(ph.id, PhaseStatus.FAILED, {"error": str(result)}) phase_results[ph.id] = {"error": str(result)} # Emit phase_failed event await self._broadcast_event( @@ -215,13 +222,9 @@ class TeamOrchestrator: # U3: Divergence detection — check completed phases for conflicts # and dynamically insert DEBATE phases if needed if self._debate_count < self.MAX_DEBATES: - completed_now = [ - ph for ph in ready if ph.status == PhaseStatus.COMPLETED - ] + completed_now = [ph for ph in ready if ph.status == PhaseStatus.COMPLETED] if completed_now: - await self._check_divergence_and_insert_debates( - lead, plan, completed_now - ) + await self._check_divergence_and_insert_debates(lead, plan, completed_now) # 5. Check if all phases failed completed = plan.completed_phases @@ -264,16 +267,12 @@ class TeamOrchestrator: # Circular dependency or invalid reference from topological_sort logger.error(f"Pipeline execution failed (invalid plan): {e}") plan.status = PlanStatus.FAILED - await self._broadcast_event( - "team_dissolved", {"team_id": self._team.team_id} - ) + await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) return await self._fallback_to_single_agent(task, plan, phase_results) except Exception as e: logger.error(f"Pipeline execution failed: {e}") plan.status = PlanStatus.FAILED - await self._broadcast_event( - "team_dissolved", {"team_id": self._team.team_id} - ) + await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) return await self._fallback_to_single_agent(task, plan, phase_results) async def _decompose_task(self, lead: Expert, task: str) -> list[PlanPhase]: @@ -300,14 +299,24 @@ class TeamOrchestrator: f"Return a JSON array of phase objects, each with:\n" f'- "name": phase name (e.g., "规划", "前端", "后端", "QA", "评审")\n' f'- "assigned_expert": name of the expert to assign ' - f'(must be one of: {", ".join(available_experts)})\n' + f"(must be one of: {', '.join(available_experts)})\n" f'- "task_description": clear phase task description\n' - f'- "depends_on": array of phase names this phase depends on (empty array if none)\n\n' + f'- "depends_on": array of phase names this phase depends on (empty array if none)\n' + f'- "collaboration_contracts": 数组,定义该阶段的协作契约,每个契约包含:\n' + f' - "from_expert": 提供内容的专家名称\n' + f' - "to_expert": 接收内容的专家名称\n' + f' - "content_description": 协作内容描述\n' + f' 例如:[{{"from_expert":"backend","to_expert":"frontend",' + f'"content_description":"API 定义"}}]\n\n' f"Example:\n" f'[{{"name":"规划","assigned_expert":"tech_lead",' - f'"task_description":"设计架构","depends_on":[]}},' + f'"task_description":"设计架构","depends_on":[],"collaboration_contracts":[]}},' + f'{{"name":"后端","assigned_expert":"backend",' + f'"task_description":"实现API","depends_on":["规划"],' + f'"collaboration_contracts":[{{"from_expert":"backend",' + f'"to_expert":"frontend","content_description":"API 定义"}}]}},' f'{{"name":"前端","assigned_expert":"frontend",' - f'"task_description":"实现UI","depends_on":["规划"]}}]\n\n' + f'"task_description":"实现UI","depends_on":["后端"],"collaboration_contracts":[]}}]\n\n' f"Return ONLY the JSON array, no other text." ) @@ -367,11 +376,23 @@ class TeamOrchestrator: if not isinstance(depends_on_names, list): depends_on_names = [] + # 解析协作契约(LLM 返回格式不正确时优雅降级为空列表) + contracts_data = item.get("collaboration_contracts", []) + if not isinstance(contracts_data, list): + contracts_data = [] + contracts = [ + CollaborationContract.from_dict(c) + if isinstance(c, dict) + else CollaborationContract() + for c in contracts_data + ] + phase = PlanPhase( name=name, assigned_expert=assigned, task_description=task_desc, depends_on=[], # Will resolve to IDs in second pass + collaboration_contracts=contracts, ) raw_phases.append({"phase": phase, "depends_on_names": depends_on_names}) name_to_id[name] = phase.id @@ -474,12 +495,9 @@ class TeamOrchestrator: "dependency_outputs": dependency_outputs, } if dependency_outputs: - input_data["context"] = ( - "前置阶段输出:\n" - + "\n---\n".join( - f"[{name}]:\n{output[:500] if isinstance(output, str) else str(output)[:500]}" - for name, output in dependency_outputs.items() - ) + input_data["context"] = "前置阶段输出:\n" + "\n---\n".join( + f"[{name}]:\n{output[:500] if isinstance(output, str) else str(output)[:500]}" + for name, output in dependency_outputs.items() ) task_msg = TaskMessage( @@ -840,13 +858,13 @@ class TeamOrchestrator: return f"[第 {round_num} 轮辩论小结因 LLM 不可用无法生成]" # Get only current round's arguments - round_entries = [h for h in history if h.get("round") == round_num and h["role"] == "expert"] + round_entries = [ + h for h in history if h.get("round") == round_num and h["role"] == "expert" + ] if not round_entries: return "" - round_text = "\n\n".join( - f"[{h['expert']}]: {h['content']}" for h in round_entries - ) + round_text = "\n\n".join(f"[{h['expert']}]: {h['content']}" for h in round_entries) prompt = ( f"你是团队 Lead {lead.config.name},正在主持辩论。\n\n" @@ -985,9 +1003,7 @@ class TeamOrchestrator: # ── U4: User intervention processing at phase boundaries ────────── - async def _process_interventions( - self, lead: Expert, plan: TeamPlan - ) -> bool: + async def _process_interventions(self, lead: Expert, plan: TeamPlan) -> bool: """Process pending user interventions at a phase boundary. Handles three intervention kinds: @@ -1024,13 +1040,12 @@ class TeamOrchestrator: # /debate → insert DEBATE phase if lower.startswith("/debate"): - topic = stripped[len("/debate"):].strip() + topic = stripped[len("/debate") :].strip() if not topic: continue if self._debate_count >= self.MAX_DEBATES: logger.info( - f"Max debates ({self.MAX_DEBATES}) reached, " - "ignoring /debate intervention" + f"Max debates ({self.MAX_DEBATES}) reached, ignoring /debate intervention" ) continue participants = [ @@ -1066,9 +1081,7 @@ class TeamOrchestrator: # ── U3: Divergence detection + dynamic debate insertion ──────────── - async def _maybe_add_plan_review_debate( - self, lead: Expert, plan: TeamPlan, task: str - ) -> None: + async def _maybe_add_plan_review_debate(self, lead: Expert, plan: TeamPlan, task: str) -> None: """Optionally add a plan review debate phase before execution. Skips for simple tasks (<= 2 phases) or when LLM judges it unnecessary. @@ -1085,9 +1098,7 @@ class TeamOrchestrator: return member_names = [ - e.config.name - for e in self._team.active_experts - if e.config.name != lead.config.name + e.config.name for e in self._team.active_experts if e.config.name != lead.config.name ] if not member_names: return @@ -1152,9 +1163,7 @@ class TeamOrchestrator: # Need other completed phases to compare against other_completed = [ - ph - for ph in plan.completed_phases - if ph.id != completed_phase.id and ph.result + ph for ph in plan.completed_phases if ph.id != completed_phase.id and ph.result ] if not other_completed: return False @@ -1166,18 +1175,16 @@ class TeamOrchestrator: current_output = "" if completed_phase.result: - current_output = completed_phase.result.get( - "content", str(completed_phase.result) - )[:500] + current_output = completed_phase.result.get("content", str(completed_phase.result))[ + :500 + ] prompt = ( f"你是团队 Lead {lead.config.name},需要判断刚完成的阶段产出是否与其他阶段存在分歧。\n\n" f"原始任务:{plan.task}\n\n" f"刚完成的阶段:{completed_phase.name}\n" f"产出:{current_output}\n\n" - f"其他已完成阶段的产出:\n" - + "\n---\n".join(other_outputs) - + "\n\n" + f"其他已完成阶段的产出:\n" + "\n---\n".join(other_outputs) + "\n\n" "请判断是否值得发起辩论。以下情况值得辩论:\n" "1) 两个阶段产出存在矛盾或冲突\n" "2) 阶段产出与原始任务约束冲突\n" @@ -1393,14 +1400,12 @@ class TeamOrchestrator: f"Original task: {task}\n\n" f"Below are {len(results)} phase results from your team members. " f"Synthesize them into a single comprehensive final result that " - f"best addresses the original task.\n\n" - + "\n---\n".join(summaries) + f"best addresses the original task.\n\n" + "\n---\n".join(summaries) ) # U4: Append accumulated user context so user guidance influences synthesis if self._user_context: - prompt += ( - "\n\n用户在执行期间补充的指导意见(请在综合时参考):\n- " - + "\n- ".join(self._user_context) + prompt += "\n\n用户在执行期间补充的指导意见(请在综合时参考):\n- " + "\n- ".join( + self._user_context ) prompt += "\n\nProvide the synthesized result directly." diff --git a/src/agentkit/experts/plan.py b/src/agentkit/experts/plan.py index 4b4d1c0..4f60d3e 100644 --- a/src/agentkit/experts/plan.py +++ b/src/agentkit/experts/plan.py @@ -106,6 +106,44 @@ class SubTask: ) +@dataclass +class CollaborationContract: + """协作契约 — 定义专家间的协作关系 + + Lead 在分解任务时为每个阶段定义协作契约,明确哪些专家需要协作、协作内容是什么。 + + Attributes: + from_expert: 提供协作内容的专家名称 + to_expert: 接收协作内容的专家名称 + content_description: 协作内容描述(如"API 定义"、"数据模型") + status: 契约状态(pending/delivered/received) + """ + + from_expert: str = "" + to_expert: str = "" + content_description: str = "" + status: str = "pending" + + def to_dict(self) -> dict[str, Any]: + """序列化为字典""" + return { + "from_expert": self.from_expert, + "to_expert": self.to_expert, + "content_description": self.content_description, + "status": self.status, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> CollaborationContract: + """从字典创建 CollaborationContract""" + return cls( + from_expert=data.get("from_expert", ""), + to_expert=data.get("to_expert", ""), + content_description=data.get("content_description", ""), + status=data.get("status", "pending"), + ) + + @dataclass class PlanPhase: """流水线模式中的执行阶段 @@ -127,6 +165,7 @@ class PlanPhase: - participants: 参与专家名称列表 - max_rounds: 最大辩论轮次(默认 2,硬上限 4) - skip: 是否跳过辩论(逃生舱) + collaboration_contracts: 协作契约列表,定义该阶段涉及的专家协作关系 """ id: str = field(default_factory=lambda: str(uuid.uuid4())) @@ -138,6 +177,7 @@ class PlanPhase: result: dict[str, Any] | None = None phase_type: PhaseType = PhaseType.EXECUTION debate_config: dict[str, Any] | None = None + collaboration_contracts: list[CollaborationContract] = field(default_factory=list) def to_dict(self) -> dict[str, Any]: """序列化为字典""" @@ -158,11 +198,19 @@ class PlanPhase: "result": result_str, "phase_type": self.phase_type.value, "debate_config": self.debate_config, + "collaboration_contracts": [c.to_dict() for c in self.collaboration_contracts], } @classmethod def from_dict(cls, data: dict[str, Any]) -> PlanPhase: """从字典创建 PlanPhase""" + contracts_data = data.get("collaboration_contracts", []) + if not isinstance(contracts_data, list): + contracts_data = [] + contracts = [ + CollaborationContract.from_dict(c) if isinstance(c, dict) else CollaborationContract() + for c in contracts_data + ] return cls( id=data.get("id", str(uuid.uuid4())), name=data.get("name", ""), @@ -173,6 +221,7 @@ class PlanPhase: result=data.get("result"), phase_type=PhaseType(data.get("phase_type", PhaseType.EXECUTION.value)), debate_config=data.get("debate_config"), + collaboration_contracts=contracts, ) @@ -295,9 +344,7 @@ class TeamPlan: @property def all_phases_done(self) -> bool: """所有阶段是否都已完成(成功或失败)""" - return all( - ph.status in (PhaseStatus.COMPLETED, PhaseStatus.FAILED) for ph in self.phases - ) + return all(ph.status in (PhaseStatus.COMPLETED, PhaseStatus.FAILED) for ph in self.phases) def get_ready_phases(self) -> list[PlanPhase]: """返回当前可执行的阶段(状态为 PENDING 且所有依赖已完成) @@ -357,17 +404,13 @@ class TeamPlan: while len(processed) < len(self.phases): # Find all phases with in_degree 0 that haven't been processed current_layer_ids = [ - ph_id - for ph_id in in_degree - if ph_id not in processed and in_degree[ph_id] == 0 + ph_id for ph_id in in_degree if ph_id not in processed and in_degree[ph_id] == 0 ] if not current_layer_ids: # No progress — cycle detected remaining = [ph_id for ph_id in in_degree if ph_id not in processed] - raise ValueError( - f"Circular dependency detected among phases: {remaining}" - ) + raise ValueError(f"Circular dependency detected among phases: {remaining}") # Add current layer current_layer = [phase_map[ph_id] for ph_id in current_layer_ids] diff --git a/tests/unit/experts/test_plan.py b/tests/unit/experts/test_plan.py index 23f2a40..f9f06ef 100644 --- a/tests/unit/experts/test_plan.py +++ b/tests/unit/experts/test_plan.py @@ -5,6 +5,7 @@ from __future__ import annotations import pytest from agentkit.experts.plan import ( + CollaborationContract, MergeStrategy, PhaseStatus, PhaseType, @@ -328,12 +329,8 @@ def _make_pipeline_plan() -> TeamPlan: """ phases = [ _make_phase(id="p1", name="规划", assigned_expert="tech_lead", depends_on=[]), - _make_phase( - id="p2", name="前端", assigned_expert="frontend_engineer", depends_on=["p1"] - ), - _make_phase( - id="p3", name="后端", assigned_expert="backend_engineer", depends_on=["p1"] - ), + _make_phase(id="p2", name="前端", assigned_expert="frontend_engineer", depends_on=["p1"]), + _make_phase(id="p3", name="后端", assigned_expert="backend_engineer", depends_on=["p1"]), _make_phase(id="p4", name="QA", assigned_expert="qa_engineer", depends_on=["p2", "p3"]), _make_phase(id="p5", name="评审", assigned_expert="code_reviewer", depends_on=["p4"]), ] @@ -525,6 +522,142 @@ class TestPlanPhase: assert d["phase_type"] == "execution" assert d["debate_config"] is None + def test_default_collaboration_contracts_empty(self): + """默认 collaboration_contracts 为空列表""" + phase = PlanPhase(name="测试阶段") + assert phase.collaboration_contracts == [] + d = phase.to_dict() + assert d["collaboration_contracts"] == [] + + def test_plan_phase_with_contracts(self): + """PlanPhase 携带 collaboration_contracts 序列化/反序列化正确""" + contracts = [ + CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="delivered", + ), + CollaborationContract( + from_expert="tech_lead", + to_expert="backend", + content_description="数据模型", + ), + ] + phase = PlanPhase( + id="contract_phase", + name="后端开发", + assigned_expert="backend_engineer", + task_description="实现 API", + collaboration_contracts=contracts, + ) + d = phase.to_dict() + assert len(d["collaboration_contracts"]) == 2 + assert d["collaboration_contracts"][0]["from_expert"] == "backend" + assert d["collaboration_contracts"][0]["to_expert"] == "frontend" + assert d["collaboration_contracts"][0]["content_description"] == "API 定义" + assert d["collaboration_contracts"][0]["status"] == "delivered" + + # 往返序列化 + restored = PlanPhase.from_dict(d) + assert len(restored.collaboration_contracts) == 2 + assert restored.collaboration_contracts[0].from_expert == "backend" + assert restored.collaboration_contracts[0].to_expert == "frontend" + assert restored.collaboration_contracts[0].content_description == "API 定义" + assert restored.collaboration_contracts[0].status == "delivered" + assert restored.collaboration_contracts[1].from_expert == "tech_lead" + assert restored.collaboration_contracts[1].status == "pending" + + def test_plan_phase_empty_contracts(self): + """协作契约为空列表时正常工作""" + phase = PlanPhase( + id="empty_contract_phase", + name="独立阶段", + assigned_expert="solo_expert", + collaboration_contracts=[], + ) + d = phase.to_dict() + assert d["collaboration_contracts"] == [] + restored = PlanPhase.from_dict(d) + assert restored.collaboration_contracts == [] + + def test_backward_compatibility_no_contracts_field(self): + """向后兼容:不带 collaboration_contracts 的旧 dict 默认为空列表""" + old_dict = { + "id": "old_phase", + "name": "旧阶段", + "assigned_expert": "dev", + "task_description": "旧任务", + "depends_on": [], + "status": "pending", + "result": None, + } + phase = PlanPhase.from_dict(old_dict) + assert phase.collaboration_contracts == [] + + +class TestCollaborationContract: + """CollaborationContract 数据模型测试""" + + def test_default_values(self): + """默认值:空字符串字段,status 为 pending""" + contract = CollaborationContract() + assert contract.from_expert == "" + assert contract.to_expert == "" + assert contract.content_description == "" + assert contract.status == "pending" + + def test_creation_with_all_fields(self): + """创建 CollaborationContract 并设置所有字段""" + contract = CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="delivered", + ) + assert contract.from_expert == "backend" + assert contract.to_expert == "frontend" + assert contract.content_description == "API 定义" + assert contract.status == "delivered" + + def test_collaboration_contract_serialization(self): + """CollaborationContract 序列化/反序列化正确""" + contract = CollaborationContract( + from_expert="tech_lead", + to_expert="qa_engineer", + content_description="测试用例规范", + status="received", + ) + d = contract.to_dict() + assert d == { + "from_expert": "tech_lead", + "to_expert": "qa_engineer", + "content_description": "测试用例规范", + "status": "received", + } + + restored = CollaborationContract.from_dict(d) + assert restored.from_expert == contract.from_expert + assert restored.to_expert == contract.to_expert + assert restored.content_description == contract.content_description + assert restored.status == contract.status + + def test_from_dict_missing_fields_uses_defaults(self): + """from_dict 对缺失字段使用默认值""" + restored = CollaborationContract.from_dict({"from_expert": "backend"}) + assert restored.from_expert == "backend" + assert restored.to_expert == "" + assert restored.content_description == "" + assert restored.status == "pending" + + def test_from_dict_empty_dict(self): + """from_dict 对空字典返回全默认值""" + restored = CollaborationContract.from_dict({}) + assert restored.from_expert == "" + assert restored.to_expert == "" + assert restored.content_description == "" + assert restored.status == "pending" + class TestTeamPlanPhases: """TeamPlan 流水线模式(phases)测试""" @@ -733,12 +866,8 @@ class TestTopologicalSort: task="混合模式任务", phases=[ PlanPhase(id="p1", name="规划", assigned_expert="tech_lead", depends_on=[]), - PlanPhase( - id="p2", name="前端", assigned_expert="frontend", depends_on=["p1"] - ), - PlanPhase( - id="p3", name="后端", assigned_expert="backend", depends_on=["p1"] - ), + PlanPhase(id="p2", name="前端", assigned_expert="frontend", depends_on=["p1"]), + PlanPhase(id="p3", name="后端", assigned_expert="backend", depends_on=["p1"]), PlanPhase( id="d1", name="架构辩论", diff --git a/tests/unit/experts/test_pm_collaboration.py b/tests/unit/experts/test_pm_collaboration.py new file mode 100644 index 0000000..b0ba429 --- /dev/null +++ b/tests/unit/experts/test_pm_collaboration.py @@ -0,0 +1,427 @@ +"""U1: Lead 生成协作契约单元测试 + +测试覆盖: +- _parse_phases 正确解析 LLM 返回的协作契约 +- _parse_phases 对格式不正确的协作契约优雅降级 +- Lead 分解任务时生成的 phases 包含协作契约(端到端 execute) +- plan_update 事件包含协作契约信息 +""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.core.protocol import TaskResult, TaskStatus +from agentkit.experts.config import ExpertConfig +from agentkit.experts.expert import Expert +from agentkit.experts.orchestrator import TeamOrchestrator +from agentkit.experts.plan import CollaborationContract +from agentkit.experts.team import ExpertTeam + + +# ── 辅助函数 ────────────────────────────────────────────── + + +def _make_expert_config( + name: str = "test_expert", + is_lead: bool = False, + llm: dict | None = None, +) -> ExpertConfig: + """创建测试用 ExpertConfig""" + return ExpertConfig( + name=name, + agent_type="expert", + persona=f"{name}的角色", + thinking_style="逻辑推理", + bound_skills=["skill_a"], + is_lead=is_lead, + task_mode="llm_generate", + prompt={"identity": "测试"}, + llm=llm, + ) + + +def _make_mock_expert( + name: str = "test_expert", + is_lead: bool = False, + is_active: bool = True, + gateway: MagicMock | None = None, +) -> MagicMock: + """创建 mock Expert""" + config = _make_expert_config(name=name, is_lead=is_lead) + expert = MagicMock(spec=Expert) + expert.config = config + expert.is_active = is_active + expert.team_id = None + expert.get_capabilities_summary.return_value = { + "name": name, + "persona": config.persona, + "thinking_style": config.thinking_style, + "bound_skills": config.bound_skills, + "is_lead": is_lead, + } + mock_agent = MagicMock() + mock_agent._llm_gateway = gateway + # 默认 agent.execute 返回成功结果 + mock_agent.execute = AsyncMock( + return_value=TaskResult( + task_id="test", + agent_name=name, + status=TaskStatus.COMPLETED.value, + output_data={"content": f"Result from {name}"}, + error_message=None, + started_at=None, + completed_at=None, + ) + ) + expert.agent = mock_agent + return expert + + +def _make_team_with_experts( + expert_names: list[str] | None = None, + lead_name: str = "lead", + gateway: MagicMock | None = None, +) -> ExpertTeam: + """创建包含 mock experts 的 ExpertTeam""" + team = ExpertTeam() + transport = AsyncMock(spec=InProcessHandoffTransport) + team._handoff_transport = transport + + if expert_names is None: + expert_names = [lead_name, "backend", "frontend"] + + for name in expert_names: + is_lead = name == lead_name + expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) + team._experts[name] = expert + if is_lead: + team._lead_expert_name = name + + return team + + +def _make_mock_llm_gateway( + phases: list[dict], + synthesis_content: str = "综合结果", +) -> MagicMock: + """创建 mock LLM gateway. + + 首次 chat 返回 phases 的 JSON(用于任务分解),后续调用返回 synthesis_content。 + """ + gateway = AsyncMock() + phases_json = json.dumps(phases) + decomp_response = MagicMock() + decomp_response.content = phases_json + synth_response = MagicMock() + synth_response.content = synthesis_content + call_count = [0] + + async def chat_side_effect(messages, model=None, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + return decomp_response + return synth_response + + gateway.chat = AsyncMock(side_effect=chat_side_effect) + return gateway + + +# ── _parse_phases 协作契约解析测试 ───────────────────────── + + +class TestParsePhasesContracts: + """_parse_phases 协作契约解析测试""" + + def test_parse_phases_with_contracts(self): + """_parse_phases 正确解析协作契约""" + content = json.dumps( + [ + { + "name": "规划", + "assigned_expert": "lead", + "task_description": "设计架构", + "depends_on": [], + "collaboration_contracts": [], + }, + { + "name": "后端", + "assigned_expert": "backend", + "task_description": "实现API", + "depends_on": ["规划"], + "collaboration_contracts": [ + { + "from_expert": "backend", + "to_expert": "frontend", + "content_description": "API 定义", + "status": "pending", + } + ], + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") + assert len(phases) == 2 + # 规划阶段无契约 + assert phases[0].collaboration_contracts == [] + # 后端阶段有 1 个契约 + assert len(phases[1].collaboration_contracts) == 1 + contract = phases[1].collaboration_contracts[0] + assert contract.from_expert == "backend" + assert contract.to_expert == "frontend" + assert contract.content_description == "API 定义" + assert contract.status == "pending" + + def test_parse_phases_multiple_contracts(self): + """_parse_phases 解析多个协作契约""" + content = json.dumps( + [ + { + "name": "集成", + "assigned_expert": "lead", + "task_description": "集成前后端", + "depends_on": [], + "collaboration_contracts": [ + { + "from_expert": "backend", + "to_expert": "frontend", + "content_description": "API 定义", + }, + { + "from_expert": "frontend", + "to_expert": "backend", + "content_description": "前端调用约定", + "status": "delivered", + }, + ], + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead", "backend", "frontend"], "lead") + assert len(phases) == 1 + assert len(phases[0].collaboration_contracts) == 2 + assert phases[0].collaboration_contracts[0].from_expert == "backend" + assert phases[0].collaboration_contracts[1].from_expert == "frontend" + assert phases[0].collaboration_contracts[1].status == "delivered" + + def test_parse_phases_malformed_contracts_not_list(self): + """LLM 返回的协作契约不是列表时优雅降级为空""" + content = json.dumps( + [ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "任务A", + "depends_on": [], + "collaboration_contracts": "not a list", + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") + assert len(phases) == 1 + assert phases[0].collaboration_contracts == [] + + def test_parse_phases_malformed_contracts_item_not_dict(self): + """LLM 返回的协作契约元素不是字典时降级为默认契约""" + content = json.dumps( + [ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "任务A", + "depends_on": [], + "collaboration_contracts": ["not a dict", 42, None], + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") + assert len(phases) == 1 + # 非字典元素降级为默认 CollaborationContract + assert len(phases[0].collaboration_contracts) == 3 + for contract in phases[0].collaboration_contracts: + assert isinstance(contract, CollaborationContract) + assert contract.status == "pending" + + def test_parse_phases_missing_contracts_field(self): + """LLM 返回的阶段缺少 collaboration_contracts 字段时默认为空""" + content = json.dumps( + [ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "任务A", + "depends_on": [], + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") + assert len(phases) == 1 + assert phases[0].collaboration_contracts == [] + + def test_parse_phases_contract_partial_fields(self): + """协作契约部分字段缺失时使用默认值""" + content = json.dumps( + [ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "任务A", + "depends_on": [], + "collaboration_contracts": [ + {"from_expert": "backend"}, # 缺少其他字段 + ], + }, + ] + ) + phases = TeamOrchestrator._parse_phases(content, ["lead", "backend"], "lead") + assert len(phases) == 1 + contract = phases[0].collaboration_contracts[0] + assert contract.from_expert == "backend" + assert contract.to_expert == "" + assert contract.content_description == "" + assert contract.status == "pending" + + +# ── Lead 分解生成契约端到端测试 ──────────────────────────── + + +class TestDecomposeGeneratesContracts: + """Lead 分解任务生成协作契约的端到端测试""" + + @pytest.mark.asyncio + async def test_decompose_generates_contracts(self): + """Lead 分解任务时生成的 phases 包含协作契约""" + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "规划", + "assigned_expert": "lead", + "task_description": "设计架构", + "depends_on": [], + "collaboration_contracts": [], + }, + { + "name": "后端", + "assigned_expert": "backend", + "task_description": "实现API", + "depends_on": ["规划"], + "collaboration_contracts": [ + { + "from_expert": "backend", + "to_expert": "frontend", + "content_description": "API 定义", + "status": "pending", + } + ], + }, + { + "name": "前端", + "assigned_expert": "frontend", + "task_description": "实现UI", + "depends_on": ["后端"], + "collaboration_contracts": [], + }, + ] + ) + team = _make_team_with_experts( + expert_names=["lead", "backend", "frontend"], gateway=gateway + ) + orchestrator = TeamOrchestrator(team) + + result = await orchestrator.execute("开发功能") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 3 + # 后端阶段应包含协作契约 + backend_phase = next(p for p in plan.phases if p.name == "后端") + assert len(backend_phase.collaboration_contracts) == 1 + contract = backend_phase.collaboration_contracts[0] + assert contract.from_expert == "backend" + assert contract.to_expert == "frontend" + assert contract.content_description == "API 定义" + # 规划和前端阶段无契约 + planning_phase = next(p for p in plan.phases if p.name == "规划") + assert planning_phase.collaboration_contracts == [] + frontend_phase = next(p for p in plan.phases if p.name == "前端") + assert frontend_phase.collaboration_contracts == [] + + @pytest.mark.asyncio + async def test_plan_update_includes_contracts(self): + """plan_update 事件包含协作契约信息""" + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "后端", + "assigned_expert": "backend", + "task_description": "实现API", + "depends_on": [], + "collaboration_contracts": [ + { + "from_expert": "backend", + "to_expert": "frontend", + "content_description": "API 定义", + "status": "pending", + } + ], + }, + ] + ) + team = _make_team_with_experts( + expert_names=["lead", "backend", "frontend"], gateway=gateway + ) + orchestrator = TeamOrchestrator(team) + + await orchestrator.execute("开发功能") + + calls = team._handoff_transport.send.call_args_list + plan_updates = [c[0][1] for c in calls if c[0][1].get("type") == "plan_update"] + assert len(plan_updates) >= 1 + # plan_update 的 plan_phases 应包含 collaboration_contracts 字段 + first_update = plan_updates[0] + assert "plan_phases" in first_update + phases_data = first_update["plan_phases"] + assert len(phases_data) == 1 + backend_phase_data = phases_data[0] + assert "collaboration_contracts" in backend_phase_data + assert len(backend_phase_data["collaboration_contracts"]) == 1 + contract_data = backend_phase_data["collaboration_contracts"][0] + assert contract_data["from_expert"] == "backend" + assert contract_data["to_expert"] == "frontend" + assert contract_data["content_description"] == "API 定义" + assert contract_data["status"] == "pending" + + @pytest.mark.asyncio + async def test_decompose_without_contracts_field_still_works(self): + """LLM 未返回 collaboration_contracts 字段时仍正常工作(向后兼容)""" + gateway = _make_mock_llm_gateway( + phases=[ + { + "name": "A", + "assigned_expert": "lead", + "task_description": "任务A", + "depends_on": [], + }, + { + "name": "B", + "assigned_expert": "backend", + "task_description": "任务B", + "depends_on": ["A"], + }, + ] + ) + team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) + orchestrator = TeamOrchestrator(team) + + result = await orchestrator.execute("测试任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 2 + # 所有阶段的协作契约都应为空列表 + for ph in plan.phases: + assert ph.collaboration_contracts == []