feat(experts): U4 专家风险标记 + risk_flagged 事件

- orchestrator 新增 _parse_risk_flags 静态方法,正则解析 [RISK: ...] 标记
- _execute_execution_phase 在协作通知后、验收前解析风险标记
- 风险标记通过 risk_flagged 事件广播,供前端/CLI 渲染
- 无风险标记时行为不变,向后兼容
- 新增 TestRiskFlagging 7 个测试(单/多/无/格式错误/事件发出/内容/兼容)
This commit is contained in:
chiguyong 2026-06-24 14:17:58 +08:00
parent 62fcbc0feb
commit 5487cca199
2 changed files with 178 additions and 1 deletions

View File

@ -595,6 +595,24 @@ class TeamOrchestrator:
if phase.collaboration_contracts: if phase.collaboration_contracts:
await self._notify_collaborators(phase, plan) await self._notify_collaborators(phase, plan)
# U4: 解析专家输出中的风险标记,发出 risk_flagged 事件
# ponytail: 风险标记通过验收环节间接处理 Lead 决策。
# 验收 prompt 包含输出内容Lead 可在验收反馈中要求返工。
# 未来如需更复杂的风险决策(如自动插入辩论),可在此扩展。
content = result.get("content", str(result))
risk_flags = self._parse_risk_flags(content)
for risk_desc in risk_flags:
await self._broadcast_event(
"risk_flagged",
{
"expert": phase.assigned_expert,
"expert_name": phase.assigned_expert,
"risk_description": risk_desc,
"phase_id": phase.id,
"phase_name": phase.name,
},
)
# U3: Lead 验收阶段输出 # U3: Lead 验收阶段输出
passed, feedback = await self._review_phase_output(lead, phase, result) passed, feedback = await self._review_phase_output(lead, phase, result)
@ -767,6 +785,27 @@ class TeamOrchestrator:
# 降级:验收通过 # 降级:验收通过
return True, "" return True, ""
@staticmethod
def _parse_risk_flags(content: str) -> list[str]:
"""从专家输出中解析风险标记。
风险标记格式[RISK: <风险描述>]
可在一行中出现多个也可跨多行
Returns:
风险描述列表空列表表示无风险标记
"""
# 匹配 [RISK: ...] 格式,允许跨行
pattern = re.compile(r"\[RISK:\s*(.+?)\]", re.DOTALL)
matches = pattern.findall(content)
# 清理每个匹配项:去除多余空白,截断过长的描述
risks: list[str] = []
for match in matches:
risk = match.strip().replace("\n", " ")
if risk and len(risk) <= 500: # 限制风险描述长度
risks.append(risk)
return risks
async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]:
"""Execute a DEBATE phase: Lead-facilitated structured debate. """Execute a DEBATE phase: Lead-facilitated structured debate.

View File

@ -861,7 +861,145 @@ class TestPhaseReview:
backend_expert = team.get_expert("backend") backend_expert = team.get_expert("backend")
# agent.execute 被调用了 2 次1 次初始 + 1 次返工) # agent.execute 被调用了 2 次1 次初始 + 1 次返工)
assert backend_expert.agent.execute.call_count == 2 assert backend_expert.agent.execute.call_count == 2
# 第二次调用的 task_msg 应包含返工反馈 # 验证第二次执行的 task_msg 应包含返工反馈
second_call_args = backend_expert.agent.execute.call_args_list[1] second_call_args = backend_expert.agent.execute.call_args_list[1]
second_task_msg = second_call_args.args[0] second_task_msg = second_call_args.args[0]
assert "[返工要求]" in second_task_msg.input_data["task"] assert "[返工要求]" in second_task_msg.input_data["task"]
# ── U4: 专家风险标记测试 ──────────────────────────────────
class TestRiskFlagging:
"""U4: 专家风险标记 — _parse_risk_flags 解析 + risk_flagged 事件发出测试"""
def test_parse_risk_flags_single(self):
"""单个 [RISK: ...] 标记被正确解析"""
content = "实现完成。[RISK: API 可能存在性能问题] 请关注。"
risks = TeamOrchestrator._parse_risk_flags(content)
assert len(risks) == 1
assert risks[0] == "API 可能存在性能问题"
def test_parse_risk_flags_multiple(self):
"""多个 [RISK: ...] 标记都被解析"""
content = "[RISK: 数据库连接池可能不足] 实现完成。 [RISK: 缺少单元测试覆盖]"
risks = TeamOrchestrator._parse_risk_flags(content)
assert len(risks) == 2
assert risks[0] == "数据库连接池可能不足"
assert risks[1] == "缺少单元测试覆盖"
def test_parse_risk_flags_none(self):
"""无风险标记时返回空列表"""
content = "实现完成,没有风险。"
risks = TeamOrchestrator._parse_risk_flags(content)
assert risks == []
def test_parse_risk_flags_malformed(self):
"""格式不正确的标记被忽略"""
content = (
"RISK: 不是标记] " # 缺少左括号
"[RISK 也不是标记] " # 缺少冒号
"[RISK:正常风险] " # 这个是正常的
)
risks = TeamOrchestrator._parse_risk_flags(content)
# 只有 "正常风险" 被解析,其他格式不正确的被忽略
assert risks == ["正常风险"]
@pytest.mark.asyncio
async def test_risk_flagged_event_emitted(self):
"""专家输出包含 [RISK: ...] 时risk_flagged 事件被发出"""
gateway = _make_review_gateway([(True, "")])
team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway)
# 覆盖 backend 专家的输出,包含风险标记
backend_expert = team.get_expert("backend")
backend_expert.agent.execute = AsyncMock(
return_value=TaskResult(
task_id="test",
agent_name="backend",
status=TaskStatus.COMPLETED.value,
output_data={"content": "API 实现完成 [RISK: 接口响应时间可能超标]"},
error_message=None,
started_at=None,
completed_at=None,
)
)
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
phase = PlanPhase(
id="phase-1",
name="后端",
assigned_expert="backend",
task_description="实现API",
)
plan.phases = [phase]
await orchestrator._execute_execution_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"]
assert len(risk_events) == 1
assert risk_events[0]["risk_description"] == "接口响应时间可能超标"
@pytest.mark.asyncio
async def test_risk_flagged_event_content(self):
"""risk_flagged 事件包含正确的 expert, risk_description, phase_id 字段"""
gateway = _make_review_gateway([(True, "")])
team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway)
backend_expert = team.get_expert("backend")
backend_expert.agent.execute = AsyncMock(
return_value=TaskResult(
task_id="test",
agent_name="backend",
status=TaskStatus.COMPLETED.value,
output_data={"content": "完成 [RISK: 安全漏洞风险]"},
error_message=None,
started_at=None,
completed_at=None,
)
)
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
phase = PlanPhase(
id="phase-risk-1",
name="安全审计",
assigned_expert="backend",
task_description="审计代码安全",
)
plan.phases = [phase]
await orchestrator._execute_execution_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"]
assert len(risk_events) == 1
event = risk_events[0]
assert event["expert"] == "backend"
assert event["expert_name"] == "backend"
assert event["risk_description"] == "安全漏洞风险"
assert event["phase_id"] == "phase-risk-1"
assert event["phase_name"] == "安全审计"
@pytest.mark.asyncio
async def test_no_risk_flagged_when_clean(self):
"""专家输出不包含风险标记时,无 risk_flagged 事件"""
gateway = _make_review_gateway([(True, "")])
team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway)
# backend 专家输出无风险标记(使用默认输出 "Result from backend"
orchestrator = TeamOrchestrator(team)
plan = TeamPlan(task="开发功能", lead_expert="lead")
phase = PlanPhase(
id="phase-1",
name="后端",
assigned_expert="backend",
task_description="实现API",
)
plan.phases = [phase]
await orchestrator._execute_execution_phase(phase, plan)
calls = team._handoff_transport.send.call_args_list
risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"]
assert len(risk_events) == 0