From 47a437c5e35653ebe2e672a67c356443123c3ee3 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Wed, 1 Jul 2026 13:26:19 +0800 Subject: [PATCH] fix(experts): resolve residual review findings from PR #13 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses 4 actionable findings (1 P1 + 3 P2) from ce-code-review of feat/ui-ue-enhancement (PR #13), now merged to main (8066e0b). P1 — expert_step payload alignment (_phase_executor.py) The thinking/tool_call/tool_result event payloads were missing the fields the frontend WsServerMessage contract requires (expert_name/expert_color/content/step). Frontend code consuming these events silently degraded. Now all expert_step broadcasts carry the full contract; tool_call/tool_result keep step_data for the raw payload. P2 #1 — execute_stream CancellationToken registration (config_driven.py) execute_stream() bypassed BaseAgent.execute() and never registered a CancellationToken, so cancel_task() could not cooperatively cancel a streaming task. Now registers the token and cleans it up in finally. P2 #2 — team_synthesis orphan milestone cleanup (orchestrator.py) If synthesis streaming was interrupted (cancel/exception), no terminal team_synthesis event was emitted, leaving the frontend streaming milestone spinning forever. Now an inner try/except emits a terminal team_synthesis with status=cancelled|error before re-raising, so the frontend can finalize the milestone. The success path also carries the synthesis_id. P2 #3 — synthesis_id dedup (orchestrator.py + types.ts + chatStream.ts) Without an identifier, the frontend could not precisely match a team_synthesis terminal event to its streaming milestone (especially across retries/concurrent teams). The backend now injects a stable synthesis_id (`{plan.id}:synthesis`) into both team_synthesis_chunk and team_synthesis events; the frontend uses it for exact milestone matching and treats error/cancelled status as terminal. Test updates - Updated test_thinking_events_forwarded_as_expert_step to assert the new payload contract (expert_id/name/color/content/step). - Added test_tool_call_events_forwarded_as_expert_step covering tool_call/tool_result payload shape (content=tool_name摘要 + step_data=原始 payload). Verification - ruff check: clean - pytest tests/unit/experts/test_phase_executor_streaming.py: 14/14 - npm run typecheck: clean - vitest: 126/127 (1 unrelated baseline failure in tauri-auth.test.ts) Residuals doc: docs/residual-review-findings/feat-ui-ue-enhancement.md --- src/agentkit/core/config_driven.py | 17 ++++-- src/agentkit/experts/_phase_executor.py | 19 +++++- src/agentkit/experts/orchestrator.py | 37 ++++++++++-- src/agentkit/server/frontend/src/api/types.ts | 6 +- .../server/frontend/src/stores/chatStream.ts | 29 ++++++++-- .../experts/test_phase_executor_streaming.py | 58 ++++++++++++++++++- 6 files changed, 147 insertions(+), 19 deletions(-) diff --git a/src/agentkit/core/config_driven.py b/src/agentkit/core/config_driven.py index 5db680f..9bada3b 100644 --- a/src/agentkit/core/config_driven.py +++ b/src/agentkit/core/config_driven.py @@ -17,7 +17,7 @@ import yaml from agentkit.core.base import BaseAgent from agentkit.core.exceptions import ConfigValidationError -from agentkit.core.protocol import AgentCapability, TaskMessage +from agentkit.core.protocol import AgentCapability, CancellationToken, TaskMessage from agentkit.core.react import ReActEvent from agentkit.evolution.lifecycle import EvolutionMixin from agentkit.evolution.reflector import Reflector @@ -688,10 +688,19 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): 镜像 execute() → handle_task() 分派,但不包装 TaskResult — 直接 yield 事件,由调用方负责转发和累积。 + + P2 fix: 注册 CancellationToken 到 _active_tokens,使 cancel_task() 能 + 协作式取消流式任务。原实现绕过 BaseAgent.execute(),未注册 token。 """ - await self._register_mcp_tools() - async for event in self.handle_task_stream(task): - yield event + token = CancellationToken() + self._active_tokens[task.task_id] = token + try: + await self._register_mcp_tools() + async for event in self.handle_task_stream(task): + yield event + finally: + # async generator 的 finally 在 generator 关闭时执行(GC/aclose/正常结束) + self._active_tokens.pop(task.task_id, None) async def handle_task_stream(self, task: TaskMessage) -> AsyncGenerator[ReActEvent, None]: """根据 execution_mode / task_mode 流式分派,镜像 handle_task()。""" diff --git a/src/agentkit/experts/_phase_executor.py b/src/agentkit/experts/_phase_executor.py index 5e64d53..47698d9 100644 --- a/src/agentkit/experts/_phase_executor.py +++ b/src/agentkit/experts/_phase_executor.py @@ -239,9 +239,14 @@ class PhaseExecutorMixin: "expert_id": expert.config.name, "content": chunk, }) elif etype == "thinking": + # P1 fix: payload 对齐前端 WsServerMessage 契约 + # (expert_id/expert_name/expert_color/content/step) await self._broadcast_event("expert_step", { "expert_id": expert.config.name, - "thinking": event.data.get("content", ""), + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": event.data.get("content", ""), + "step": "thinking", }) elif etype == "final_answer": # P0 fix: ReActEngine 先发 token(增量)再发 final_answer(全文)。 @@ -251,8 +256,18 @@ class PhaseExecutorMixin: if output and not accumulated: accumulated.append(str(output)) elif etype in ("tool_call", "tool_result"): + # P1 fix: payload 对齐前端契约 — content 携带可读摘要,step_data 保留原始数据 await self._broadcast_event("expert_step", { - "expert_id": expert.config.name, "step_data": event.data, + "expert_id": expert.config.name, + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": ( + event.data.get("tool_name") + or event.data.get("name") + or etype + ), + "step": etype, + "step_data": event.data, }) elif etype == "error": raise RuntimeError( diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index 77b605a..f5b973a 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -287,22 +287,51 @@ class TeamOrchestrator( plan.status = PlanStatus.COMPLETED # U3: 流式综合 — 每个 chunk 广播 team_synthesis_chunk + # P2 fix: 携带 synthesis_id 让前端去重 streaming milestone(避免附身到上一次孤儿) + synthesis_id = f"{plan.id}:synthesis" async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None: + # data 可能是 {"chunk": "..."} 或 {"value": "..."}(synthesizer 决定) + # 统一注入 synthesis_id,不破坏原 data 结构 + if isinstance(data, dict): + data = {**data, "synthesis_id": synthesis_id} await self._broadcast_event("team_synthesis_chunk", data) - final_result = await self._synthesize_results( - lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk - ) + # P2 fix: 流式综合期间若异常/取消,必须广播 team_synthesis 终结事件, + # 否则前端 streaming milestone 永久转圈(孤儿占位无终结信号)。 + # 内层 try/except 只清理孤儿 milestone 后 re-raise,让外层 except 处理 fallback。 + try: + final_result = await self._synthesize_results( + lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk + ) + except asyncio.CancelledError: + await self._broadcast_event( + "team_synthesis", + {"content": "", "phases_completed": len(completed), + "phases_total": len(plan.phases), "status": "cancelled", + "synthesis_id": synthesis_id}, + ) + raise + except Exception as synth_err: + logger.error(f"Synthesis streaming failed: {synth_err}") + await self._broadcast_event( + "team_synthesis", + {"content": "", "phases_completed": len(completed), + "phases_total": len(plan.phases), "status": "error", + "error": str(synth_err), "synthesis_id": synthesis_id}, + ) + raise # 让外层 except 决定是否 fallback self._team.set_status(TeamStatus.COMPLETED) - # 7. Emit team_synthesis event + # 7. Emit team_synthesis event (success path only — + # 失败路径已在上面广播 error 终结事件) await self._broadcast_event( "team_synthesis", { "content": final_result.get("content", ""), "phases_completed": len(completed), "phases_total": len(plan.phases), + "synthesis_id": synthesis_id, }, ) diff --git a/src/agentkit/server/frontend/src/api/types.ts b/src/agentkit/server/frontend/src/api/types.ts index 8975dd5..6fbc649 100644 --- a/src/agentkit/server/frontend/src/api/types.ts +++ b/src/agentkit/server/frontend/src/api/types.ts @@ -85,6 +85,8 @@ export interface IChatMessage { review_result?: IReviewResult /** U5: PM collaboration — risk flag for RiskFlagCard */ risk_flag?: IRiskFlag + /** U4: synthesis identifier for streaming milestone dedup (team_synthesis_chunk/team_synthesis) */ + synthesis_id?: string } /** Conversation with messages */ @@ -151,8 +153,8 @@ export type WsServerMessage = // PLAN_EXEC (U4) — phase lifecycle events emitted by ReActEngine. | { type: 'phase_changed'; data: { phase: string; previous: string } } | { type: 'phase_violation'; data: { current_phase: string; tool: string; message: string; violation_kind: string; command_preview?: string } } - | { type: 'team_synthesis_chunk'; data: { chunk: string } } - | { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number } } + | { type: 'team_synthesis_chunk'; data: { chunk: string; synthesis_id?: string } } + | { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number; synthesis_id?: string; status?: 'completed' | 'error' | 'cancelled'; error?: string } } | { type: 'team_dissolved'; data: { team_id: string } } // Board Meeting 模式事件 | { type: 'board_started'; data: IBoardStartedData } diff --git a/src/agentkit/server/frontend/src/stores/chatStream.ts b/src/agentkit/server/frontend/src/stores/chatStream.ts index c49e665..b8a0180 100644 --- a/src/agentkit/server/frontend/src/stores/chatStream.ts +++ b/src/agentkit/server/frontend/src/stores/chatStream.ts @@ -868,19 +868,26 @@ export function dispatchWsEvent( case "team_synthesis_chunk": { // U4: 团队综合流式 chunk — 首次创建 streaming 占位,后续累加。 + // P2 fix: 用 synthesis_id 去重,避免 chunk 附身到上一次孤儿 streaming milestone。 const conversationId = state.resolveIncomingConvId(); if (!conversationId) break; const conv = state.conversations.value.find( (c) => c.id === conversationId, ); if (!conv) break; + const sid = event.data.synthesis_id; const existing = findLastMessage( conv.messages, - (m) => m.message_type === "milestone" && m.status === "streaming", + (m) => + m.message_type === "milestone" && + m.status === "streaming" && + (sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true), ); if (existing) { state.updateMessage(conversationId, existing.id, { content: (existing.content || "") + (event.data.chunk || ""), + // 首次附身到无 synthesis_id 的 milestone 时,标记上 synthesis_id + ...(sid && !existing.synthesis_id ? { synthesis_id: sid } : {}), }); } else { const synthMsg: IChatMessage = { @@ -890,6 +897,7 @@ export function dispatchWsEvent( timestamp: new Date().toISOString(), status: "streaming", message_type: "milestone", + ...(sid ? { synthesis_id: sid } : {}), }; state.appendMessage(conversationId, synthMsg); } @@ -899,20 +907,31 @@ export function dispatchWsEvent( case "team_synthesis": { // U4: 最终综合结果 — 覆盖累积 content,标记 completed。 // 若存在 streaming 占位则 update;否则 append(向后兼容)。 + // P2 fix: 用 synthesis_id 精确匹配对应 streaming milestone(跨重试场景)。 + // P2 fix: 处理 status=error/cancelled 终结事件(后端 synthesis 失败时清理孤儿)。 const conversationId = state.resolveIncomingConvId(); if (!conversationId) break; const conv = state.conversations.value.find( (c) => c.id === conversationId, ); if (!conv) break; + const sid = event.data.synthesis_id; + const finalStatus: 'completed' | 'error' = + event.data.status === 'error' || event.data.status === 'cancelled' + ? 'error' + : 'completed'; const existing = findLastMessage( conv.messages, - (m) => m.message_type === "milestone" && m.status === "streaming", + (m) => + m.message_type === "milestone" && + m.status === "streaming" && + (sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true), ); if (existing) { state.updateMessage(conversationId, existing.id, { content: event.data.content || "", - status: "completed", + status: finalStatus, + ...(event.data.error ? { error_detail: event.data.error } : {}), }); } else { const synthesisMsg: IChatMessage = { @@ -920,8 +939,10 @@ export function dispatchWsEvent( role: "assistant", content: event.data.content || "", timestamp: new Date().toISOString(), - status: "completed", + status: finalStatus, message_type: "milestone", + ...(sid ? { synthesis_id: sid } : {}), + ...(event.data.error ? { error_detail: event.data.error } : {}), }; state.appendMessage(conversationId, synthesisMsg); } diff --git a/tests/unit/experts/test_phase_executor_streaming.py b/tests/unit/experts/test_phase_executor_streaming.py index 072cedb..248d889 100644 --- a/tests/unit/experts/test_phase_executor_streaming.py +++ b/tests/unit/experts/test_phase_executor_streaming.py @@ -193,7 +193,11 @@ class TestStreamEventForwarding: @pytest.mark.asyncio async def test_thinking_events_forwarded_as_expert_step(self): - """execute_stream 产出 thinking 事件时,_broadcast_event 转发 expert_step。""" + """execute_stream 产出 thinking 事件时,_broadcast_event 转发 expert_step。 + + P1 fix: payload 对齐前端 WsServerMessage 契约 — + 携带 expert_id/expert_name/expert_color/content/step 字段。 + """ events = [ ReActEvent(event_type="thinking", step=0, data={"content": "思考中..."}), ReActEvent(event_type="token", step=0, data={"content": "结果"}), @@ -210,10 +214,58 @@ class TestStreamEventForwarding: step_calls = [ c for c in orch._broadcast_event.call_args_list - if c.args[0] == "expert_step" and "thinking" in c.args[1] + if c.args[0] == "expert_step" and c.args[1].get("step") == "thinking" ] assert len(step_calls) == 1 - assert step_calls[0].args[1]["thinking"] == "思考中..." + payload = step_calls[0].args[1] + # P1 fix: payload 必须包含前端契约字段 + assert payload["expert_id"] == expert.config.name + assert payload["expert_name"] == expert.config.name + assert payload["expert_color"] == expert.config.color + assert payload["content"] == "思考中..." + assert payload["step"] == "thinking" + + @pytest.mark.asyncio + async def test_tool_call_events_forwarded_as_expert_step(self): + """execute_stream 产出 tool_call/tool_result 事件时,_broadcast_event 转发 expert_step。 + + P1 fix: payload 对齐前端契约 — content 携带 tool_name 摘要,step_data 保留原始数据。 + """ + events = [ + ReActEvent(event_type="tool_call", step=0, + data={"tool_name": "search", "args": {"q": "test"}}), + ReActEvent(event_type="tool_result", step=0, + data={"tool_name": "search", "result": "hit"}), + ] + agent = _make_stream_agent(events) + expert = _make_mock_expert() + lead = _make_mock_expert(name="lead") + phase = _make_phase() + plan = _make_simple_plan() + orch = _make_orchestrator_for_streaming() + + await orch._run_agent_steps(expert, agent, lead, phase, plan) + + # 仅断言 tool_call/tool_result 事件(phase intro step 单独由其他测试覆盖) + step_calls = [ + c + for c in orch._broadcast_event.call_args_list + if c.args[0] == "expert_step" + and c.args[1].get("step") in ("tool_call", "tool_result") + ] + assert len(step_calls) == 2 + # tool_call event + p1 = step_calls[0].args[1] + assert p1["step"] == "tool_call" + assert p1["content"] == "search" + assert p1["step_data"]["args"] == {"q": "test"} + assert p1["expert_name"] == expert.config.name + assert p1["expert_color"] == expert.config.color + # tool_result event + p2 = step_calls[1].args[1] + assert p2["step"] == "tool_result" + assert p2["content"] == "search" + assert p2["step_data"]["result"] == "hit" # ── expert_result 终结事件测试 ──────────────────────────── -- 2.43.0