168 lines
8.4 KiB
Markdown
168 lines
8.4 KiB
Markdown
---
|
||
module: experts/server
|
||
date: 2026-07-01
|
||
problem_type: runtime_error
|
||
component: assistant
|
||
severity: high
|
||
symptoms:
|
||
- "前端流式输出内容翻倍(token + final_answer 双重累积,'Hello' 变为 'HelloHello')"
|
||
- "WebSocket 客户端收不到新增流式事件类型(expert_result_chunk 等)"
|
||
- "LLM 抛出 LLMProviderError 时前端 streaming 卡死,无错误反馈"
|
||
root_cause: logic_error
|
||
resolution_type: code_fix
|
||
related_components:
|
||
- server/routes/chat
|
||
- core/react
|
||
- server/frontend/stores/chatStream
|
||
tags:
|
||
- streaming
|
||
- websocket
|
||
- react-engine
|
||
- phase-executor
|
||
- expert-team
|
||
- event-whitelist
|
||
- async-generator
|
||
---
|
||
|
||
# 流式事件白名单 + token 双重累积 + 异常穿透
|
||
|
||
## Problem
|
||
|
||
引入专家团队流式输出(U3/U4)后,前端聊天界面出现三种症状:
|
||
|
||
1. **内容翻倍**:专家流式回复内容在前端显示两次(`"Hello"` → `"HelloHello"`),最终结果与中间 token 拼接重复
|
||
2. **事件丢失**:新加入的 `expert_result_chunk` / `team_synthesis_chunk` 等 WS 事件到达前端时被静默丢弃,导致整个流式功能失效
|
||
3. **流式卡死**:LLM Provider 抛出非 `RuntimeError` 的异常(如 `LLMProviderError` / `ConfigValidationError`)时,流式生成器未广播错误事件,前端 streaming 状态永不结束
|
||
|
||
## Symptoms
|
||
|
||
- 专家团队讨论/综合的 token 流式输出在前端拼接为 `"HelloHello"`(token 内容 + final_answer 全文)
|
||
- `emit_team_event` 调用日志显示 200 OK,但前端 WebSocket 客户端从未收到 `expert_result_chunk` 事件
|
||
- LLM Provider 因 API key 失效抛出 `LLMProviderError` 时,前端 streaming 光标永久闪烁,UI 卡在"thinking"状态
|
||
- `_phase_executor` 的 `accumulated.append(output)` 被调用两次:一次 token 累积,一次 final_answer 累积
|
||
|
||
## What Didn't Work
|
||
|
||
- **假设 final_answer 是单独的"补充内容"**:原测试用非重叠内容验证(token="Hello" + final_answer=" World",断言 content == "Hello World"),通过了测试但掩盖了真实合约。ReActEngine 实际合约是:token 拼接 = final_answer output(两者内容相同)。
|
||
- **假设 `emit_team_event` 会广播所有事件类型**:调用 `emit_team_event` 后日志显示成功,但 `_VALID_TEAM_EVENT_TYPES` frozenset 实际上是一个白名单 — 不在白名单的事件被 `emit_team_event` 静默丢弃(log 级别 DEBUG,看不到)。"广播成功"的日志误导了调试。
|
||
- **假设 `except (RuntimeError, asyncio.TimeoutError, ConnectionError)` 足够**:LLM 网关的异常层次是 `LLMProviderError(Exception)` 而非 `LLMProviderError(RuntimeError)`,所以穿透了流式异常处理。`asyncio.CancelledError` 在 Python 3.8+ 继承 `BaseException`(非 `Exception`),不会被 `except Exception` 捕获,所以扩大 except 范围不会破坏取消语义。
|
||
|
||
## Solution
|
||
|
||
三处修复,均位于专家团队流式事件转发路径:
|
||
|
||
### Fix 1: WS 事件白名单扩充
|
||
|
||
`src/agentkit/server/routes/chat.py` 的 `_VALID_TEAM_EVENT_TYPES` frozenset 必须包含所有 `emit_team_event` 调用的事件类型:
|
||
|
||
```python
|
||
_VALID_TEAM_EVENT_TYPES = frozenset({
|
||
"team_formed", "expert_step", "expert_result",
|
||
"expert_result_chunk", # 新增:U3 流式 token
|
||
"expert_result_chunk_reset", # 新增:U3 retry 重置
|
||
"plan_update", "team_synthesis",
|
||
"team_synthesis_chunk", # 新增:U4 综合流式 token
|
||
"team_dissolved",
|
||
# ... 其余不变
|
||
})
|
||
```
|
||
|
||
### Fix 2: final_answer 不再重复累积
|
||
|
||
`src/agentkit/experts/_phase_executor.py` 在 `_execute_phase_stream` 中区分 token 与 final_answer 的角色:
|
||
|
||
```python
|
||
# 修改前(双重累积 bug):
|
||
elif etype == "final_answer":
|
||
output = event.data.get("output", "")
|
||
if output:
|
||
accumulated.append(str(output)) # BUG: token 已累积,再 append 全文
|
||
await self._broadcast_event("expert_result_chunk", {
|
||
"expert_id": expert.config.name, "content": output,
|
||
}) # BUG: 再广播一次全文
|
||
|
||
# 修改后:
|
||
elif etype == "final_answer":
|
||
output = event.data.get("output", "")
|
||
# ReActEngine 先发 token(增量)再发 final_answer(全文)。
|
||
# token 已累积时,final_answer 仅作完成信号,不重复 append/broadcast;
|
||
# 无 token(如 _wrap_sync_as_stream fallback)时用 output 兜底。
|
||
if output and not accumulated:
|
||
accumulated.append(str(output))
|
||
```
|
||
|
||
### Fix 3: 异常处理扩展到 `except Exception`
|
||
|
||
`_execute_phase_stream` 的异常处理从窄类型扩展到 `Exception`,覆盖 `LLMProviderError` / `ConfigValidationError` 等穿透异常:
|
||
|
||
```python
|
||
# 修改前:
|
||
except (RuntimeError, asyncio.TimeoutError, ConnectionError) as e:
|
||
# 流式异常 — 广播 expert_result(error) 携带已累积内容
|
||
|
||
# 修改后:
|
||
except Exception as e:
|
||
# 兜底捕获 LLMProviderError/ConfigValidationError 等非 RuntimeError
|
||
# asyncio.CancelledError (BaseException) 已被上方 except 捕获,不会到达此处
|
||
# 流式异常 — 广播 expert_result(error) 携带已累积内容
|
||
```
|
||
|
||
## Why This Works
|
||
|
||
**ReAct 流式合约**:`ReActEngine.execute_stream()` 是 async generator,合约是:
|
||
1. 逐个 yield `token` 事件,每个 `data.content` 是增量内容片段
|
||
2. 最后 yield 一个 `final_answer` 事件,`data.output` 是所有 token 拼接的完整文本
|
||
|
||
token 和 final_answer **内容相同**(前者是增量,后者是聚合)。因此下游消费者必须选择一种累积策略,不能两者都累积:
|
||
|
||
- **累积 token**(本次方案):增量 append,最终 `accumulated = ["Hel", "lo"]`,拼接为 `"Hello"`。final_answer 仅作为完成信号。
|
||
- **使用 final_answer**:忽略 token,等 final_answer 一次性拿到全文。但这样失去流式效果,且 `_wrap_sync_as_stream` fallback 路径没有 token 事件,必须依赖 final_answer output。
|
||
|
||
混合方案(两者都累积)必然重复,因为 ReActEngine 的合约已经保证 `final_answer.output == "".join(tokens)`。
|
||
|
||
**WS 事件白名单**:`emit_team_event` 是"开放发送 + 白名单接收"的设计 — 服务端会广播任何 event_type,但 `_VALID_TEAM_EVENT_TYPES` frozenset 决定哪些事件类型被转发到 WebSocket 客户端。不在白名单的事件在转发层被静默丢弃(返回 200,但不实际发送)。新增事件类型时必须同步更新白名单。
|
||
|
||
**异常层次**:Python 3.8+ 的 `asyncio.CancelledError` 继承 `BaseException` 而非 `Exception`,所以 `except Exception` 不会捕获取消信号。在 async generator 中扩大 except 范围是安全的 — 取消语义保留,同时覆盖 `LLMProviderError` 等穿透异常。
|
||
|
||
## Prevention
|
||
|
||
### 新增 WS 事件类型的 checklist
|
||
|
||
当后端新增需要转发到前端的 WS 事件类型时:
|
||
|
||
1. 在 `emit_team_event` 调用点使用新事件类型
|
||
2. **同步**在 `_VALID_TEAM_EVENT_TYPES` frozenset 中添加(`src/agentkit/server/routes/chat.py`)
|
||
3. 在前端 `chatStream.ts` 添加对应 handler
|
||
4. 添加测试验证 `emit_team_event` 调用后前端能收到事件
|
||
|
||
白名单缺失不会抛错,只会静默丢弃 — 这是隐蔽 bug 的温床。考虑添加单元测试断言:对所有 `_VALID_TEAM_EVENT_TYPES` 中的类型,`emit_team_event` 必须实际转发。
|
||
|
||
### ReAct 流式合约测试模板
|
||
|
||
测试 `execute_stream` 的消费者时,必须使用符合真实合约的 mock 事件序列:
|
||
|
||
```python
|
||
# 正确(符合合约):
|
||
events = [
|
||
ReActEvent(event_type="token", step=0, data={"content": "Hel"}),
|
||
ReActEvent(event_type="token", step=0, data={"content": "lo"}),
|
||
ReActEvent(event_type="final_answer", step=0, data={"output": "Hello"}),
|
||
]
|
||
# 断言:token 拼接 == final_answer output == "Hello"
|
||
|
||
# 错误(掩盖双重累积 bug):
|
||
events = [
|
||
ReActEvent(event_type="token", step=0, data={"content": "Hello"}),
|
||
ReActEvent(event_type="final_answer", step=0, data={"output": " World"}),
|
||
]
|
||
# 断言:content == "Hello World" — 通过测试但掩盖 bug
|
||
```
|
||
|
||
### 异常处理范围
|
||
|
||
在 async generator 的异常处理中,优先使用 `except Exception` 而非窄类型列表,除非:
|
||
- 需要区分取消与异常(用 `except asyncio.CancelledError: raise` 在前,然后 `except Exception`)
|
||
- 需要特定异常类型的特殊处理(如 `except asyncio.TimeoutError` 重试逻辑)
|
||
|
||
`except Exception` 在 async generator 中是安全的 — `CancelledError` 不会被意外捕获。
|