diff --git a/AGENTS.md b/AGENTS.md index 4953b0f..ee05ac7 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -173,7 +173,15 @@ HandoffTransport:InProcess(asyncio.Queue)+ Redis Pub/Sub — 仅用于事 Client -> Server:`message`、`reply`、`confirmation_reply`、`cancel`、`ping` Server -> Client:`connected`、`token`、`thinking`、`step`、`final_answer`、`skill_match`、`confirmation_request`、`confirmation_result`、`ask_human`、`error`、`pong` -专家团队事件:`team_formed`、`expert_step`、`expert_result`、`plan_update`、`phase_started`、`phase_completed`、`phase_failed`、`team_synthesis`、`team_dissolved` +专家团队事件: +- 生命周期:`team_formed`、`plan_update`、`phase_started`、`phase_completed`、`phase_failed`、`team_dissolved` +- 专家执行(流式):`expert_step`(thinking/tool_call/tool_result 步骤,payload 携带 `expert_id`/`expert_name`/`expert_color`/`content`/`step`)、`expert_result_chunk`(token 增量)、`expert_result_chunk_reset`(重试前清空)、`expert_result`(终结,status=completed|error) +- 综合(流式):`team_synthesis_chunk`(增量 chunk,携带 `synthesis_id`)、`team_synthesis`(终结,携带 `synthesis_id` + status=completed|error|cancelled) +- PLAN_EXEC (U4):`phase_changed`、`phase_violation` + +**`synthesis_id` 去重契约**:后端在 `team_synthesis_chunk` 和 `team_synthesis` 事件中注入稳定标识 `{plan_id}:synthesis`,前端用它精确匹配 streaming milestone,避免重试/并发时孤儿 milestone。 + +**`execute_stream()` 取消契约**:`ConfigDrivenAgent.execute_stream()` 注册 `CancellationToken` 到 `_active_tokens`(与 `BaseAgent.execute()` 一致),使 `cancel_task()` 能协作式取消流式任务。 ### 前端页面 diff --git a/CONCEPTS.md b/CONCEPTS.md index 6406058..a33347c 100644 --- a/CONCEPTS.md +++ b/CONCEPTS.md @@ -42,6 +42,11 @@ The agent-level fallback sequence when the primary agent fails: main agent → R ### ReAct Streaming Contract The protocol `ReActEngine.execute_stream()` yields to consumers: first zero or more `token` events whose `data.content` are incremental content fragments, then exactly one `final_answer` event whose `data.output` is the concatenation of all token fragments (the complete text). The two events carry the same content — token is the增量 view, final_answer is the聚合 view. Consumers must pick one accumulation strategy (append tokens, or wait for final_answer) and cannot mix both without producing doubled output. When `execute_stream()` is wrapped from a sync `execute()` via `_wrap_sync_as_stream`, no token events are emitted and final_answer's output is the sole content carrier. +### Streaming Milestone +A chat message that progresses through `streaming → completed | error` states as WebSocket events arrive, used to surface long-running expert team operations (expert results, team synthesis) to the user as live progress indicators rather than blocking until completion. + +A streaming milestone is opened by a chunk event (`expert_result_chunk` / `team_synthesis_chunk`) and must be finalized by a terminal event of the same type family (`expert_result` / `team_synthesis`) carrying a `status` field. If the terminal event never arrives — e.g., the stream is interrupted by cancellation or exception without a cleanup broadcast — the milestone becomes an orphan, spinning forever. Stable identifiers (`synthesis_id`) injected into both chunk and terminal events let the frontend precisely match a terminal event to its open milestone across retries and concurrent teams; positional matching (last streaming milestone) is unreliable in those scenarios and serves only as a backward-compatibility fallback. + ## Channels & Caching ### Per-User Cache Namespace diff --git a/docs/solutions/integration-issues/streaming-event-contract-residuals.md b/docs/solutions/integration-issues/streaming-event-contract-residuals.md new file mode 100644 index 0000000..5c07292 --- /dev/null +++ b/docs/solutions/integration-issues/streaming-event-contract-residuals.md @@ -0,0 +1,224 @@ +--- +module: experts +date: 2026-07-01 +problem_type: integration_issue +component: service_object +severity: high +category: integration-issues +symptoms: + - "expert_step WebSocket events reached frontend with missing fields — frontend WsServerMessage contract silently degraded (no expert_id/expert_name/expert_color/content/step)" + - "cancel_task() could not cooperatively cancel a streaming task — execute_stream continued emitting tokens after user cancellation" + - "If synthesis streaming raised CancelledError or any Exception, no terminal team_synthesis event was emitted; frontend streaming milestone spun forever" + - "Frontend could not precisely match a team_synthesis terminal event to its open streaming milestone across retries / concurrent teams (no stable synthesis_id)" +root_cause: logic_error +resolution_type: code_fix +tags: + - streaming + - websocket + - expert-team + - event-contract + - cancellation + - cancellation-token + - phase-executor + - synthesis-id +related_components: + - frontend_websocket_layer +--- + +# 专家团队流式事件合约缺口(expert_step payload / CancellationToken / synthesis 终止 / synthesis_id 去重) + +## Problem + +PR #13 (`feat/ui-ue-enhancement`) 引入专家团队流式输出后,ce-code-review 暴露了 17 个 findings(2 P0、1 P1、8 P2、6 P3)。PR #13 已修复 3 个 P0/P2 流式事件 bug(见 `docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md`),但留下 4 个 residuals(1 P1 + 3 P2)记录在 `docs/residual-review-findings/feat-ui-ue-enhancement.md`。PR #14(commit `47a437c`,merge `8e8843c`)专门收尾这 4 个 residuals — 它们共同指向同一根因簇:**专家团队流式事件合约在后端生产者与前端消费者之间没有可验证的契约对齐**。 + +具体而言:后端 `_phase_executor` 转发 `expert_step` 事件时只传部分字段,与前端 `WsServerMessage` 类型契约不匹配;`execute_stream` 绕过 `BaseAgent.execute()` 注册路径导致 `CancellationToken` 机制失效;综合流式异常路径未广播终止事件导致前端里程碑永久 spinning;前端按内容匹配 milestone 在并发 / 重试场景下不可靠。 + +## Symptoms + +- **Bug 1 (P1)**:前端消费 `expert_step` 事件时静默降级 — `thinking` / `tool_call` / `tool_result` 子类型的 payload 缺失 `expert_id` / `expert_name` / `expert_color` / `content` / `step` 字段,前端 `WsServerMessage` 联合类型的窄化分支匹配不到,UI 显示空卡片或回退到默认分支。 +- **Bug 2 (P2 #1)**:用户点击取消后,`cancel_task()` 找不到 task_id 对应的 `CancellationToken`(因为 `execute_stream` 从未注册),流式 token 继续到达前端,UI 出现"已取消但仍写入"的状态错乱。 +- **Bug 3 (P2 #2)**:综合阶段抛出 `asyncio.CancelledError` 或任意 `Exception` 时,外层 handler 只广播了通用 `error` 事件,前端等待 `team_synthesis` 终止事件以关闭 milestone — 永远收不到,milestone 进入永久 streaming 状态(loading 光标无限闪烁)。 +- **Bug 4 (P2 #3)**:前端用 `findLastMessage` 查找 `message_type === "milestone" && status === "streaming"` 的消息来匹配终止事件 — 在重试或并发团队场景下,可能匹配到错误的 milestone,导致错误的卡片被 finalize(content 错位 / 状态错乱)。 + +## What Didn't Work + +- **PR #13 已修复 3 个相关 bug 但未触及这 4 处**:白名单扩充让新事件类型到达前端,双重累积修复让 token 不再重复,异常扩展到 `except Exception` 让 LLM 异常不再穿透 — 但都没有验证 `expert_step` payload 是否满足前端类型契约。白名单只解决了"事件丢失",没有解决"事件到达但字段不全"。 +- **假设 `execute_stream` 走 `BaseAgent.execute()` 的注册路径**:实际上 `execute_stream` 是独立 async generator,直接 `async for event in self.handle_task_stream(task): yield event`,从未调用 `self._active_tokens[task.task_id] = token`。`cancel_task()` 在 dict 中查找 task_id 永远 miss,cancel 信号无通道。同步 `execute()` 路径正常工作掩盖了流式路径的 gap。 +- **依赖外层 `except Exception` 兜底综合异常**:外层确实捕获并广播 `error` 事件,但前端 streaming milestone 监听的是 `team_synthesis` 事件类型(带 `status` 字段),不是通用 `error`。两者事件类型不同,前端 milestone handler 不会触发。需要内层专门广播 `team_synthesis` 终止事件。 +- **用消息位置 / 内容匹配 milestone**:原 `findLastMessage` 仅按 `status === "streaming"` 查找最后一条 milestone — 在单团队单次综合场景下工作,但并发团队或综合重试时多个 milestone 同时 streaming,匹配到第一个或最后一个都不可靠。需要稳定标识符。 +- **ce-code-review 在 PR #13 时已发现部分信号**:但 P1 的 expert_step payload 缺口当时被归类为"建议"而非 blocker,导致 PR #13 合入时未阻塞;后续 review 才升级为 P1 residual。 + +## Solution + +### Fix 1 (P1): `expert_step` payload 与前端契约对齐 + +`src/agentkit/experts/_phase_executor.py` 在 `_execute_phase_stream` 转发 thinking / tool_call / tool_result 时补全前端契约所需字段。 + +```python +# thinking 事件(约 line 244) +elif etype == "thinking": + await self._broadcast_event("expert_step", { + "expert_id": expert.config.name, + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": event.data.get("content", ""), + "step": "thinking", + }) + +# tool_call / tool_result 事件(约 line 260) +elif etype in ("tool_call", "tool_result"): + await self._broadcast_event("expert_step", { + "expert_id": expert.config.name, + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": ( + event.data.get("tool_name") + or event.data.get("name") + or etype + ), + "step": etype, + "step_data": event.data, + }) +``` + +测试更新:`tests/unit/experts/test_phase_executor_streaming.py` 中 `test_thinking_events_forwarded_as_expert_step` 断言补全,新增 `test_tool_call_events_forwarded_as_expert_step` 覆盖 tool_call / tool_result 分支。 + +### Fix 2 (P2 #1): `execute_stream` 注册 `CancellationToken` + +`src/agentkit/core/config_driven.py` 在流式入口注册 token,在 `finally` 清理,与 `BaseAgent.execute()` 保持对称。新增 import:`from agentkit.core.protocol import AgentCapability, CancellationToken, TaskMessage`。 + +```python +async def execute_stream(self, task: TaskMessage) -> AsyncGenerator[ReActEvent, None]: + token = CancellationToken() + self._active_tokens[task.task_id] = token + try: + await self._register_mcp_tools() + async for event in self.handle_task_stream(task): + yield event + finally: + self._active_tokens.pop(task.task_id, None) +``` + +`cancel_task()` 现在能在 `_active_tokens` 中找到 task_id 对应的 token 并触发取消信号,`handle_task_stream` 内部的协作式取消点(`await asyncio.sleep` / 工具调用边界)会响应。 + +### Fix 3 (P2 #2): `team_synthesis` 终止事件广播 + +`src/agentkit/experts/orchestrator.py` 在综合调用周围添加内层 try/except,捕获 `CancelledError` 与 `Exception`,分别广播 `status: cancelled` / `status: error` 的 `team_synthesis` 终止事件(携带 `synthesis_id`),然后 re-raise 保留异常传播。 + +```python +try: + final_result = await self._synthesize_results( + lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk + ) +except asyncio.CancelledError: + await self._broadcast_event( + "team_synthesis", + {"content": "", "phases_completed": len(completed), + "phases_total": len(plan.phases), "status": "cancelled", + "synthesis_id": synthesis_id}, + ) + raise +except Exception as synth_err: + logger.error(f"Synthesis streaming failed: {synth_err}") + await self._broadcast_event( + "team_synthesis", + {"content": "", "phases_completed": len(completed), + "phases_total": len(plan.phases), "status": "error", + "error": str(synth_err), "synthesis_id": synthesis_id}, + ) + raise +``` + +`asyncio.CancelledError` 在 Python 3.8+ 继承 `BaseException`,因此 `except Exception` 不会捕获它 — 两个 except 分支顺序正确,取消语义保留。 + +### Fix 4 (P2 #3): `synthesis_id` 跨 chunk 与终止事件去重 + +**后端**(`orchestrator.py` 约 line 289):在综合开始时生成 `synthesis_id = f"{plan.id}:synthesis"`,注入到 chunk 广播与终止事件。 + +```python +synthesis_id = f"{plan.id}:synthesis" +async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None: + if isinstance(data, dict): + data = {**data, "synthesis_id": synthesis_id} + await self._broadcast_event("team_synthesis_chunk", data) +``` + +终止事件(Fix 3 的 cancelled / error 分支)也携带同一 `synthesis_id`。 + +**前端类型**(`src/agentkit/server/frontend/src/api/types.ts` line 156-157):扩展 `team_synthesis_chunk` 与 `team_synthesis` 的 data 类型以声明 `synthesis_id` / `status` / `error` 字段。 + +```typescript +| { type: 'team_synthesis_chunk'; data: { chunk: string; synthesis_id?: string } } +| { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number; synthesis_id?: string; status?: 'completed' | 'error' | 'cancelled'; error?: string } } +``` + +**前端 handler**(`src/agentkit/server/frontend/src/stores/chatStream.ts` 约 line 907):用 `synthesis_id` 做精确 milestone 匹配,回退到"任一 streaming milestone"以兼容历史未携带 id 的事件。 + +```typescript +const sid = event.data.synthesis_id; +const finalStatus: 'completed' | 'error' = + event.data.status === 'error' || event.data.status === 'cancelled' + ? 'error' + : 'completed'; +const existing = findLastMessage( + conv.messages, + (m) => + m.message_type === "milestone" && + m.status === "streaming" && + (sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true), +); +``` + +## Why This Works + +**Fix 1 — 合约对齐而非字段补全**:前端 `WsServerMessage` 是 discriminated union,每个 `type` 分支有窄化的 `data` 形状。`expert_step` 的窄化要求 `expert_id` / `expert_name` / `expert_color` / `content` / `step` 全部存在,缺失任一字段会让 TypeScript 窄化失败(运行时则回退到默认分支,UI 显示空卡片)。修复后后端生产者与前端消费者在 data shape 上 1:1 对齐,前端无需做防御性兜底。 + +**Fix 2 — 取消通道对称性**:`BaseAgent.execute()` 与 `execute_stream()` 都是 task 的执行入口,必须在 `_active_tokens` 注册上对称。`execute_stream` 是 async generator,`finally` 块保证即使 generator 提前 close(消费者 break / 外层 cancel)也会清理 token — 不会泄漏 dict 条目导致后续 cancel 误触发。`CancellationToken` 是协作式取消,`handle_task_stream` 内部在 await 边界检查 `token.is_cancelled()` 即可响应。 + +**Fix 3 — 终止事件类型对齐**:前端 milestone handler 监听的事件类型是 `team_synthesis`(带 `status` 字段),不是通用 `error`。外层 `except Exception` 广播 `error` 事件虽然对调试有用,但前端无法用它关闭 milestone — 事件类型不匹配。内层 try/except 在综合调用边界捕获异常并广播 `team_synthesis` 事件(`status: cancelled|error`),与前端 handler 监听的事件类型对齐。`raise` 保留异常传播,外层 handler 仍可记录日志 / 清理状态。 + +**Fix 4 — 稳定标识符优于位置匹配**:流式 chunk 与终止事件属于同一逻辑流,但 WS 是无序消息流,无法用"最后一条 streaming milestone"可靠匹配 — 并发团队 / 重试场景下多个 milestone 同时 streaming。`synthesis_id = f"{plan.id}:synthesis"` 用 plan id 命名空间隔离,保证同一团队的综合事件链共享 id,跨团队不冲突。前端匹配优先 `m.synthesis_id === sid`,回退 `m.synthesis_id === undefined` 以兼容历史事件(未注入 id 的旧消息)— 向后兼容。 + +## Prevention + +### 流式事件合约测试 + +新增 / 修改 `expert_step` / `team_synthesis` 等流式事件 payload 时,必须有测试断言:每个广播的 event data 满足前端 `WsServerMessage` 类型契约的所有必填字段。可考虑: + +- **后端契约测试**:在 `_phase_executor` / `orchestrator` 的流式测试中,断言每个 broadcast 的 event data 包含前端契约要求的全部字段(用 Pydantic 模型镜像前端 type,或直接断言字段集合)。 +- **前端类型同步**:后端字段变更时同步更新 `types.ts`,并依赖 `npm run typecheck` 暴露 drift。可考虑生成器从单一 schema 生成前后端类型(长期方案)。 + +### 取消通道注册 checklist + +新增任何 task 执行入口(`execute` / `execute_stream` / `execute_rewoo` 等)时: + +1. 在入口处 `self._active_tokens[task.task_id] = CancellationToken()` +2. 在 `finally` 块 `self._active_tokens.pop(task.task_id, None)` +3. 在内部 await 边界检查 `token.is_cancelled()`(协作式取消点) +4. 添加测试:调用 `cancel_task(task_id)` 后,generator 在下一个 yield 边界停止产出 + +### 终止事件对称性 + +任何"打开 streaming milestone"的事件类型 X,必须有对应的"关闭 milestone"事件类型 X(携带 `status: completed|error|cancelled`)。如果 X 的生产路径有多个异常分支,每个分支都必须广播 X 终止事件 — 不能依赖外层通用 `error` 事件关闭 milestone(事件类型不匹配)。 + +新增流式事件类型 X 时,问:所有异常路径是否都广播了 X 终止事件?包括 `CancelledError`、`Exception`、generator 提前 close。 + +### Milestone 标识符 + +任何需要"打开 → 流式 → 关闭"配对的消息类型,必须有稳定标识符(`{plan_id}:synthesis` 模式): + +- 打开事件携带 id +- 每个 chunk 携带 id +- 关闭事件携带同一 id +- 前端用 id 精确匹配,回退到位置匹配仅用于兼容历史数据 + +考虑抽象 `StreamSession`:打开时生成 id,所有 chunk / close 自动注入 id,避免每处手写。 + +### 残留 review 的优先级升级 + +ce-code-review 在 PR #13 时将 expert_step payload 缺口标为"建议" — 但前端合约不匹配实际上是静默降级,用户无感知。未来 review 中,"前端类型契约字段缺失"应直接归为 P1 而非建议,因为:合约不匹配 = 前端窄化失败 = UI 静默降级 = 用户无错误反馈 = 难以诊断的 UX 退化。 + +## Related + +- `docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md` — PR #13 的 3 个 P0/P2 流式事件 bug(WS 白名单缺失、token+final_answer 双重累积、异常范围过窄)。与本 doc 属同一流式事件合约簇,但 PR #13 与 PR #14 的 bug 谱系不同 — 建议未来用 ce-compound-refresh 合并为单一参考文档。 +- `docs/residual-review-findings/feat-ui-ue-enhancement.md` — 本 doc 4 个 residuals 的源 finding 列表。