feat(experts): U3 Lead 验收环节 + 返工机制

- PlanPhase 添加 rework_count 和 review_feedback 字段
- 添加 _review_phase_output 方法,Lead 用 LLM 验收阶段输出
- _execute_execution_phase 重构为返工循环(MAX_REWORKS=2)
- 验收通过/返工/失败三种路径,发出 review_result 事件
- LLM 不可用时优雅降级直接通过
- 6 个新测试,全套 449 passed 无回归
This commit is contained in:
chiguyong 2026-06-24 14:09:18 +08:00
parent fef7ecea39
commit 62fcbc0feb
3 changed files with 437 additions and 109 deletions

View File

@ -61,6 +61,7 @@ class TeamOrchestrator:
MAX_PHASES = 10 # Maximum phases Lead Expert can decompose
MAX_RETRIES = 1 # Retry once on phase failure before marking failed
MAX_REWORKS = 2 # 返工次数上限,超过则标记阶段失败
MAX_DEBATE_ROUNDS = 4 # Hard cap on debate rounds per phase
MAX_DEBATES = 3 # Hard cap on auto-inserted debate phases per execution
STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"})
@ -462,124 +463,155 @@ class TeamOrchestrator:
)
# Read dependency outputs from in-memory phase results (faster than workspace)
dependency_outputs: dict[str, Any] = {}
for dep_id in phase.depends_on:
dep_phase = plan.get_phase(dep_id)
if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result:
dependency_outputs[dep_phase.name] = dep_phase.result.get(
"content", str(dep_phase.result)
)
# 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内)
collaboration_outputs: dict[str, str] = {}
for contract in phase.collaboration_contracts:
if contract.from_expert and contract.status in ("delivered", "received"):
# 从已完成的阶段中找到 from_expert 的输出
for prev_phase in plan.phases:
if (
prev_phase.assigned_expert == contract.from_expert
and prev_phase.status == PhaseStatus.COMPLETED
and prev_phase.result
):
content = prev_phase.result.get("content", str(prev_phase.result))
collaboration_outputs[contract.from_expert] = content
break
# Emit expert_step event
await self._broadcast_event(
"expert_step",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": phase.task_description,
"step": phase.id,
"phase_id": phase.id,
"phase_name": phase.name,
},
)
# Build TaskMessage for execution with context isolation
# Context includes: task description + persona + dependency outputs
input_data: dict[str, Any] = {
"task": phase.task_description,
"team_id": self._team.team_id,
"phase_id": phase.id,
"phase_name": phase.name,
"is_phase": True,
"dependency_outputs": dependency_outputs,
}
if dependency_outputs:
input_data["context"] = "前置阶段输出:\n" + "\n---\n".join(
f"[{name}]:\n{output[:500] if isinstance(output, str) else str(output)[:500]}"
for name, output in dependency_outputs.items()
)
# 合并协作契约输出到 context可见性 — 让专家看到契约范围内相关专家的输出)
if collaboration_outputs:
collab_context = "协作专家输出:\n" + "\n---\n".join(
f"[{expert}]: {output[:500] if isinstance(output, str) else str(output)[:500]}"
for expert, output in collaboration_outputs.items()
)
if "context" in input_data:
input_data["context"] += "\n\n" + collab_context
else:
input_data["context"] = collab_context
input_data["collaboration_outputs"] = collaboration_outputs
task_msg = TaskMessage(
task_id=phase.id,
agent_name=expert.config.name,
task_type="team_phase",
priority=0,
input_data=input_data,
callback_url=None,
created_at=datetime.now(timezone.utc),
)
# Execute with context isolation: try creating independent agent via pool
agent = await self._get_isolated_agent(expert, phase)
lead = self._team.lead_expert or expert
last_error: str | None = None
result: dict[str, Any] | None = None
try:
for attempt in range(self.MAX_RETRIES + 1):
try:
task_result: TaskResult = await agent.execute(task_msg)
# U3: 返工循环 — 最多 MAX_REWORKS + 1 次1 次初始 + MAX_REWORKS 次返工)
for _rework_attempt in range(self.MAX_REWORKS + 1):
# 每次迭代重新读取依赖输出(前置阶段可能在返工期间完成)
dependency_outputs: dict[str, Any] = {}
for dep_id in phase.depends_on:
dep_phase = plan.get_phase(dep_id)
if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result:
dependency_outputs[dep_phase.name] = dep_phase.result.get(
"content", str(dep_phase.result)
)
if task_result.status != TaskStatus.COMPLETED.value:
last_error = task_result.error_message or "unknown error"
# 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内)
collaboration_outputs: dict[str, str] = {}
for contract in phase.collaboration_contracts:
if contract.from_expert and contract.status in ("delivered", "received"):
# 从已完成的阶段中找到 from_expert 的输出
for prev_phase in plan.phases:
if (
prev_phase.assigned_expert == contract.from_expert
and prev_phase.status == PhaseStatus.COMPLETED
and prev_phase.result
):
content = prev_phase.result.get("content", str(prev_phase.result))
collaboration_outputs[contract.from_expert] = content
break
# Emit expert_step event
await self._broadcast_event(
"expert_step",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": phase.task_description,
"step": phase.id,
"phase_id": phase.id,
"phase_name": phase.name,
},
)
# Build TaskMessage for execution with context isolation
# Context includes: task description + persona + dependency outputs
input_data: dict[str, Any] = {
"task": phase.task_description,
"team_id": self._team.team_id,
"phase_id": phase.id,
"phase_name": phase.name,
"is_phase": True,
"dependency_outputs": dependency_outputs,
}
if dependency_outputs:
input_data["context"] = "前置阶段输出:\n" + "\n---\n".join(
f"[{name}]:\n"
f"{output[:500] if isinstance(output, str) else str(output)[:500]}"
for name, output in dependency_outputs.items()
)
# 合并协作契约输出到 context可见性 — 让专家看到契约范围内相关专家的输出)
if collaboration_outputs:
collab_context = "协作专家输出:\n" + "\n---\n".join(
f"[{exp}]: {output[:500] if isinstance(output, str) else str(output)[:500]}"
for exp, output in collaboration_outputs.items()
)
if "context" in input_data:
input_data["context"] += "\n\n" + collab_context
else:
input_data["context"] = collab_context
input_data["collaboration_outputs"] = collaboration_outputs
task_msg = TaskMessage(
task_id=phase.id,
agent_name=expert.config.name,
task_type="team_phase",
priority=0,
input_data=input_data,
callback_url=None,
created_at=datetime.now(timezone.utc),
)
# 执行专家任务带重试MAX_RETRIES 处理瞬时失败)
for attempt in range(self.MAX_RETRIES + 1):
try:
task_result: TaskResult = await agent.execute(task_msg)
if task_result.status != TaskStatus.COMPLETED.value:
last_error = task_result.error_message or "unknown error"
if attempt < self.MAX_RETRIES:
logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})")
continue
raise RuntimeError(f"Agent execution failed: {last_error}")
result = task_result.output_data or {"content": ""}
break # 执行成功,跳出重试循环
except Exception as e:
last_error = str(e)
if attempt < self.MAX_RETRIES:
logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})")
continue
raise RuntimeError(f"Agent execution failed: {last_error}")
raise
result = task_result.output_data or {"content": ""}
# Write phase output to SharedWorkspace
output_key = f"{plan.id}/phase/{phase.id}/output"
await self._team.workspace.write(
output_key,
result.get("content", str(result)),
expert.config.name,
)
# Update phase status
# Emit expert_result event
await self._broadcast_event(
"expert_result",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": result.get("content", str(result)),
"phase_id": phase.id,
},
)
# 按协作契约通知相关专家(可协助)
if phase.collaboration_contracts:
await self._notify_collaborators(phase, plan)
# U3: Lead 验收阶段输出
passed, feedback = await self._review_phase_output(lead, phase, result)
if passed:
# 验收通过
phase.status = PhaseStatus.COMPLETED
phase.result = result
# Write phase output to SharedWorkspace
output_key = f"{plan.id}/phase/{phase.id}/output"
await self._team.workspace.write(
output_key,
result.get("content", str(result)),
expert.config.name,
)
# Emit expert_result event
await self._broadcast_event(
"expert_result",
"review_result",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": result.get("content", str(result)),
"phase_id": phase.id,
"phase_name": phase.name,
"passed": True,
"feedback": "",
"expert": phase.assigned_expert,
},
)
# Emit phase_completed event
result_summary = result.get("content", str(result))
if isinstance(result_summary, str) and len(result_summary) > 200:
@ -592,19 +624,54 @@ class TeamOrchestrator:
"result_summary": result_summary,
},
)
# 按协作契约通知相关专家(可协助)
if phase.collaboration_contracts:
await self._notify_collaborators(phase, plan)
return result
else:
# 验收不合格 — 返工或标记失败
phase.rework_count += 1
phase.review_feedback = feedback
except Exception as e:
last_error = str(e)
if attempt < self.MAX_RETRIES:
logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})")
if phase.rework_count > self.MAX_REWORKS:
# 超过返工上限,标记失败
phase.status = PhaseStatus.FAILED
await self._broadcast_event(
"review_result",
{
"phase_id": phase.id,
"phase_name": phase.name,
"passed": False,
"feedback": feedback,
"expert": phase.assigned_expert,
"rework_count": phase.rework_count,
"final_status": "failed",
},
)
await self._broadcast_event(
"phase_failed",
{
"phase_id": phase.id,
"phase_name": phase.name,
"error": f"Review failed after "
f"{phase.rework_count} reworks: {feedback}",
},
)
return result
else:
# 准备返工,继续循环
await self._broadcast_event(
"review_result",
{
"phase_id": phase.id,
"phase_name": phase.name,
"passed": False,
"feedback": feedback,
"expert": phase.assigned_expert,
"rework_count": phase.rework_count,
"final_status": "rework",
},
)
# 在 task_description 中附加返工反馈
phase.task_description += f"\n\n[返工要求]: {feedback}"
continue
raise
finally:
# Clean up isolated agent if we created one
@ -653,6 +720,53 @@ class TeamOrchestrator:
# 更新契约状态
contract.status = "delivered"
async def _review_phase_output(
    self, lead: Expert, phase: PlanPhase, result: dict[str, Any]
) -> tuple[bool, str]:
    """Ask the Lead expert's LLM whether a phase output meets the phase task.

    Args:
        lead: Expert whose LLM gateway and model are used for the review.
        phase: Phase under review; its name and task description are placed
            in the review prompt.
        result: Phase output. Its ``"content"`` entry is reviewed, falling
            back to ``str(result)`` when the key is absent.

    Returns:
        ``(passed, feedback)``. ``feedback`` carries the reviewer's change
        request when ``passed`` is False. The review degrades to
        ``(True, "")`` when no gateway is available, when the LLM call or
        JSON parsing raises, or when the reply contains no JSON object.
    """
    gateway = self._get_llm_gateway(lead)
    if not gateway:
        logger.warning("No LLM gateway available, skipping review")
        return True, ""

    content = result.get("content", str(result))
    if not isinstance(content, str):
        # Non-string content (dict, list, None, ...) cannot be sliced for the
        # prompt; stringify it the same way the dependency-context builder
        # does instead of letting a TypeError abort the phase.
        content = str(content)

    prompt = (
        f"你是项目经理,负责验收阶段输出质量。\n\n"
        f"阶段名称: {phase.name}\n"
        f"阶段任务: {phase.task_description}\n"
        f"阶段输出:\n{content[:2000]}\n\n"
        f"请判断输出是否满足阶段任务要求。\n"
        f"返回 JSON 格式:\n"
        f'{{"passed": true/false, "feedback": "若不合格,说明修改要求;若合格,留空"}}\n'
        f"只返回 JSON不要其他文字。"
    )

    try:
        response = await gateway.chat(
            messages=[{"role": "user", "content": prompt}],
            model=self._get_model(lead),
        )
        # Greedy match from the first "{" to the last "}" so that any text
        # the model wraps around the JSON object is ignored.
        json_match = re.search(r"\{.*\}", response.content, re.DOTALL)
        if json_match:
            review = json.loads(json_match.group(0))
            passed = review.get("passed", True)
            if isinstance(passed, str):
                # A quoted verdict such as "false" is a non-empty string and
                # would be truthy under bool(); interpret its text instead.
                passed = passed.strip().lower() not in {"", "false", "no", "0"}
            feedback = review.get("feedback")
            # A JSON null must not become the literal text "None", because
            # the caller appends feedback to the task description on rework.
            return bool(passed), "" if feedback is None else str(feedback)
    except Exception as e:
        # Best-effort review: any failure falls through to the pass-through
        # result below rather than failing the phase.
        logger.warning(f"Review LLM call failed: {e}")

    # Degraded path: treat the output as accepted.
    return True, ""
async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]:
"""Execute a DEBATE phase: Lead-facilitated structured debate.

View File

@ -166,6 +166,8 @@ class PlanPhase:
- max_rounds: 最大辩论轮次默认 2硬上限 4
- skip: 是否跳过辩论逃生舱
collaboration_contracts: 协作契约列表定义该阶段涉及的专家协作关系
rework_count: 返工次数Lead 验收不合格后重新执行的次数
review_feedback: Lead 验收反馈不合格时的修改要求
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
@ -178,6 +180,8 @@ class PlanPhase:
phase_type: PhaseType = PhaseType.EXECUTION
debate_config: dict[str, Any] | None = None
collaboration_contracts: list[CollaborationContract] = field(default_factory=list)
rework_count: int = 0
review_feedback: str | None = None
def to_dict(self) -> dict[str, Any]:
"""序列化为字典"""
@ -199,6 +203,8 @@ class PlanPhase:
"phase_type": self.phase_type.value,
"debate_config": self.debate_config,
"collaboration_contracts": [c.to_dict() for c in self.collaboration_contracts],
"rework_count": self.rework_count,
"review_feedback": self.review_feedback,
}
@classmethod
@ -222,6 +228,8 @@ class PlanPhase:
phase_type=PhaseType(data.get("phase_type", PhaseType.EXECUTION.value)),
debate_config=data.get("debate_config"),
collaboration_contracts=contracts,
rework_count=data.get("rework_count", 0),
review_feedback=data.get("review_feedback"),
)

View File

@ -131,6 +131,30 @@ def _make_mock_llm_gateway(
return gateway
def _make_review_gateway(review_results: list[tuple[bool, str]]) -> MagicMock:
"""创建 mock LLM gateway 用于验收。
review_results: (passed, feedback) 列表按顺序返回
若调用次数超过列表长度重复返回最后一个结果
"""
gateway = AsyncMock()
responses = []
for passed, feedback in review_results:
resp = MagicMock()
resp.content = json.dumps({"passed": passed, "feedback": feedback})
responses.append(resp)
call_count = [0]
async def chat_side_effect(messages, model=None, **kwargs):
idx = min(call_count[0], len(responses) - 1)
call_count[0] += 1
return responses[idx]
gateway.chat = AsyncMock(side_effect=chat_side_effect)
return gateway
# ── _parse_phases 协作契约解析测试 ─────────────────────────
@ -659,3 +683,185 @@ class TestCollaborationExecution:
calls = team._handoff_transport.send.call_args_list
notices = [c[0][1] for c in calls if c[0][1].get("type") == "collaboration_notice"]
assert len(notices) == 0
# ── U3: Lead 验收环节 + 返工机制测试 ──────────────────────
class TestPhaseReview:
    """U3: tests for the Lead review step and the rework loop."""

    @staticmethod
    def _setup(review_script=None, task_description="实现API"):
        """Create the team, orchestrator, plan and single phase shared by all tests.

        When ``review_script`` is None the team is built without a gateway
        argument; otherwise a scripted review gateway is passed in.
        """
        team_kwargs = {"expert_names": ["lead", "backend"]}
        if review_script is not None:
            team_kwargs["gateway"] = _make_review_gateway(review_script)
        team = _make_team_with_experts(**team_kwargs)
        orchestrator = TeamOrchestrator(team)
        plan = TeamPlan(task="开发功能", lead_expert="lead")
        phase = PlanPhase(
            id="phase-1",
            name="后端",
            assigned_expert="backend",
            task_description=task_description,
        )
        plan.phases = [phase]
        return team, orchestrator, plan, phase

    @staticmethod
    def _sent_events(team, event_type):
        """Return payloads passed to the handoff transport whose ``type`` matches."""
        payloads = (call[0][1] for call in team._handoff_transport.send.call_args_list)
        return [payload for payload in payloads if payload.get("type") == event_type]

    @pytest.mark.asyncio
    async def test_review_passed(self):
        """A passing review marks the phase COMPLETED and emits one passed review_result."""
        team, orchestrator, plan, phase = self._setup([(True, "")])

        outcome = await orchestrator._execute_execution_phase(phase, plan)

        assert phase.status == PhaseStatus.COMPLETED
        assert outcome is not None
        # Exactly one review_result event, flagged as passed.
        review_events = self._sent_events(team, "review_result")
        assert len(review_events) == 1
        assert review_events[0]["passed"] is True

    @pytest.mark.asyncio
    async def test_review_failed_rework(self):
        """A rejected review triggers a rework with feedback; the second review passes."""
        # First review rejects, second review accepts.
        team, orchestrator, plan, phase = self._setup(
            [(False, "需要增加错误处理"), (True, "")]
        )

        outcome = await orchestrator._execute_execution_phase(phase, plan)

        assert phase.status == PhaseStatus.COMPLETED
        assert phase.rework_count == 1
        assert phase.review_feedback == "需要增加错误处理"
        assert outcome is not None
        # The rework feedback was appended to the task description.
        assert "[返工要求]" in phase.task_description
        assert "需要增加错误处理" in phase.task_description
        # review_result events: first a rework, then a pass.
        review_events = self._sent_events(team, "review_result")
        assert len(review_events) == 2
        first_review, second_review = review_events
        assert first_review["passed"] is False
        assert first_review["final_status"] == "rework"
        assert second_review["passed"] is True

    @pytest.mark.asyncio
    async def test_review_max_reworks_exceeded(self):
        """A phase still rejected once MAX_REWORKS is exhausted is marked FAILED."""
        # The review never passes.
        team, orchestrator, plan, phase = self._setup([(False, "不合格")] * 10)

        await orchestrator._execute_execution_phase(phase, plan)

        assert phase.status == PhaseStatus.FAILED
        assert phase.rework_count == TeamOrchestrator.MAX_REWORKS + 1
        # A single phase_failed event is emitted.
        failure_events = self._sent_events(team, "phase_failed")
        assert len(failure_events) == 1
        # The last review_result event reports the failed outcome.
        review_events = self._sent_events(team, "review_result")
        assert review_events[-1]["final_status"] == "failed"

    @pytest.mark.asyncio
    async def test_review_no_llm_gateway_skips(self):
        """Without a Lead LLM gateway the review is skipped and the phase completes."""
        # No gateway is supplied when building the team.
        team, orchestrator, plan, phase = self._setup()

        outcome = await orchestrator._execute_execution_phase(phase, plan)

        assert phase.status == PhaseStatus.COMPLETED
        assert outcome is not None
        # No rework happened.
        assert phase.rework_count == 0
        # The expert ran exactly once.
        step_events = self._sent_events(team, "expert_step")
        assert len(step_events) == 1

    @pytest.mark.asyncio
    async def test_review_result_event_content(self):
        """The review_result event carries the phase, passed, feedback and expert fields."""
        team, orchestrator, plan, phase = self._setup([(True, "")])

        await orchestrator._execute_execution_phase(phase, plan)

        review_events = self._sent_events(team, "review_result")
        assert len(review_events) == 1
        event = review_events[0]
        assert event["phase_id"] == "phase-1"
        assert event["phase_name"] == "后端"
        assert event["passed"] is True
        assert event["feedback"] == ""
        assert event["expert"] == "backend"

    @pytest.mark.asyncio
    async def test_rework_feedback_appended_to_task(self):
        """On rework the feedback is appended to task_description and sent to the expert."""
        original_task = "实现API"
        team, orchestrator, plan, phase = self._setup(
            [(False, "请增加单元测试"), (True, "")],
            task_description=original_task,
        )

        await orchestrator._execute_execution_phase(phase, plan)

        # The original task text is kept and the rework request is appended.
        assert original_task in phase.task_description
        assert "[返工要求]: 请增加单元测试" in phase.task_description
        # agent.execute ran twice: the initial attempt plus one rework.
        execute_mock = team.get_expert("backend").agent.execute
        assert execute_mock.call_count == 2
        # The task message of the second call contains the rework request.
        rework_task_msg = execute_mock.call_args_list[1].args[0]
        assert "[返工要求]" in rework_task_msg.input_data["task"]