From 4866a16109828ca8eef8077acc48e7d8f12bb88d Mon Sep 17 00:00:00 2001 From: chiguyong Date: Wed, 1 Jul 2026 13:15:01 +0800 Subject: [PATCH] docs: compound streaming-event-whitelist-and-accumulation learning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Captures the ReAct streaming contract bug + WS event whitelist governance from PR #13's review fixes. Three intertwined runtime issues documented: 1. P0: final_answer double-accumulated token content (logic_error) 2. P0: _VALID_TEAM_EVENT_TYPES whitelist missing 3 new streaming event types 3. P2: except (RuntimeError, TimeoutError, ConnectionError) too narrow for LLMProviderError/ConfigValidationError in async generator Adds ReAct Streaming Contract entry to CONCEPTS.md — defines the protocol execute_stream() yields (token events with incremental content, then one final_answer event with the concatenated full text). Consumers must pick one accumulation strategy, cannot mix both without doubled output. --- CONCEPTS.md | 3 + ...eaming-event-whitelist-and-accumulation.md | 167 ++++++++++++++++++ 2 files changed, 170 insertions(+) create mode 100644 docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md diff --git a/CONCEPTS.md b/CONCEPTS.md index cc9d4c2..6406058 100644 --- a/CONCEPTS.md +++ b/CONCEPTS.md @@ -39,6 +39,9 @@ The feedback loop triggered when a verification check fails after a final answer ### Three-tier Degradation Chain The agent-level fallback sequence when the primary agent fails: main agent → Recovery tier (reuses `ReflexionEngine` for Evaluate→Reflect→Retry) → Emergency tier (rule-based fallback returning a structured error with suggestions). Each tier is independently configurable; the Recovery tier avoids new infrastructure by reusing the existing reflection engine, and the Emergency tier replaces the previous static-text fallback with actionable error structure. +### ReAct Streaming Contract +The protocol `ReActEngine.execute_stream()` yields to consumers: first zero or more `token` events whose `data.content` are incremental content fragments, then exactly one `final_answer` event whose `data.output` is the concatenation of all token fragments (the complete text). The two events carry the same content — token is the增量 view, final_answer is the聚合 view. Consumers must pick one accumulation strategy (append tokens, or wait for final_answer) and cannot mix both without producing doubled output. When `execute_stream()` is wrapped from a sync `execute()` via `_wrap_sync_as_stream`, no token events are emitted and final_answer's output is the sole content carrier. + ## Channels & Caching ### Per-User Cache Namespace diff --git a/docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md b/docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md new file mode 100644 index 0000000..0277fec --- /dev/null +++ b/docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md @@ -0,0 +1,167 @@ +--- +module: experts/server +date: 2026-07-01 +problem_type: runtime_error +component: assistant +severity: high +symptoms: + - "前端流式输出内容翻倍(token + final_answer 双重累积,'Hello' 变为 'HelloHello')" + - "WebSocket 客户端收不到新增流式事件类型(expert_result_chunk 等)" + - "LLM 抛出 LLMProviderError 时前端 streaming 卡死,无错误反馈" +root_cause: logic_error +resolution_type: code_fix +related_components: + - server/routes/chat + - core/react + - server/frontend/stores/chatStream +tags: + - streaming + - websocket + - react-engine + - phase-executor + - expert-team + - event-whitelist + - async-generator +--- + +# 流式事件白名单 + token 双重累积 + 异常穿透 + +## Problem + +引入专家团队流式输出(U3/U4)后,前端聊天界面出现三种症状: + +1. **内容翻倍**:专家流式回复内容在前端显示两次(`"Hello"` → `"HelloHello"`),最终结果与中间 token 拼接重复 +2. **事件丢失**:新加入的 `expert_result_chunk` / `team_synthesis_chunk` 等 WS 事件到达前端时被静默丢弃,导致整个流式功能失效 +3. **流式卡死**:LLM Provider 抛出非 `RuntimeError` 的异常(如 `LLMProviderError` / `ConfigValidationError`)时,流式生成器未广播错误事件,前端 streaming 状态永不结束 + +## Symptoms + +- 专家团队讨论/综合的 token 流式输出在前端拼接为 `"HelloHello"`(token 内容 + final_answer 全文) +- `emit_team_event` 调用日志显示 200 OK,但前端 WebSocket 客户端从未收到 `expert_result_chunk` 事件 +- LLM Provider 因 API key 失效抛出 `LLMProviderError` 时,前端 streaming 光标永久闪烁,UI 卡在"thinking"状态 +- `_phase_executor` 的 `accumulated.append(output)` 被调用两次:一次 token 累积,一次 final_answer 累积 + +## What Didn't Work + +- **假设 final_answer 是单独的"补充内容"**:原测试用非重叠内容验证(token="Hello" + final_answer=" World",断言 content == "Hello World"),通过了测试但掩盖了真实合约。ReActEngine 实际合约是:token 拼接 = final_answer output(两者内容相同)。 +- **假设 `emit_team_event` 会广播所有事件类型**:调用 `emit_team_event` 后日志显示成功,但 `_VALID_TEAM_EVENT_TYPES` frozenset 实际上是一个白名单 — 不在白名单的事件被 `emit_team_event` 静默丢弃(log 级别 DEBUG,看不到)。"广播成功"的日志误导了调试。 +- **假设 `except (RuntimeError, asyncio.TimeoutError, ConnectionError)` 足够**:LLM 网关的异常层次是 `LLMProviderError(Exception)` 而非 `LLMProviderError(RuntimeError)`,所以穿透了流式异常处理。`asyncio.CancelledError` 在 Python 3.8+ 继承 `BaseException`(非 `Exception`),不会被 `except Exception` 捕获,所以扩大 except 范围不会破坏取消语义。 + +## Solution + +三处修复,均位于专家团队流式事件转发路径: + +### Fix 1: WS 事件白名单扩充 + +`src/agentkit/server/routes/chat.py` 的 `_VALID_TEAM_EVENT_TYPES` frozenset 必须包含所有 `emit_team_event` 调用的事件类型: + +```python +_VALID_TEAM_EVENT_TYPES = frozenset({ + "team_formed", "expert_step", "expert_result", + "expert_result_chunk", # 新增:U3 流式 token + "expert_result_chunk_reset", # 新增:U3 retry 重置 + "plan_update", "team_synthesis", + "team_synthesis_chunk", # 新增:U4 综合流式 token + "team_dissolved", + # ... 其余不变 +}) +``` + +### Fix 2: final_answer 不再重复累积 + +`src/agentkit/experts/_phase_executor.py` 在 `_execute_phase_stream` 中区分 token 与 final_answer 的角色: + +```python +# 修改前(双重累积 bug): +elif etype == "final_answer": + output = event.data.get("output", "") + if output: + accumulated.append(str(output)) # BUG: token 已累积,再 append 全文 + await self._broadcast_event("expert_result_chunk", { + "expert_id": expert.config.name, "content": output, + }) # BUG: 再广播一次全文 + +# 修改后: +elif etype == "final_answer": + output = event.data.get("output", "") + # ReActEngine 先发 token(增量)再发 final_answer(全文)。 + # token 已累积时,final_answer 仅作完成信号,不重复 append/broadcast; + # 无 token(如 _wrap_sync_as_stream fallback)时用 output 兜底。 + if output and not accumulated: + accumulated.append(str(output)) +``` + +### Fix 3: 异常处理扩展到 `except Exception` + +`_execute_phase_stream` 的异常处理从窄类型扩展到 `Exception`,覆盖 `LLMProviderError` / `ConfigValidationError` 等穿透异常: + +```python +# 修改前: +except (RuntimeError, asyncio.TimeoutError, ConnectionError) as e: + # 流式异常 — 广播 expert_result(error) 携带已累积内容 + +# 修改后: +except Exception as e: + # 兜底捕获 LLMProviderError/ConfigValidationError 等非 RuntimeError + # asyncio.CancelledError (BaseException) 已被上方 except 捕获,不会到达此处 + # 流式异常 — 广播 expert_result(error) 携带已累积内容 +``` + +## Why This Works + +**ReAct 流式合约**:`ReActEngine.execute_stream()` 是 async generator,合约是: +1. 逐个 yield `token` 事件,每个 `data.content` 是增量内容片段 +2. 最后 yield 一个 `final_answer` 事件,`data.output` 是所有 token 拼接的完整文本 + +token 和 final_answer **内容相同**(前者是增量,后者是聚合)。因此下游消费者必须选择一种累积策略,不能两者都累积: + +- **累积 token**(本次方案):增量 append,最终 `accumulated = ["Hel", "lo"]`,拼接为 `"Hello"`。final_answer 仅作为完成信号。 +- **使用 final_answer**:忽略 token,等 final_answer 一次性拿到全文。但这样失去流式效果,且 `_wrap_sync_as_stream` fallback 路径没有 token 事件,必须依赖 final_answer output。 + +混合方案(两者都累积)必然重复,因为 ReActEngine 的合约已经保证 `final_answer.output == "".join(tokens)`。 + +**WS 事件白名单**:`emit_team_event` 是"开放发送 + 白名单接收"的设计 — 服务端会广播任何 event_type,但 `_VALID_TEAM_EVENT_TYPES` frozenset 决定哪些事件类型被转发到 WebSocket 客户端。不在白名单的事件在转发层被静默丢弃(返回 200,但不实际发送)。新增事件类型时必须同步更新白名单。 + +**异常层次**:Python 3.8+ 的 `asyncio.CancelledError` 继承 `BaseException` 而非 `Exception`,所以 `except Exception` 不会捕获取消信号。在 async generator 中扩大 except 范围是安全的 — 取消语义保留,同时覆盖 `LLMProviderError` 等穿透异常。 + +## Prevention + +### 新增 WS 事件类型的 checklist + +当后端新增需要转发到前端的 WS 事件类型时: + +1. 在 `emit_team_event` 调用点使用新事件类型 +2. **同步**在 `_VALID_TEAM_EVENT_TYPES` frozenset 中添加(`src/agentkit/server/routes/chat.py`) +3. 在前端 `chatStream.ts` 添加对应 handler +4. 添加测试验证 `emit_team_event` 调用后前端能收到事件 + +白名单缺失不会抛错,只会静默丢弃 — 这是隐蔽 bug 的温床。考虑添加单元测试断言:对所有 `_VALID_TEAM_EVENT_TYPES` 中的类型,`emit_team_event` 必须实际转发。 + +### ReAct 流式合约测试模板 + +测试 `execute_stream` 的消费者时,必须使用符合真实合约的 mock 事件序列: + +```python +# 正确(符合合约): +events = [ + ReActEvent(event_type="token", step=0, data={"content": "Hel"}), + ReActEvent(event_type="token", step=0, data={"content": "lo"}), + ReActEvent(event_type="final_answer", step=0, data={"output": "Hello"}), +] +# 断言:token 拼接 == final_answer output == "Hello" + +# 错误(掩盖双重累积 bug): +events = [ + ReActEvent(event_type="token", step=0, data={"content": "Hello"}), + ReActEvent(event_type="final_answer", step=0, data={"output": " World"}), +] +# 断言:content == "Hello World" — 通过测试但掩盖 bug +``` + +### 异常处理范围 + +在 async generator 的异常处理中,优先使用 `except Exception` 而非窄类型列表,除非: +- 需要区分取消与异常(用 `except asyncio.CancelledError: raise` 在前,然后 `except Exception`) +- 需要特定异常类型的特殊处理(如 `except asyncio.TimeoutError` 重试逻辑) + +`except Exception` 在 async generator 中是安全的 — `CancelledError` 不会被意外捕获。