From 5487cca19939dd6286cfb45683e8524ce4a6386b Mon Sep 17 00:00:00 2001 From: chiguyong Date: Wed, 24 Jun 2026 14:17:58 +0800 Subject: [PATCH] =?UTF-8?q?feat(experts):=20U4=20=E4=B8=93=E5=AE=B6?= =?UTF-8?q?=E9=A3=8E=E9=99=A9=E6=A0=87=E8=AE=B0=20+=20risk=5Fflagged=20?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - orchestrator 新增 _parse_risk_flags 静态方法,正则解析 [RISK: ...] 标记 - _execute_execution_phase 在协作通知后、验收前解析风险标记 - 风险标记通过 risk_flagged 事件广播,供前端/CLI 渲染 - 无风险标记时行为不变,向后兼容 - 新增 TestRiskFlagging 7 个测试(单/多/无/格式错误/事件发出/内容/兼容) --- src/agentkit/experts/orchestrator.py | 39 ++++++ tests/unit/experts/test_pm_collaboration.py | 140 +++++++++++++++++++- 2 files changed, 178 insertions(+), 1 deletion(-) diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index 1e4f2f8..8604857 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -595,6 +595,24 @@ class TeamOrchestrator: if phase.collaboration_contracts: await self._notify_collaborators(phase, plan) + # U4: 解析专家输出中的风险标记,发出 risk_flagged 事件 + # ponytail: 风险标记通过验收环节间接处理 Lead 决策。 + # 验收 prompt 包含输出内容,Lead 可在验收反馈中要求返工。 + # 未来如需更复杂的风险决策(如自动插入辩论),可在此扩展。 + content = result.get("content", str(result)) + risk_flags = self._parse_risk_flags(content) + for risk_desc in risk_flags: + await self._broadcast_event( + "risk_flagged", + { + "expert": phase.assigned_expert, + "expert_name": phase.assigned_expert, + "risk_description": risk_desc, + "phase_id": phase.id, + "phase_name": phase.name, + }, + ) + # U3: Lead 验收阶段输出 passed, feedback = await self._review_phase_output(lead, phase, result) @@ -767,6 +785,27 @@ class TeamOrchestrator: # 降级:验收通过 return True, "" + @staticmethod + def _parse_risk_flags(content: str) -> list[str]: + """从专家输出中解析风险标记。 + + 风险标记格式:[RISK: <风险描述>] + 可在一行中出现多个,也可跨多行。 + + Returns: + 风险描述列表(空列表表示无风险标记) + """ + # 匹配 [RISK: ...] 格式,允许跨行 + pattern = re.compile(r"\[RISK:\s*(.+?)\]", re.DOTALL) + matches = pattern.findall(content) + # 清理每个匹配项:去除多余空白,截断过长的描述 + risks: list[str] = [] + for match in matches: + risk = match.strip().replace("\n", " ") + if risk and len(risk) <= 500: # 限制风险描述长度 + risks.append(risk) + return risks + async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: """Execute a DEBATE phase: Lead-facilitated structured debate. diff --git a/tests/unit/experts/test_pm_collaboration.py b/tests/unit/experts/test_pm_collaboration.py index fa675b0..8c29e91 100644 --- a/tests/unit/experts/test_pm_collaboration.py +++ b/tests/unit/experts/test_pm_collaboration.py @@ -861,7 +861,145 @@ class TestPhaseReview: backend_expert = team.get_expert("backend") # agent.execute 被调用了 2 次(1 次初始 + 1 次返工) assert backend_expert.agent.execute.call_count == 2 - # 第二次调用的 task_msg 应包含返工反馈 + # 验证第二次执行的 task_msg 应包含返工反馈 second_call_args = backend_expert.agent.execute.call_args_list[1] second_task_msg = second_call_args.args[0] assert "[返工要求]" in second_task_msg.input_data["task"] + + +# ── U4: 专家风险标记测试 ────────────────────────────────── + + +class TestRiskFlagging: + """U4: 专家风险标记 — _parse_risk_flags 解析 + risk_flagged 事件发出测试""" + + def test_parse_risk_flags_single(self): + """单个 [RISK: ...] 标记被正确解析""" + content = "实现完成。[RISK: API 可能存在性能问题] 请关注。" + risks = TeamOrchestrator._parse_risk_flags(content) + assert len(risks) == 1 + assert risks[0] == "API 可能存在性能问题" + + def test_parse_risk_flags_multiple(self): + """多个 [RISK: ...] 标记都被解析""" + content = "[RISK: 数据库连接池可能不足] 实现完成。 [RISK: 缺少单元测试覆盖]" + risks = TeamOrchestrator._parse_risk_flags(content) + assert len(risks) == 2 + assert risks[0] == "数据库连接池可能不足" + assert risks[1] == "缺少单元测试覆盖" + + def test_parse_risk_flags_none(self): + """无风险标记时返回空列表""" + content = "实现完成,没有风险。" + risks = TeamOrchestrator._parse_risk_flags(content) + assert risks == [] + + def test_parse_risk_flags_malformed(self): + """格式不正确的标记被忽略""" + content = ( + "RISK: 不是标记] " # 缺少左括号 + "[RISK 也不是标记] " # 缺少冒号 + "[RISK:正常风险] " # 这个是正常的 + ) + risks = TeamOrchestrator._parse_risk_flags(content) + # 只有 "正常风险" 被解析,其他格式不正确的被忽略 + assert risks == ["正常风险"] + + @pytest.mark.asyncio + async def test_risk_flagged_event_emitted(self): + """专家输出包含 [RISK: ...] 时,risk_flagged 事件被发出""" + gateway = _make_review_gateway([(True, "")]) + team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) + # 覆盖 backend 专家的输出,包含风险标记 + backend_expert = team.get_expert("backend") + backend_expert.agent.execute = AsyncMock( + return_value=TaskResult( + task_id="test", + agent_name="backend", + status=TaskStatus.COMPLETED.value, + output_data={"content": "API 实现完成 [RISK: 接口响应时间可能超标]"}, + error_message=None, + started_at=None, + completed_at=None, + ) + ) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + phase = PlanPhase( + id="phase-1", + name="后端", + assigned_expert="backend", + task_description="实现API", + ) + plan.phases = [phase] + + await orchestrator._execute_execution_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] + assert len(risk_events) == 1 + assert risk_events[0]["risk_description"] == "接口响应时间可能超标" + + @pytest.mark.asyncio + async def test_risk_flagged_event_content(self): + """risk_flagged 事件包含正确的 expert, risk_description, phase_id 字段""" + gateway = _make_review_gateway([(True, "")]) + team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) + backend_expert = team.get_expert("backend") + backend_expert.agent.execute = AsyncMock( + return_value=TaskResult( + task_id="test", + agent_name="backend", + status=TaskStatus.COMPLETED.value, + output_data={"content": "完成 [RISK: 安全漏洞风险]"}, + error_message=None, + started_at=None, + completed_at=None, + ) + ) + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + phase = PlanPhase( + id="phase-risk-1", + name="安全审计", + assigned_expert="backend", + task_description="审计代码安全", + ) + plan.phases = [phase] + + await orchestrator._execute_execution_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] + assert len(risk_events) == 1 + event = risk_events[0] + assert event["expert"] == "backend" + assert event["expert_name"] == "backend" + assert event["risk_description"] == "安全漏洞风险" + assert event["phase_id"] == "phase-risk-1" + assert event["phase_name"] == "安全审计" + + @pytest.mark.asyncio + async def test_no_risk_flagged_when_clean(self): + """专家输出不包含风险标记时,无 risk_flagged 事件""" + gateway = _make_review_gateway([(True, "")]) + team = _make_team_with_experts(expert_names=["lead", "backend"], gateway=gateway) + # backend 专家输出无风险标记(使用默认输出 "Result from backend") + orchestrator = TeamOrchestrator(team) + + plan = TeamPlan(task="开发功能", lead_expert="lead") + phase = PlanPhase( + id="phase-1", + name="后端", + assigned_expert="backend", + task_description="实现API", + ) + plan.phases = [phase] + + await orchestrator._execute_execution_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + risk_events = [c[0][1] for c in calls if c[0][1].get("type") == "risk_flagged"] + assert len(risk_events) == 0