fischer-agentkit/docs/solutions/integration-issues/streaming-event-contract-re...

225 lines
15 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
module: experts
date: 2026-07-01
problem_type: integration_issue
component: service_object
severity: high
category: integration-issues
symptoms:
- "expert_step WebSocket events reached frontend with missing fields — frontend WsServerMessage contract silently degraded (no expert_id/expert_name/expert_color/content/step)"
- "cancel_task() could not cooperatively cancel a streaming task — execute_stream continued emitting tokens after user cancellation"
- "If synthesis streaming raised CancelledError or any Exception, no terminal team_synthesis event was emitted; frontend streaming milestone spun forever"
- "Frontend could not precisely match a team_synthesis terminal event to its open streaming milestone across retries / concurrent teams (no stable synthesis_id)"
root_cause: logic_error
resolution_type: code_fix
tags:
- streaming
- websocket
- expert-team
- event-contract
- cancellation
- cancellation-token
- phase-executor
- synthesis-id
related_components:
- frontend_websocket_layer
---
# 专家团队流式事件合约缺口expert_step payload / CancellationToken / synthesis 终止 / synthesis_id 去重)
## Problem
PR #13 (`feat/ui-ue-enhancement`) 引入专家团队流式输出后ce-code-review 暴露了 17 个 findings2 P0、1 P1、8 P2、6 P3。PR #13 已修复 3 个 P0/P2 流式事件 bug`docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md`),但留下 4 个 residuals1 P1 + 3 P2记录在 `docs/residual-review-findings/feat-ui-ue-enhancement.md`。PR #14commit `47a437c`merge `8e8843c`)专门收尾这 4 个 residuals — 它们共同指向同一根因簇:**专家团队流式事件合约在后端生产者与前端消费者之间没有可验证的契约对齐**。
具体而言:后端 `_phase_executor` 转发 `expert_step` 事件时只传部分字段,与前端 `WsServerMessage` 类型契约不匹配;`execute_stream` 绕过 `BaseAgent.execute()` 注册路径导致 `CancellationToken` 机制失效;综合流式异常路径未广播终止事件导致前端里程碑永久 spinning前端按内容匹配 milestone 在并发 / 重试场景下不可靠。
## Symptoms
- **Bug 1 (P1)**:前端消费 `expert_step` 事件时静默降级 — `thinking` / `tool_call` / `tool_result` 子类型的 payload 缺失 `expert_id` / `expert_name` / `expert_color` / `content` / `step` 字段,前端 `WsServerMessage` 联合类型的窄化分支匹配不到UI 显示空卡片或回退到默认分支。
- **Bug 2 (P2 #1)**:用户点击取消后,`cancel_task()` 找不到 task_id 对应的 `CancellationToken`(因为 `execute_stream` 从未注册),流式 token 继续到达前端UI 出现"已取消但仍写入"的状态错乱。
- **Bug 3 (P2 #2)**:综合阶段抛出 `asyncio.CancelledError` 或任意 `Exception` 时,外层 handler 只广播了通用 `error` 事件,前端等待 `team_synthesis` 终止事件以关闭 milestone — 永远收不到milestone 进入永久 streaming 状态loading 光标无限闪烁)。
- **Bug 4 (P2 #3)**:前端用 `findLastMessage` 查找 `message_type === "milestone" && status === "streaming"` 的消息来匹配终止事件 — 在重试或并发团队场景下,可能匹配到错误的 milestone导致错误的卡片被 finalizecontent 错位 / 状态错乱)。
## What Didn't Work
- **PR #13 已修复 3 个相关 bug 但未触及这 4 处**:白名单扩充让新事件类型到达前端,双重累积修复让 token 不再重复,异常扩展到 `except Exception` 让 LLM 异常不再穿透 — 但都没有验证 `expert_step` payload 是否满足前端类型契约。白名单只解决了"事件丢失",没有解决"事件到达但字段不全"。
- **假设 `execute_stream``BaseAgent.execute()` 的注册路径**:实际上 `execute_stream` 是独立 async generator直接 `async for event in self.handle_task_stream(task): yield event`,从未调用 `self._active_tokens[task.task_id] = token`。`cancel_task()` 在 dict 中查找 task_id 永远 misscancel 信号无通道。同步 `execute()` 路径正常工作掩盖了流式路径的 gap。
- **依赖外层 `except Exception` 兜底综合异常**:外层确实捕获并广播 `error` 事件,但前端 streaming milestone 监听的是 `team_synthesis` 事件类型(带 `status` 字段),不是通用 `error`。两者事件类型不同,前端 milestone handler 不会触发。需要内层专门广播 `team_synthesis` 终止事件。
- **用消息位置 / 内容匹配 milestone**:原 `findLastMessage` 仅按 `status === "streaming"` 查找最后一条 milestone — 在单团队单次综合场景下工作,但并发团队或综合重试时多个 milestone 同时 streaming匹配到第一个或最后一个都不可靠。需要稳定标识符。
- **ce-code-review 在 PR #13 时已发现部分信号**:但 P1 的 expert_step payload 缺口当时被归类为"建议"而非 blocker导致 PR #13 合入时未阻塞;后续 review 才升级为 P1 residual。
## Solution
### Fix 1 (P1): `expert_step` payload 与前端契约对齐
`src/agentkit/experts/_phase_executor.py``_execute_phase_stream` 转发 thinking / tool_call / tool_result 时补全前端契约所需字段。
```python
# thinking 事件(约 line 244
elif etype == "thinking":
await self._broadcast_event("expert_step", {
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": event.data.get("content", ""),
"step": "thinking",
})
# tool_call / tool_result 事件(约 line 260
elif etype in ("tool_call", "tool_result"):
await self._broadcast_event("expert_step", {
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": (
event.data.get("tool_name")
or event.data.get("name")
or etype
),
"step": etype,
"step_data": event.data,
})
```
测试更新:`tests/unit/experts/test_phase_executor_streaming.py` 中 `test_thinking_events_forwarded_as_expert_step` 断言补全,新增 `test_tool_call_events_forwarded_as_expert_step` 覆盖 tool_call / tool_result 分支。
### Fix 2 (P2 #1): `execute_stream` 注册 `CancellationToken`
`src/agentkit/core/config_driven.py` 在流式入口注册 token`finally` 清理,与 `BaseAgent.execute()` 保持对称。新增 import`from agentkit.core.protocol import AgentCapability, CancellationToken, TaskMessage`。
```python
async def execute_stream(self, task: TaskMessage) -> AsyncGenerator[ReActEvent, None]:
token = CancellationToken()
self._active_tokens[task.task_id] = token
try:
await self._register_mcp_tools()
async for event in self.handle_task_stream(task):
yield event
finally:
self._active_tokens.pop(task.task_id, None)
```
`cancel_task()` 现在能在 `_active_tokens` 中找到 task_id 对应的 token 并触发取消信号,`handle_task_stream` 内部的协作式取消点(`await asyncio.sleep` / 工具调用边界)会响应。
### Fix 3 (P2 #2): `team_synthesis` 终止事件广播
`src/agentkit/experts/orchestrator.py` 在综合调用周围添加内层 try/except捕获 `CancelledError``Exception`,分别广播 `status: cancelled` / `status: error``team_synthesis` 终止事件(携带 `synthesis_id`),然后 re-raise 保留异常传播。
```python
try:
final_result = await self._synthesize_results(
lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk
)
except asyncio.CancelledError:
await self._broadcast_event(
"team_synthesis",
{"content": "", "phases_completed": len(completed),
"phases_total": len(plan.phases), "status": "cancelled",
"synthesis_id": synthesis_id},
)
raise
except Exception as synth_err:
logger.error(f"Synthesis streaming failed: {synth_err}")
await self._broadcast_event(
"team_synthesis",
{"content": "", "phases_completed": len(completed),
"phases_total": len(plan.phases), "status": "error",
"error": str(synth_err), "synthesis_id": synthesis_id},
)
raise
```
`asyncio.CancelledError` 在 Python 3.8+ 继承 `BaseException`,因此 `except Exception` 不会捕获它 — 两个 except 分支顺序正确,取消语义保留。
### Fix 4 (P2 #3): `synthesis_id` 跨 chunk 与终止事件去重
**后端**`orchestrator.py` 约 line 289在综合开始时生成 `synthesis_id = f"{plan.id}:synthesis"`,注入到 chunk 广播与终止事件。
```python
synthesis_id = f"{plan.id}:synthesis"
async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None:
if isinstance(data, dict):
data = {**data, "synthesis_id": synthesis_id}
await self._broadcast_event("team_synthesis_chunk", data)
```
终止事件Fix 3 的 cancelled / error 分支)也携带同一 `synthesis_id`
**前端类型**`src/agentkit/server/frontend/src/api/types.ts` line 156-157扩展 `team_synthesis_chunk``team_synthesis` 的 data 类型以声明 `synthesis_id` / `status` / `error` 字段。
```typescript
| { type: 'team_synthesis_chunk'; data: { chunk: string; synthesis_id?: string } }
| { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number; synthesis_id?: string; status?: 'completed' | 'error' | 'cancelled'; error?: string } }
```
**前端 handler**`src/agentkit/server/frontend/src/stores/chatStream.ts` 约 line 907`synthesis_id` 做精确 milestone 匹配,回退到"任一 streaming milestone"以兼容历史未携带 id 的事件。
```typescript
const sid = event.data.synthesis_id;
const finalStatus: 'completed' | 'error' =
event.data.status === 'error' || event.data.status === 'cancelled'
? 'error'
: 'completed';
const existing = findLastMessage(
conv.messages,
(m) =>
m.message_type === "milestone" &&
m.status === "streaming" &&
(sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true),
);
```
## Why This Works
**Fix 1 — 合约对齐而非字段补全**:前端 `WsServerMessage` 是 discriminated union每个 `type` 分支有窄化的 `data` 形状。`expert_step` 的窄化要求 `expert_id` / `expert_name` / `expert_color` / `content` / `step` 全部存在,缺失任一字段会让 TypeScript 窄化失败运行时则回退到默认分支UI 显示空卡片)。修复后后端生产者与前端消费者在 data shape 上 1:1 对齐,前端无需做防御性兜底。
**Fix 2 — 取消通道对称性**`BaseAgent.execute()` 与 `execute_stream()` 都是 task 的执行入口,必须在 `_active_tokens` 注册上对称。`execute_stream` 是 async generator`finally` 块保证即使 generator 提前 close消费者 break / 外层 cancel也会清理 token — 不会泄漏 dict 条目导致后续 cancel 误触发。`CancellationToken` 是协作式取消,`handle_task_stream` 内部在 await 边界检查 `token.is_cancelled()` 即可响应。
**Fix 3 — 终止事件类型对齐**:前端 milestone handler 监听的事件类型是 `team_synthesis`(带 `status` 字段),不是通用 `error`。外层 `except Exception` 广播 `error` 事件虽然对调试有用,但前端无法用它关闭 milestone — 事件类型不匹配。内层 try/except 在综合调用边界捕获异常并广播 `team_synthesis` 事件(`status: cancelled|error`),与前端 handler 监听的事件类型对齐。`raise` 保留异常传播,外层 handler 仍可记录日志 / 清理状态。
**Fix 4 — 稳定标识符优于位置匹配**:流式 chunk 与终止事件属于同一逻辑流,但 WS 是无序消息流,无法用"最后一条 streaming milestone"可靠匹配 — 并发团队 / 重试场景下多个 milestone 同时 streaming。`synthesis_id = f"{plan.id}:synthesis"` 用 plan id 命名空间隔离,保证同一团队的综合事件链共享 id跨团队不冲突。前端匹配优先 `m.synthesis_id === sid`,回退 `m.synthesis_id === undefined` 以兼容历史事件(未注入 id 的旧消息)— 向后兼容。
## Prevention
### 流式事件合约测试
新增 / 修改 `expert_step` / `team_synthesis` 等流式事件 payload 时,必须有测试断言:每个广播的 event data 满足前端 `WsServerMessage` 类型契约的所有必填字段。可考虑:
- **后端契约测试**:在 `_phase_executor` / `orchestrator` 的流式测试中,断言每个 broadcast 的 event data 包含前端契约要求的全部字段(用 Pydantic 模型镜像前端 type或直接断言字段集合
- **前端类型同步**:后端字段变更时同步更新 `types.ts`,并依赖 `npm run typecheck` 暴露 drift。可考虑生成器从单一 schema 生成前后端类型(长期方案)。
### 取消通道注册 checklist
新增任何 task 执行入口(`execute` / `execute_stream` / `execute_rewoo` 等)时:
1. 在入口处 `self._active_tokens[task.task_id] = CancellationToken()`
2.`finally``self._active_tokens.pop(task.task_id, None)`
3. 在内部 await 边界检查 `token.is_cancelled()`(协作式取消点)
4. 添加测试:调用 `cancel_task(task_id)`generator 在下一个 yield 边界停止产出
### 终止事件对称性
任何"打开 streaming milestone"的事件类型 X必须有对应的"关闭 milestone"事件类型 X携带 `status: completed|error|cancelled`)。如果 X 的生产路径有多个异常分支,每个分支都必须广播 X 终止事件 — 不能依赖外层通用 `error` 事件关闭 milestone事件类型不匹配
新增流式事件类型 X 时,问:所有异常路径是否都广播了 X 终止事件?包括 `CancelledError`、`Exception`、generator 提前 close。
### Milestone 标识符
任何需要"打开 → 流式 → 关闭"配对的消息类型,必须有稳定标识符(`{plan_id}:synthesis` 模式):
- 打开事件携带 id
- 每个 chunk 携带 id
- 关闭事件携带同一 id
- 前端用 id 精确匹配,回退到位置匹配仅用于兼容历史数据
考虑抽象 `StreamSession`:打开时生成 id所有 chunk / close 自动注入 id避免每处手写。
### 残留 review 的优先级升级
ce-code-review 在 PR #13 时将 expert_step payload 缺口标为"建议" — 但前端合约不匹配实际上是静默降级,用户无感知。未来 review 中,"前端类型契约字段缺失"应直接归为 P1 而非建议,因为:合约不匹配 = 前端窄化失败 = UI 静默降级 = 用户无错误反馈 = 难以诊断的 UX 退化。
## Related
- `docs/solutions/runtime-errors/streaming-event-whitelist-and-accumulation.md` — PR #13 的 3 个 P0/P2 流式事件 bugWS 白名单缺失、token+final_answer 双重累积、异常范围过窄)。与本 doc 属同一流式事件合约簇,但 PR #13 与 PR #14 的 bug 谱系不同 — 建议未来用 ce-compound-refresh 合并为单一参考文档。
- `docs/residual-review-findings/feat-ui-ue-enhancement.md` — 本 doc 4 个 residuals 的源 finding 列表。