fischer-agentkit/docs/solutions/runtime-errors/streaming-event-whitelist-a...

8.4 KiB
Raw Permalink Blame History

module date problem_type component severity symptoms root_cause resolution_type related_components tags
experts/server 2026-07-01 runtime_error assistant high
前端流式输出内容翻倍token + final_answer 双重累积,'Hello' 变为 'HelloHello'
WebSocket 客户端收不到新增流式事件类型expert_result_chunk 等)
LLM 抛出 LLMProviderError 时前端 streaming 卡死,无错误反馈
logic_error code_fix
server/routes/chat
core/react
server/frontend/stores/chatStream
streaming
websocket
react-engine
phase-executor
expert-team
event-whitelist
async-generator

流式事件白名单 + token 双重累积 + 异常穿透

Problem

引入专家团队流式输出U3/U4前端聊天界面出现三种症状

  1. 内容翻倍:专家流式回复内容在前端显示两次("Hello""HelloHello"),最终结果与中间 token 拼接重复
  2. 事件丢失:新加入的 expert_result_chunk / team_synthesis_chunk 等 WS 事件到达前端时被静默丢弃,导致整个流式功能失效
  3. 流式卡死LLM Provider 抛出非 RuntimeError 的异常(如 LLMProviderError / ConfigValidationError)时,流式生成器未广播错误事件,前端 streaming 状态永不结束

Symptoms

  • 专家团队讨论/综合的 token 流式输出在前端拼接为 "HelloHello"token 内容 + final_answer 全文)
  • emit_team_event 调用日志显示 200 OK但前端 WebSocket 客户端从未收到 expert_result_chunk 事件
  • LLM Provider 因 API key 失效抛出 LLMProviderError 时,前端 streaming 光标永久闪烁UI 卡在"thinking"状态
  • _phase_executoraccumulated.append(output) 被调用两次:一次 token 累积,一次 final_answer 累积

What Didn't Work

  • 假设 final_answer 是单独的"补充内容"原测试用非重叠内容验证token="Hello" + final_answer=" World",断言 content == "Hello World"通过了测试但掩盖了真实合约。ReActEngine 实际合约是token 拼接 = final_answer output两者内容相同
  • 假设 emit_team_event 会广播所有事件类型:调用 emit_team_event 后日志显示成功,但 _VALID_TEAM_EVENT_TYPES frozenset 实际上是一个白名单 — 不在白名单的事件被 emit_team_event 静默丢弃log 级别 DEBUG看不到。"广播成功"的日志误导了调试。
  • 假设 except (RuntimeError, asyncio.TimeoutError, ConnectionError) 足够LLM 网关的异常层次是 LLMProviderError(Exception) 而非 LLMProviderError(RuntimeError),所以穿透了流式异常处理。asyncio.CancelledError 在 Python 3.8+ 继承 BaseException(非 Exception),不会被 except Exception 捕获,所以扩大 except 范围不会破坏取消语义。

Solution

三处修复,均位于专家团队流式事件转发路径:

Fix 1: WS 事件白名单扩充

src/agentkit/server/routes/chat.py_VALID_TEAM_EVENT_TYPES frozenset 必须包含所有 emit_team_event 调用的事件类型:

_VALID_TEAM_EVENT_TYPES = frozenset({
    "team_formed", "expert_step", "expert_result",
    "expert_result_chunk",        # 新增U3 流式 token
    "expert_result_chunk_reset",   # 新增U3 retry 重置
    "plan_update", "team_synthesis",
    "team_synthesis_chunk",        # 新增U4 综合流式 token
    "team_dissolved",
    # ... 其余不变
})

Fix 2: final_answer 不再重复累积

src/agentkit/experts/_phase_executor.py_execute_phase_stream 中区分 token 与 final_answer 的角色:

# 修改前(双重累积 bug
elif etype == "final_answer":
    output = event.data.get("output", "")
    if output:
        accumulated.append(str(output))      # BUG: token 已累积,再 append 全文
    await self._broadcast_event("expert_result_chunk", {
        "expert_id": expert.config.name, "content": output,
    })                                        # BUG: 再广播一次全文

# 修改后:
elif etype == "final_answer":
    output = event.data.get("output", "")
    # ReActEngine 先发 token增量再发 final_answer全文
    # token 已累积时final_answer 仅作完成信号,不重复 append/broadcast
    # 无 token如 _wrap_sync_as_stream fallback时用 output 兜底。
    if output and not accumulated:
        accumulated.append(str(output))

Fix 3: 异常处理扩展到 except Exception

_execute_phase_stream 的异常处理从窄类型扩展到 Exception,覆盖 LLMProviderError / ConfigValidationError 等穿透异常:

# 修改前:
except (RuntimeError, asyncio.TimeoutError, ConnectionError) as e:
    # 流式异常 — 广播 expert_result(error) 携带已累积内容

# 修改后:
except Exception as e:
    # 兜底捕获 LLMProviderError/ConfigValidationError 等非 RuntimeError
    # asyncio.CancelledError (BaseException) 已被上方 except 捕获,不会到达此处
    # 流式异常 — 广播 expert_result(error) 携带已累积内容

Why This Works

ReAct 流式合约ReActEngine.execute_stream() 是 async generator合约是

  1. 逐个 yield token 事件,每个 data.content 是增量内容片段
  2. 最后 yield 一个 final_answer 事件,data.output 是所有 token 拼接的完整文本

token 和 final_answer 内容相同(前者是增量,后者是聚合)。因此下游消费者必须选择一种累积策略,不能两者都累积:

  • 累积 token(本次方案):增量 append最终 accumulated = ["Hel", "lo"],拼接为 "Hello"。final_answer 仅作为完成信号。
  • 使用 final_answer:忽略 token等 final_answer 一次性拿到全文。但这样失去流式效果,且 _wrap_sync_as_stream fallback 路径没有 token 事件,必须依赖 final_answer output。

混合方案(两者都累积)必然重复,因为 ReActEngine 的合约已经保证 final_answer.output == "".join(tokens)

WS 事件白名单emit_team_event 是"开放发送 + 白名单接收"的设计 — 服务端会广播任何 event_type_VALID_TEAM_EVENT_TYPES frozenset 决定哪些事件类型被转发到 WebSocket 客户端。不在白名单的事件在转发层被静默丢弃(返回 200但不实际发送。新增事件类型时必须同步更新白名单。

异常层次Python 3.8+ 的 asyncio.CancelledError 继承 BaseException 而非 Exception,所以 except Exception 不会捕获取消信号。在 async generator 中扩大 except 范围是安全的 — 取消语义保留,同时覆盖 LLMProviderError 等穿透异常。

Prevention

新增 WS 事件类型的 checklist

当后端新增需要转发到前端的 WS 事件类型时:

  1. emit_team_event 调用点使用新事件类型
  2. 同步_VALID_TEAM_EVENT_TYPES frozenset 中添加(src/agentkit/server/routes/chat.py
  3. 在前端 chatStream.ts 添加对应 handler
  4. 添加测试验证 emit_team_event 调用后前端能收到事件

白名单缺失不会抛错,只会静默丢弃 — 这是隐蔽 bug 的温床。考虑添加单元测试断言:对所有 _VALID_TEAM_EVENT_TYPES 中的类型,emit_team_event 必须实际转发。

ReAct 流式合约测试模板

测试 execute_stream 的消费者时,必须使用符合真实合约的 mock 事件序列:

# 正确(符合合约):
events = [
    ReActEvent(event_type="token", step=0, data={"content": "Hel"}),
    ReActEvent(event_type="token", step=0, data={"content": "lo"}),
    ReActEvent(event_type="final_answer", step=0, data={"output": "Hello"}),
]
# 断言token 拼接 == final_answer output == "Hello"

# 错误(掩盖双重累积 bug
events = [
    ReActEvent(event_type="token", step=0, data={"content": "Hello"}),
    ReActEvent(event_type="final_answer", step=0, data={"output": " World"}),
]
# 断言content == "Hello World" — 通过测试但掩盖 bug

异常处理范围

在 async generator 的异常处理中,优先使用 except Exception 而非窄类型列表,除非:

  • 需要区分取消与异常(用 except asyncio.CancelledError: raise 在前,然后 except Exception
  • 需要特定异常类型的特殊处理(如 except asyncio.TimeoutError 重试逻辑)

except Exception 在 async generator 中是安全的 — CancelledError 不会被意外捕获。