fischer-agentkit/docs/solutions/integration-issues/streaming-event-contract-re...

15 KiB
Raw Permalink Blame History

module date problem_type component severity category symptoms root_cause resolution_type tags related_components
experts 2026-07-01 integration_issue service_object high integration-issues
expert_step WebSocket events reached frontend with missing fields — frontend WsServerMessage contract silently degraded (no expert_id/expert_name/expert_color/content/step)
cancel_task() could not cooperatively cancel a streaming task — execute_stream continued emitting tokens after user cancellation
If synthesis streaming raised CancelledError or any Exception, no terminal team_synthesis event was emitted; frontend streaming milestone spun forever
Frontend could not precisely match a team_synthesis terminal event to its open streaming milestone across retries / concurrent teams (no stable synthesis_id)
logic_error code_fix
streaming
websocket
expert-team
event-contract
cancellation
cancellation-token
phase-executor
synthesis-id
frontend_websocket_layer

专家团队流式事件合约缺口expert_step payload / CancellationToken / synthesis 终止 / synthesis_id 去重)

Problem

PR #13 (feat/ui-ue-enhancement) 引入专家团队流式输出后ce-code-review 暴露了 17 个 findings2 P0、1 P1、8 P2、6 P3。PR #13 已修复 3 个 P0/P2 流式事件 bugdocs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md),但留下 4 个 residuals1 P1 + 3 P2记录在 docs/residual-review-findings/feat-ui-ue-enhancement.md。PR #14commit 47a437cmerge 8e8843c)专门收尾这 4 个 residuals — 它们共同指向同一根因簇:专家团队流式事件合约在后端生产者与前端消费者之间没有可验证的契约对齐

具体而言:后端 _phase_executor 转发 expert_step 事件时只传部分字段,与前端 WsServerMessage 类型契约不匹配;execute_stream 绕过 BaseAgent.execute() 注册路径导致 CancellationToken 机制失效;综合流式异常路径未广播终止事件导致前端里程碑永久 spinning前端按内容匹配 milestone 在并发 / 重试场景下不可靠。

Symptoms

  • Bug 1 (P1):前端消费 expert_step 事件时静默降级 — thinking / tool_call / tool_result 子类型的 payload 缺失 expert_id / expert_name / expert_color / content / step 字段,前端 WsServerMessage 联合类型的窄化分支匹配不到UI 显示空卡片或回退到默认分支。
  • Bug 2 (P2 #1):用户点击取消后,cancel_task() 找不到 task_id 对应的 CancellationToken(因为 execute_stream 从未注册),流式 token 继续到达前端UI 出现"已取消但仍写入"的状态错乱。
  • Bug 3 (P2 #2):综合阶段抛出 asyncio.CancelledError 或任意 Exception 时,外层 handler 只广播了通用 error 事件,前端等待 team_synthesis 终止事件以关闭 milestone — 永远收不到milestone 进入永久 streaming 状态loading 光标无限闪烁)。
  • Bug 4 (P2 #3):前端用 findLastMessage 查找 message_type === "milestone" && status === "streaming" 的消息来匹配终止事件 — 在重试或并发团队场景下,可能匹配到错误的 milestone导致错误的卡片被 finalizecontent 错位 / 状态错乱)。

What Didn't Work

  • PR #13 已修复 3 个相关 bug 但未触及这 4 处:白名单扩充让新事件类型到达前端,双重累积修复让 token 不再重复,异常扩展到 except Exception 让 LLM 异常不再穿透 — 但都没有验证 expert_step payload 是否满足前端类型契约。白名单只解决了"事件丢失",没有解决"事件到达但字段不全"。
  • 假设 execute_streamBaseAgent.execute() 的注册路径:实际上 execute_stream 是独立 async generator直接 async for event in self.handle_task_stream(task): yield event,从未调用 self._active_tokens[task.task_id] = tokencancel_task() 在 dict 中查找 task_id 永远 misscancel 信号无通道。同步 execute() 路径正常工作掩盖了流式路径的 gap。
  • 依赖外层 except Exception 兜底综合异常:外层确实捕获并广播 error 事件,但前端 streaming milestone 监听的是 team_synthesis 事件类型(带 status 字段),不是通用 error。两者事件类型不同,前端 milestone handler 不会触发。需要内层专门广播 team_synthesis 终止事件。
  • 用消息位置 / 内容匹配 milestone:原 findLastMessage 仅按 status === "streaming" 查找最后一条 milestone — 在单团队单次综合场景下工作,但并发团队或综合重试时多个 milestone 同时 streaming匹配到第一个或最后一个都不可靠。需要稳定标识符。
  • ce-code-review 在 PR #13 时已发现部分信号:但 P1 的 expert_step payload 缺口当时被归类为"建议"而非 blocker导致 PR #13 合入时未阻塞;后续 review 才升级为 P1 residual。

Solution

Fix 1 (P1): expert_step payload 与前端契约对齐

src/agentkit/experts/_phase_executor.py_execute_phase_stream 转发 thinking / tool_call / tool_result 时补全前端契约所需字段。

# thinking 事件(约 line 244
elif etype == "thinking":
    await self._broadcast_event("expert_step", {
        "expert_id": expert.config.name,
        "expert_name": expert.config.name,
        "expert_color": expert.config.color,
        "content": event.data.get("content", ""),
        "step": "thinking",
    })

# tool_call / tool_result 事件(约 line 260
elif etype in ("tool_call", "tool_result"):
    await self._broadcast_event("expert_step", {
        "expert_id": expert.config.name,
        "expert_name": expert.config.name,
        "expert_color": expert.config.color,
        "content": (
            event.data.get("tool_name")
            or event.data.get("name")
            or etype
        ),
        "step": etype,
        "step_data": event.data,
    })

测试更新:tests/unit/experts/test_phase_executor_streaming.pytest_thinking_events_forwarded_as_expert_step 断言补全,新增 test_tool_call_events_forwarded_as_expert_step 覆盖 tool_call / tool_result 分支。

Fix 2 (P2 #1): execute_stream 注册 CancellationToken

src/agentkit/core/config_driven.py 在流式入口注册 tokenfinally 清理,与 BaseAgent.execute() 保持对称。新增 importfrom agentkit.core.protocol import AgentCapability, CancellationToken, TaskMessage

async def execute_stream(self, task: TaskMessage) -> AsyncGenerator[ReActEvent, None]:
    token = CancellationToken()
    self._active_tokens[task.task_id] = token
    try:
        await self._register_mcp_tools()
        async for event in self.handle_task_stream(task):
            yield event
    finally:
        self._active_tokens.pop(task.task_id, None)

cancel_task() 现在能在 _active_tokens 中找到 task_id 对应的 token 并触发取消信号,handle_task_stream 内部的协作式取消点(await asyncio.sleep / 工具调用边界)会响应。

Fix 3 (P2 #2): team_synthesis 终止事件广播

src/agentkit/experts/orchestrator.py 在综合调用周围添加内层 try/except捕获 CancelledErrorException,分别广播 status: cancelled / status: errorteam_synthesis 终止事件(携带 synthesis_id),然后 re-raise 保留异常传播。

try:
    final_result = await self._synthesize_results(
        lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk
    )
except asyncio.CancelledError:
    await self._broadcast_event(
        "team_synthesis",
        {"content": "", "phases_completed": len(completed),
         "phases_total": len(plan.phases), "status": "cancelled",
         "synthesis_id": synthesis_id},
    )
    raise
except Exception as synth_err:
    logger.error(f"Synthesis streaming failed: {synth_err}")
    await self._broadcast_event(
        "team_synthesis",
        {"content": "", "phases_completed": len(completed),
         "phases_total": len(plan.phases), "status": "error",
         "error": str(synth_err), "synthesis_id": synthesis_id},
    )
    raise

asyncio.CancelledError 在 Python 3.8+ 继承 BaseException,因此 except Exception 不会捕获它 — 两个 except 分支顺序正确,取消语义保留。

Fix 4 (P2 #3): synthesis_id 跨 chunk 与终止事件去重

后端orchestrator.py 约 line 289在综合开始时生成 synthesis_id = f"{plan.id}:synthesis",注入到 chunk 广播与终止事件。

synthesis_id = f"{plan.id}:synthesis"
async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None:
    if isinstance(data, dict):
        data = {**data, "synthesis_id": synthesis_id}
    await self._broadcast_event("team_synthesis_chunk", data)

终止事件Fix 3 的 cancelled / error 分支)也携带同一 synthesis_id

前端类型src/agentkit/server/frontend/src/api/types.ts line 156-157扩展 team_synthesis_chunkteam_synthesis 的 data 类型以声明 synthesis_id / status / error 字段。

| { type: 'team_synthesis_chunk'; data: { chunk: string; synthesis_id?: string } }
| { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number; synthesis_id?: string; status?: 'completed' | 'error' | 'cancelled'; error?: string } }

前端 handlersrc/agentkit/server/frontend/src/stores/chatStream.ts 约 line 907synthesis_id 做精确 milestone 匹配,回退到"任一 streaming milestone"以兼容历史未携带 id 的事件。

const sid = event.data.synthesis_id;
const finalStatus: 'completed' | 'error' =
    event.data.status === 'error' || event.data.status === 'cancelled'
        ? 'error'
        : 'completed';
const existing = findLastMessage(
    conv.messages,
    (m) =>
        m.message_type === "milestone" &&
        m.status === "streaming" &&
        (sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true),
);

Why This Works

Fix 1 — 合约对齐而非字段补全:前端 WsServerMessage 是 discriminated union每个 type 分支有窄化的 data 形状。expert_step 的窄化要求 expert_id / expert_name / expert_color / content / step 全部存在,缺失任一字段会让 TypeScript 窄化失败运行时则回退到默认分支UI 显示空卡片)。修复后后端生产者与前端消费者在 data shape 上 1:1 对齐,前端无需做防御性兜底。

Fix 2 — 取消通道对称性BaseAgent.execute()execute_stream() 都是 task 的执行入口,必须在 _active_tokens 注册上对称。execute_stream 是 async generatorfinally 块保证即使 generator 提前 close消费者 break / 外层 cancel也会清理 token — 不会泄漏 dict 条目导致后续 cancel 误触发。CancellationToken 是协作式取消,handle_task_stream 内部在 await 边界检查 token.is_cancelled() 即可响应。

Fix 3 — 终止事件类型对齐:前端 milestone handler 监听的事件类型是 team_synthesis(带 status 字段),不是通用 error。外层 except Exception 广播 error 事件虽然对调试有用,但前端无法用它关闭 milestone — 事件类型不匹配。内层 try/except 在综合调用边界捕获异常并广播 team_synthesis 事件(status: cancelled|error),与前端 handler 监听的事件类型对齐。raise 保留异常传播,外层 handler 仍可记录日志 / 清理状态。

Fix 4 — 稳定标识符优于位置匹配:流式 chunk 与终止事件属于同一逻辑流,但 WS 是无序消息流,无法用"最后一条 streaming milestone"可靠匹配 — 并发团队 / 重试场景下多个 milestone 同时 streaming。synthesis_id = f"{plan.id}:synthesis" 用 plan id 命名空间隔离,保证同一团队的综合事件链共享 id跨团队不冲突。前端匹配优先 m.synthesis_id === sid,回退 m.synthesis_id === undefined 以兼容历史事件(未注入 id 的旧消息)— 向后兼容。

Prevention

流式事件合约测试

新增 / 修改 expert_step / team_synthesis 等流式事件 payload 时,必须有测试断言:每个广播的 event data 满足前端 WsServerMessage 类型契约的所有必填字段。可考虑:

  • 后端契约测试:在 _phase_executor / orchestrator 的流式测试中,断言每个 broadcast 的 event data 包含前端契约要求的全部字段(用 Pydantic 模型镜像前端 type或直接断言字段集合
  • 前端类型同步:后端字段变更时同步更新 types.ts,并依赖 npm run typecheck 暴露 drift。可考虑生成器从单一 schema 生成前后端类型(长期方案)。

取消通道注册 checklist

新增任何 task 执行入口(execute / execute_stream / execute_rewoo 等)时:

  1. 在入口处 self._active_tokens[task.task_id] = CancellationToken()
  2. finallyself._active_tokens.pop(task.task_id, None)
  3. 在内部 await 边界检查 token.is_cancelled()(协作式取消点)
  4. 添加测试:调用 cancel_task(task_id)generator 在下一个 yield 边界停止产出

终止事件对称性

任何"打开 streaming milestone"的事件类型 X必须有对应的"关闭 milestone"事件类型 X携带 status: completed|error|cancelled)。如果 X 的生产路径有多个异常分支,每个分支都必须广播 X 终止事件 — 不能依赖外层通用 error 事件关闭 milestone事件类型不匹配

新增流式事件类型 X 时,问:所有异常路径是否都广播了 X 终止事件?包括 CancelledErrorException、generator 提前 close。

Milestone 标识符

任何需要"打开 → 流式 → 关闭"配对的消息类型,必须有稳定标识符({plan_id}:synthesis 模式):

  • 打开事件携带 id
  • 每个 chunk 携带 id
  • 关闭事件携带同一 id
  • 前端用 id 精确匹配,回退到位置匹配仅用于兼容历史数据

考虑抽象 StreamSession:打开时生成 id所有 chunk / close 自动注入 id避免每处手写。

残留 review 的优先级升级

ce-code-review 在 PR #13 时将 expert_step payload 缺口标为"建议" — 但前端合约不匹配实际上是静默降级,用户无感知。未来 review 中,"前端类型契约字段缺失"应直接归为 P1 而非建议,因为:合约不匹配 = 前端窄化失败 = UI 静默降级 = 用户无错误反馈 = 难以诊断的 UX 退化。

  • docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md — PR #13 的 3 个 P0/P2 流式事件 bugWS 白名单缺失、token+final_answer 双重累积、异常范围过窄)。与本 doc 属同一流式事件合约簇,但 PR #13 与 PR #14 的 bug 谱系不同 — 建议未来用 ce-compound-refresh 合并为单一参考文档。
  • docs/residual-review-findings/feat-ui-ue-enhancement.md — 本 doc 4 个 residuals 的源 finding 列表。