feat(experts): U2 协作契约执行 — 专家可见 + 主动通知

- _execute_execution_phase 按协作契约读取相关专家输出(可见性)
- 添加 _notify_collaborators 方法,完成后通知相关专家(可协助)
- 发出 collaboration_notice 事件,契约状态更新为 delivered
- 7 个新测试,全套 443 passed 无回归
This commit is contained in:
chiguyong 2026-06-24 13:54:38 +08:00
parent f219c5f016
commit c46cf06f6d
2 changed files with 296 additions and 1 deletions

View File

@ -470,6 +470,21 @@ class TeamOrchestrator:
"content", str(dep_phase.result) "content", str(dep_phase.result)
) )
# 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内)
collaboration_outputs: dict[str, str] = {}
for contract in phase.collaboration_contracts:
if contract.from_expert and contract.status in ("delivered", "received"):
# 从已完成的阶段中找到 from_expert 的输出
for prev_phase in plan.phases:
if (
prev_phase.assigned_expert == contract.from_expert
and prev_phase.status == PhaseStatus.COMPLETED
and prev_phase.result
):
content = prev_phase.result.get("content", str(prev_phase.result))
collaboration_outputs[contract.from_expert] = content
break
# Emit expert_step event # Emit expert_step event
await self._broadcast_event( await self._broadcast_event(
"expert_step", "expert_step",
@ -500,6 +515,18 @@ class TeamOrchestrator:
for name, output in dependency_outputs.items() for name, output in dependency_outputs.items()
) )
# 合并协作契约输出到 context可见性 — 让专家看到契约范围内相关专家的输出)
if collaboration_outputs:
collab_context = "协作专家输出:\n" + "\n---\n".join(
f"[{expert}]: {output[:500] if isinstance(output, str) else str(output)[:500]}"
for expert, output in collaboration_outputs.items()
)
if "context" in input_data:
input_data["context"] += "\n\n" + collab_context
else:
input_data["context"] = collab_context
input_data["collaboration_outputs"] = collaboration_outputs
task_msg = TaskMessage( task_msg = TaskMessage(
task_id=phase.id, task_id=phase.id,
agent_name=expert.config.name, agent_name=expert.config.name,
@ -566,6 +593,10 @@ class TeamOrchestrator:
}, },
) )
# 按协作契约通知相关专家(可协助)
if phase.collaboration_contracts:
await self._notify_collaborators(phase, plan)
return result return result
except Exception as e: except Exception as e:
@ -592,6 +623,36 @@ class TeamOrchestrator:
) )
raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}") raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}")
async def _notify_collaborators(self, phase: PlanPhase, plan: TeamPlan) -> None:
"""阶段完成后,按协作契约通知相关专家。
遍历当前阶段的 collaboration_contracts对每个 to_expert 发出
collaboration_notice 事件并更新契约状态为 delivered
"""
for contract in phase.collaboration_contracts:
if not contract.to_expert or contract.status == "delivered":
continue
# 获取接收方专家信息
to_expert = self._team.get_expert(contract.to_expert)
expert_color = to_expert.config.color if to_expert else "#888888"
await self._broadcast_event(
"collaboration_notice",
{
"from_expert": phase.assigned_expert,
"to_expert": contract.to_expert,
"content_description": contract.content_description,
"phase_id": phase.id,
"phase_name": phase.name,
"output_key": f"{plan.id}/phase/{phase.id}/output",
"expert_color": expert_color,
},
)
# 更新契约状态
contract.status = "delivered"
async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]:
"""Execute a DEBATE phase: Lead-facilitated structured debate. """Execute a DEBATE phase: Lead-facilitated structured debate.

View File

@ -19,7 +19,7 @@ from agentkit.core.protocol import TaskResult, TaskStatus
from agentkit.experts.config import ExpertConfig from agentkit.experts.config import ExpertConfig
from agentkit.experts.expert import Expert from agentkit.experts.expert import Expert
from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import CollaborationContract from agentkit.experts.plan import CollaborationContract, PhaseStatus, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam from agentkit.experts.team import ExpertTeam
@ -425,3 +425,237 @@ class TestDecomposeGeneratesContracts:
# 所有阶段的协作契约都应为空列表 # 所有阶段的协作契约都应为空列表
for ph in plan.phases: for ph in plan.phases:
assert ph.collaboration_contracts == [] assert ph.collaboration_contracts == []
# ── U2: 协作契约执行测试 ──────────────────────────────────
class TestCollaborationExecution:
"""U2: 协作契约执行 — 专家可见 + 主动通知测试"""
@pytest.mark.asyncio
async def test_expert_reads_collaboration_outputs(self):
"""专家执行时能读到协作契约中 from_expert 的输出"""
team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"])
orchestrator = TeamOrchestrator(team)
# 创建计划backend 阶段已完成frontend 阶段有待执行的协作契约
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
depends_on=[],
status=PhaseStatus.COMPLETED,
result={"content": "API definition: GET /users"},
)
frontend_phase = PlanPhase(
id="phase-frontend",
name="前端",
assigned_expert="frontend",
task_description="实现UI",
depends_on=["phase-backend"],
status=PhaseStatus.PENDING,
collaboration_contracts=[
CollaborationContract(
from_expert="backend",
to_expert="frontend",
content_description="API 定义",
status="delivered", # 已交付,触发读取
)
],
)
plan.phases = [backend_phase, frontend_phase]
await orchestrator._execute_execution_phase(frontend_phase, plan)
# 验证 frontend 专家的 agent.execute 收到了 collaboration_outputs
frontend_expert = team.get_expert("frontend")
task_msg = frontend_expert.agent.execute.call_args.args[0]
assert "collaboration_outputs" in task_msg.input_data
assert "backend" in task_msg.input_data["collaboration_outputs"]
assert "API definition" in task_msg.input_data["collaboration_outputs"]["backend"]
# 验证 context 中包含协作专家输出
assert "协作专家输出" in task_msg.input_data["context"]
@pytest.mark.asyncio
async def test_expert_notifies_collaborators(self):
"""专家完成后,协作契约中的 to_expert 收到 collaboration_notice 事件"""
team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
collaboration_contracts=[
CollaborationContract(
from_expert="backend",
to_expert="frontend",
content_description="API 定义",
status="pending",
)
],
)
plan.phases = [backend_phase]
await orchestrator._notify_collaborators(backend_phase, plan)
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 1
assert notices[0]["to_expert"] == "frontend"
@pytest.mark.asyncio
async def test_contract_status_updated_to_delivered(self):
"""契约状态从 pending 更新为 delivered"""
team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
contract = CollaborationContract(
from_expert="backend",
to_expert="frontend",
content_description="API 定义",
status="pending",
)
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
collaboration_contracts=[contract],
)
plan.phases = [backend_phase]
await orchestrator._notify_collaborators(backend_phase, plan)
assert contract.status == "delivered"
@pytest.mark.asyncio
async def test_no_collaboration_contracts_backward_compatible(self):
"""协作契约为空时,行为与当前一致(向后兼容)"""
team = _make_team_with_experts(expert_names=["lead", "backend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
status=PhaseStatus.PENDING,
collaboration_contracts=[],
)
plan.phases = [backend_phase]
result = await orchestrator._execute_execution_phase(backend_phase, plan)
# 验证正常执行
assert result is not None
# 验证 input_data 中没有 collaboration_outputs
backend_expert = team.get_expert("backend")
task_msg = backend_expert.agent.execute.call_args.args[0]
assert "collaboration_outputs" not in task_msg.input_data
# 验证没有 collaboration_notice 事件
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 0
@pytest.mark.asyncio
async def test_collaboration_notice_event_content(self):
"""collaboration_notice 事件包含正确的 from_expert, to_expert, content_description"""
team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
collaboration_contracts=[
CollaborationContract(
from_expert="backend",
to_expert="frontend",
content_description="API 定义",
status="pending",
)
],
)
plan.phases = [backend_phase]
await orchestrator._notify_collaborators(backend_phase, plan)
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 1
notice = notices[0]
assert notice["from_expert"] == "backend"
assert notice["to_expert"] == "frontend"
assert notice["content_description"] == "API 定义"
assert notice["phase_id"] == "phase-backend"
assert notice["phase_name"] == "后端"
assert "output_key" in notice
assert "expert_color" in notice
@pytest.mark.asyncio
async def test_notify_skips_empty_to_expert(self):
"""to_expert 为空时跳过通知"""
team = _make_team_with_experts(expert_names=["lead", "backend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
collaboration_contracts=[
CollaborationContract(
from_expert="backend",
to_expert="", # 空的 to_expert
content_description="API 定义",
status="pending",
)
],
)
plan.phases = [backend_phase]
await orchestrator._notify_collaborators(backend_phase, plan)
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 0
@pytest.mark.asyncio
async def test_notify_skips_already_delivered(self):
"""契约状态已为 delivered 时跳过通知"""
team = _make_team_with_experts(expert_names=["lead", "backend", "frontend"])
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
backend_phase = PlanPhase(
id="phase-backend",
name="后端",
assigned_expert="backend",
task_description="实现API",
collaboration_contracts=[
CollaborationContract(
from_expert="backend",
to_expert="frontend",
content_description="API 定义",
status="delivered", # 已交付
)
],
)
plan.phases = [backend_phase]
await orchestrator._notify_collaborators(backend_phase, plan)
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 0