From c46cf06f6d9906826cf362397b1186d93dfe6603 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Wed, 24 Jun 2026 13:54:38 +0800 Subject: [PATCH] =?UTF-8?q?feat(experts):=20U2=20=E5=8D=8F=E4=BD=9C?= =?UTF-8?q?=E5=A5=91=E7=BA=A6=E6=89=A7=E8=A1=8C=20=E2=80=94=20=E4=B8=93?= =?UTF-8?q?=E5=AE=B6=E5=8F=AF=E8=A7=81=20+=20=E4=B8=BB=E5=8A=A8=E9=80=9A?= =?UTF-8?q?=E7=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - _execute_execution_phase 按协作契约读取相关专家输出(可见性) - 添加 _notify_collaborators 方法,完成后通知相关专家(可协助) - 发出 collaboration_notice 事件,契约状态更新为 delivered - 7 个新测试,全套 443 passed 无回归 --- src/agentkit/experts/orchestrator.py | 61 +++++ tests/unit/experts/test_pm_collaboration.py | 236 +++++++++++++++++++- 2 files changed, 296 insertions(+), 1 deletion(-) diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index ee2b544..0704902 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -470,6 +470,21 @@ class TeamOrchestrator: "content", str(dep_phase.result) ) + # 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内) + collaboration_outputs: dict[str, str] = {} + for contract in phase.collaboration_contracts: + if contract.from_expert and contract.status in ("delivered", "received"): + # 从已完成的阶段中找到 from_expert 的输出 + for prev_phase in plan.phases: + if ( + prev_phase.assigned_expert == contract.from_expert + and prev_phase.status == PhaseStatus.COMPLETED + and prev_phase.result + ): + content = prev_phase.result.get("content", str(prev_phase.result)) + collaboration_outputs[contract.from_expert] = content + break + # Emit expert_step event await self._broadcast_event( "expert_step", @@ -500,6 +515,18 @@ class TeamOrchestrator: for name, output in dependency_outputs.items() ) + # 合并协作契约输出到 context(可见性 — 让专家看到契约范围内相关专家的输出) + if collaboration_outputs: + collab_context = "协作专家输出:\n" + "\n---\n".join( + f"[{expert}]: {output[:500] if isinstance(output, str) else str(output)[:500]}" + for expert, output in collaboration_outputs.items() + ) + if "context" in input_data: + input_data["context"] += "\n\n" + collab_context + else: + input_data["context"] = collab_context + input_data["collaboration_outputs"] = collaboration_outputs + task_msg = TaskMessage( task_id=phase.id, agent_name=expert.config.name, @@ -566,6 +593,10 @@ class TeamOrchestrator: }, ) + # 按协作契约通知相关专家(可协助) + if phase.collaboration_contracts: + await self._notify_collaborators(phase, plan) + return result except Exception as e: @@ -592,6 +623,36 @@ class TeamOrchestrator: ) raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}") + async def _notify_collaborators(self, phase: PlanPhase, plan: TeamPlan) -> None: + """阶段完成后,按协作契约通知相关专家。 + + 遍历当前阶段的 collaboration_contracts,对每个 to_expert 发出 + collaboration_notice 事件,并更新契约状态为 delivered。 + """ + for contract in phase.collaboration_contracts: + if not contract.to_expert or contract.status == "delivered": + continue + + # 获取接收方专家信息 + to_expert = self._team.get_expert(contract.to_expert) + expert_color = to_expert.config.color if to_expert else "#888888" + + await self._broadcast_event( + "collaboration_notice", + { + "from_expert": phase.assigned_expert, + "to_expert": contract.to_expert, + "content_description": contract.content_description, + "phase_id": phase.id, + "phase_name": phase.name, + "output_key": f"{plan.id}/phase/{phase.id}/output", + "expert_color": expert_color, + }, + ) + + # 更新契约状态 + contract.status = "delivered" + async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: """Execute a DEBATE phase: Lead-facilitated structured debate. diff --git a/tests/unit/experts/test_pm_collaboration.py b/tests/unit/experts/test_pm_collaboration.py index b0ba429..d566d21 100644 --- a/tests/unit/experts/test_pm_collaboration.py +++ b/tests/unit/experts/test_pm_collaboration.py @@ -19,7 +19,7 @@ from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator -from agentkit.experts.plan import CollaborationContract +from agentkit.experts.plan import CollaborationContract, PhaseStatus, PlanPhase, TeamPlan from agentkit.experts.team import ExpertTeam @@ -425,3 +425,237 @@ class TestDecomposeGeneratesContracts: # 所有阶段的协作契约都应为空列表 for ph in plan.phases: assert ph.collaboration_contracts == [] + + +# ── U2: 协作契约执行测试 ────────────────────────────────── + + +class TestCollaborationExecution: + """U2: 协作契约执行 — 专家可见 + 主动通知测试""" + + @pytest.mark.asyncio + async def test_expert_reads_collaboration_outputs(self): + """专家执行时能读到协作契约中 from_expert 的输出""" + team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) + orchestrator = TeamOrchestrator(team) + + # 创建计划:backend 阶段已完成,frontend 阶段有待执行的协作契约 + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + depends_on=[], + status=PhaseStatus.COMPLETED, + result={"content": "API definition: GET /users"}, + ) + frontend_phase = PlanPhase( + id="phase-frontend", + name="前端", + assigned_expert="frontend", + task_description="实现UI", + depends_on=["phase-backend"], + status=PhaseStatus.PENDING, + collaboration_contracts=[ + CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="delivered", # 已交付,触发读取 + ) + ], + ) + plan.phases = [backend_phase, frontend_phase] + + await orchestrator._execute_execution_phase(frontend_phase, plan) + + # 验证 frontend 专家的 agent.execute 收到了 collaboration_outputs + frontend_expert = team.get_expert("frontend") + task_msg = frontend_expert.agent.execute.call_args.args[0] + assert "collaboration_outputs" in task_msg.input_data + assert "backend" in task_msg.input_data["collaboration_outputs"] + assert "API definition" in task_msg.input_data["collaboration_outputs"]["backend"] + # 验证 context 中包含协作专家输出 + assert "协作专家输出" in task_msg.input_data["context"] + + @pytest.mark.asyncio + async def test_expert_notifies_collaborators(self): + """专家完成后,协作契约中的 to_expert 收到 collaboration_notice 事件""" + team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + collaboration_contracts=[ + CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="pending", + ) + ], + ) + plan.phases = [backend_phase] + + await orchestrator._notify_collaborators(backend_phase, plan) + + calls = team._handoff_transport.send.call_args_list + notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] + assert len(notices) == 1 + assert notices[0]["to_expert"] == "frontend" + + @pytest.mark.asyncio + async def test_contract_status_updated_to_delivered(self): + """契约状态从 pending 更新为 delivered""" + team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + contract = CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="pending", + ) + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + collaboration_contracts=[contract], + ) + plan.phases = [backend_phase] + + await orchestrator._notify_collaborators(backend_phase, plan) + + assert contract.status == "delivered" + + @pytest.mark.asyncio + async def test_no_collaboration_contracts_backward_compatible(self): + """协作契约为空时,行为与当前一致(向后兼容)""" + team = _make_team_with_experts(expert_names=["lead", "backend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + status=PhaseStatus.PENDING, + collaboration_contracts=[], + ) + plan.phases = [backend_phase] + + result = await orchestrator._execute_execution_phase(backend_phase, plan) + + # 验证正常执行 + assert result is not None + # 验证 input_data 中没有 collaboration_outputs + backend_expert = team.get_expert("backend") + task_msg = backend_expert.agent.execute.call_args.args[0] + assert "collaboration_outputs" not in task_msg.input_data + # 验证没有 collaboration_notice 事件 + calls = team._handoff_transport.send.call_args_list + notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] + assert len(notices) == 0 + + @pytest.mark.asyncio + async def test_collaboration_notice_event_content(self): + """collaboration_notice 事件包含正确的 from_expert, to_expert, content_description""" + team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + collaboration_contracts=[ + CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="pending", + ) + ], + ) + plan.phases = [backend_phase] + + await orchestrator._notify_collaborators(backend_phase, plan) + + calls = team._handoff_transport.send.call_args_list + notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] + assert len(notices) == 1 + notice = notices[0] + assert notice["from_expert"] == "backend" + assert notice["to_expert"] == "frontend" + assert notice["content_description"] == "API 定义" + assert notice["phase_id"] == "phase-backend" + assert notice["phase_name"] == "后端" + assert "output_key" in notice + assert "expert_color" in notice + + @pytest.mark.asyncio + async def test_notify_skips_empty_to_expert(self): + """to_expert 为空时跳过通知""" + team = _make_team_with_experts(expert_names=["lead", "backend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + collaboration_contracts=[ + CollaborationContract( + from_expert="backend", + to_expert="", # 空的 to_expert + content_description="API 定义", + status="pending", + ) + ], + ) + plan.phases = [backend_phase] + + await orchestrator._notify_collaborators(backend_phase, plan) + + calls = team._handoff_transport.send.call_args_list + notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] + assert len(notices) == 0 + + @pytest.mark.asyncio + async def test_notify_skips_already_delivered(self): + """契约状态已为 delivered 时跳过通知""" + team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"]) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + backend_phase = PlanPhase( + id="phase-backend", + name="后端", + assigned_expert="backend", + task_description="实现API", + collaboration_contracts=[ + CollaborationContract( + from_expert="backend", + to_expert="frontend", + content_description="API 定义", + status="delivered", # 已交付 + ) + ], + ) + plan.phases = [backend_phase] + + await orchestrator._notify_collaborators(backend_phase, plan) + + calls = team._handoff_transport.send.call_args_list + notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"] + assert len(notices) == 0