fischer-agentkit/docs/plans/2026-06-17-002-fix-ws-task-...

434 lines
24 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: "fix: WebSocket 断开后任务结果丢失与断线恢复"
status: completed
created: 2026-06-17
type: fix
origin: in-session investigation
---
# fix: WebSocket 断开后任务结果丢失与断线恢复
## Summary
当用户在复杂任务执行过程中刷新页面时WebSocket 断开导致 ReAct 任务中断、已收集的输出丢失、且无法恢复。本计划通过三层防御彻底解决Layer 1 确保部分结果持久化Layer 2 将任务后台化解耦 WebSocket 生命周期Layer 3 前端断线后恢复进行中任务。
## Problem Frame
**症状**:用户布置复杂任务后刷新界面,操作停止且返回为空。
**根因**(经代码调查确认):
1. **结果丢失**portal.py 的 ReAct 流式路径中assistant 回复保存在 `async for` 循环之后portal.py:1064WebSocket 断开时该行永远不执行,`collected_output` 全部丢失。
2. **任务中断**ReAct 任务通过 `async for event in react_engine.execute_stream(...)` 直接在 WebSocket 协程中执行,未使用 BackgroundRunner 后台化WebSocket 断开 = 任务取消。
3. **无状态追踪**portal WebSocket 路径不使用 TaskStore`task_id` 仅用于 EventQueue 事件发射,任务状态无法查询。
4. **无恢复机制**:前端 WebSocket 重连后3 秒自动重连),不检查是否有未完成任务,不恢复进行中的任务状态。
## Requirements
- **R1**: WebSocket 断开时,已收集的 ReAct 输出必须持久化到 conversation store
- **R2**: ReAct 任务必须后台执行,与 WebSocket 连接生命周期解耦
- **R3**: 每个任务必须在 TaskStore 中注册,状态可查询
- **R4**: 后台任务的事件必须通过 EventQueue 分发WebSocket 可订阅
- **R5**: 前端 WebSocket 重连后,必须检查当前对话是否有未完成任务并恢复
- **R6**: 已完成的任务结果必须能从 conversation store 恢复显示
- **R7**: 不破坏现有 DIRECT_CHAT 路径和 REST API 路径
## Key Technical Decisions
### KTD1: 复用 EventQueue 作为任务事件总线
**决策**:使用现有的 `EventQueue``core/event_queue.py`)分发后台任务事件,而非新建 EventBus 模块。
**理由**EventQueue 已具备所需能力——多订阅者广播(行 161-175、缓冲回放deque 100行 153、原子订阅行 193-202、哨兵关闭模式行 230-243。它已被 portal.py 的 `_emit_event_safe` 使用,且文件头注释明确说明设计目标包括 Portal WebSocket 订阅。
### KTD2: 任务后台化使用 asyncio.create_task + EventQueue 订阅
**决策**:在 portal.py 中,将 ReAct 执行包装为 `asyncio.create_task`WebSocket 协程通过 `event_queue.subscribe()` 订阅事件流。任务在后台独立运行WebSocket 仅转发事件给前端。
**理由**BackgroundRunner`server/runner.py`)虽然存在,但它调用 `agent.execute()`(非流式),不支持事件流。改造 BackgroundRunner 支持流式会侵入核心执行路径。直接在 portal.py 中用 `asyncio.create_task` 更聚焦、风险更低。
### KTD3: 任务状态通过 TaskStore 追踪
**决策**:在 portal.py WebSocket 路径中,为每个用户消息创建 TaskStore 记录状态随任务进展更新PENDING → RUNNING → COMPLETED/FAILED
**理由**TaskStore 已在 `app.state.task_store` 可用,`/api/v1/tasks/{task_id}` 端点已存在。前端可通过现有 REST API 查询任务状态,无需新建端点。
### KTD4: 前端通过 conversation_id 关联任务
**决策**:在 TaskStore 记录的 `metadata` 中存储 `conversation_id`,前端重连后通过 `GET /api/v1/tasks?status=running` 查找当前对话的未完成任务。
**理由**:避免新建专门的"按 conversation 查任务"端点。前端遍历 running 任务,匹配 metadata 中的 conversation_id 即可。
## High-Level Technical Design
```
┌──────────────────────────────────────────────────────────────────┐
│ WebSocket 连接存活 │
│ ┌────────────┐ subscribe(task_id) ┌──────────────────┐ │
│ │ WebSocket │ ◄────────────────────── │ EventQueue │ │
│ │ 协程 │ 转发事件给前端 │ (缓冲回放+广播) │ │
│ └────────────┘ └────────┬─────────┘ │
│ │ emit │
├──────────────────────────────────────────────────┼─────────────┤
│ WebSocket 断开 │ │
│ ┌────────────┐ │ │
│ │ WebSocket │ ✗ 断开 │ │
│ │ 协程 │ │ │
│ └────────────┘ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 后台任务 (asyncio.create_task) │ │
│ │ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ │ │
│ │ │ ReAct 执行 │─►│ EventQueue │ │ TaskStore │ │ │
│ │ │ execute_ │ │ .emit(event) │ │ .update_status │ │ │
│ │ │ stream() │ │ │ │ │ │ │
│ │ └─────────────┘ └──────────────┘ └────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────────────────────┐ │ │
│ │ │ conversation_store.add_message(assistant, ...) │ │ │
│ │ │ (无论 WebSocket 是否存活,结果都持久化) │ │ │
│ │ └────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ 前端重连后: │
│ 1. GET /api/v1/tasks?status=running → 查找未完成任务 │
│ 2. 匹配 metadata.conversation_id → 当前对话的任务 │
│ 3. event_queue.subscribe(task_id) → 恢复事件流 │
│ 4. 或 GET /conversations/{id} → 拉取已完成的结果 │
└──────────────────────────────────────────────────────────────────┘
```
## Scope Boundaries
### In Scope
- portal.py WebSocket 路径的 ReAct 任务后台化
- WebSocket 断开时的部分结果持久化
- TaskStore 注册和状态更新
- EventQueue 事件分发集成
- 前端 WebSocket 重连后的任务恢复
- 前端任务状态查询 API 客户端
### Out of Scope
- BackgroundRunner 改造支持流式(风险过高,单独处理)
- REST API 路径(`/portal/chat`的改造REST 已是同步调用,无此问题)
- SSE 路径(`/portal/chat/stream`)的改造
- Expert Team 协作模式的后台化(单独处理)
- WebSocket 重连退避策略优化(独立改进)
### Deferred to Follow-Up Work
- SubmissionQueue 接入(目前完全闲置,可后续用于任务队列化)
- Redis 分布式任务恢复(当前 InMemoryTaskStore 足够,分布式场景后续处理)
- 任务进度百分比反馈(当前 ReAct 无细粒度进度概念)
---
## Implementation Units
### U1. Layer 1: WebSocket 断开时持久化已收集的输出
**Goal**: 确保 WebSocket 在 ReAct 流式过程中断开时,已收集的 `collected_output` 保存到 conversation store。
**Requirements**: R1
**Dependencies**: 无
**Files**:
- `src/agentkit/server/routes/portal.py` — 修改 ReAct 流式路径的异常处理
- `tests/unit/test_portal_ws_persistence.py` — 新建测试
**Approach**:
在 portal.py 的 `portal_websocket` 函数中ReAct 流式路径(约 1014-1064 行)的异常处理需要增强:
1.`except Exception as e:` 块(约 1049 行)中,在发送 error 之前,将 `collected_output` 保存到 `_conversation_store`。使用 `_ensure_non_empty` 处理空输出。
2. 在外层 `except WebSocketDisconnect:` 块(约 1102 行)中,检查是否有未保存的 `collected_output`,如果有则保存。
3. 在外层 `except Exception as e:` 块(约 1104 行)中,同样处理。
4. 需要将 `collected_output`、`conv`、`task_id` 声明提升到外层 try 作用域,使 except 块可以访问。
**Patterns to follow**:
- `_ensure_non_empty()` 函数portal.py:58-62处理空输出
- `_conversation_store.add_message()` 已有调用模式portal.py:871, 952, 1064
- `_emit_event_safe()` 的异常吞咽模式portal.py:65-95——保存操作失败不应阻断后续清理
**Test scenarios**:
- **Happy path**: ReAct 正常完成,`collected_output` 有内容WebSocket 未断开 → 结果保存到 conversation store验证现有行为不被破坏
- **WebSocket 断开final_answer 前)**: ReAct 流式中 WebSocket 断开,`collected_output` 为空 → conversation store 不写入空消息(或写入 EMPTY_LLM_RESPONSE
- **WebSocket 断开final_answer 后)**: ReAct 已产出 final_answerWebSocket 在后续步骤断开,`collected_output` 有部分内容 → 部分内容保存到 conversation store
- **异常路径**: ReAct 执行抛出异常,`collected_output` 有部分内容 → 部分内容保存error 事件正常发射
- **Integration**: 保存操作自身失败conversation_store 异常)→ 不阻断 error 事件发射和后续清理
**Verification**: 模拟 WebSocket 断开场景,检查 SQLite conversation store 中是否有 assistant 消息记录。
---
### U2. Layer 2: ReAct 任务后台化与 EventQueue 事件分发
**Goal**: 将 ReAct 执行从 WebSocket 协程中解耦,后台执行并通过 EventQueue 分发事件。WebSocket 仅订阅事件流并转发给前端。
**Requirements**: R2, R4
**Dependencies**: U1
**Files**:
- `src/agentkit/server/routes/portal.py` — 重构 ReAct 流式路径
- `src/agentkit/core/event_queue.py` — 可能需要扩展(添加 task_id 过滤订阅)
- `tests/unit/test_portal_ws_background_task.py` — 新建测试
**Approach**:
1. **提取后台执行函数**:创建 `_execute_react_background()` 协程接收所有必要参数messages, tools, model, agent_name, system_prompt, timeout_seconds, conv_id, task_id, event_queue, conversation_store。该函数
- 执行 `react_engine.execute_stream()`
- 每个事件通过 `event_queue.emit()` 分发
- `final_answer` 事件时累积输出
- 正常结束后保存到 `conversation_store`
- 异常时保存部分输出并发射 error 事件
- 无论成功失败,都发射 `task.completed``task.failed` 事件
2. **WebSocket 协程改为订阅者**:在 portal.py 的 ReAct 路径中:
- 创建 task_id 并注册到 TaskStorePENDING → RUNNING
- `asyncio.create_task(_execute_react_background(...))` 启动后台任务
- `async for event in event_queue.subscribe(task_id)` 订阅事件
- 将事件通过 `websocket.send_json()` 转发给前端
- WebSocket 断开时,`async for` 循环退出,后台任务继续运行
3. **EventQueue 扩展**:当前 `subscribe()` 返回所有事件。需要添加按 `task_id` 过滤的订阅能力,或在前端转发时过滤。优先选择在 EventQueue 中添加 `subscribe(task_id=None)` 参数过滤。
4. **DIRECT_CHAT 路径**DIRECT_CHAT 是同步 LLM 调用(非流式),保持现有逻辑不变,但同样在 TaskStore 注册。
**Technical design**directional guidance:
```python
# 后台执行函数(伪代码)
async def _execute_react_background(
react_engine, messages, tools, model, agent_name,
system_prompt, timeout_seconds,
conv_id, task_id, event_queue, conversation_store
):
collected_output = []
try:
async for event in react_engine.execute_stream(...):
if event.event_type == "final_answer":
collected_output.append(event.data.get("output", ""))
await event_queue.emit(Event.create(
event_type=event.event_type,
task_id=task_id,
session_id=conv_id,
data={"step": event.step, "data": event.data, ...}
))
# 正常完成
response_text = _ensure_non_empty("".join(collected_output))
await conversation_store.add_message(conv_id, "assistant", response_text)
await event_queue.emit(Event.create(
event_type="task.completed", task_id=task_id, ...
))
except Exception as e:
# 保存部分输出
if collected_output:
partial = _ensure_non_empty("".join(collected_output))
await conversation_store.add_message(conv_id, "assistant", partial)
await event_queue.emit(Event.create(
event_type="task.failed", task_id=task_id,
data={"error": str(e)}
))
# WebSocket 协程(伪代码)
task_id = str(uuid.uuid4())
task_store.create(task_id, agent_name, {"conversation_id": conv.id})
asyncio.create_task(_execute_react_background(...))
# WebSocket 订阅事件并转发
async for event in event_queue.subscribe(task_id=task_id):
if event.event_type in ("task.completed", "task.failed"):
await websocket.send_json({"type": "result", ...})
break
await websocket.send_json({"type": "step", "data": ...})
# WebSocket 断开时 async for 退出,后台任务继续
```
**Patterns to follow**:
- EventQueue 的 `subscribe()` + 哨兵关闭模式event_queue.py:193-243
- `_emit_event_safe()` 的异常吞咽模式portal.py:65-95
- BackgroundRunner 的 `asyncio.create_task` + `_on_done` 回调模式runner.py:55-73
**Test scenarios**:
- **Happy path**: 后台任务正常完成,事件通过 EventQueue 分发WebSocket 收到所有事件和最终结果
- **WebSocket 断开**: 后台任务继续运行,结果保存到 conversation storeTaskStore 状态更新为 COMPLETED
- **后台任务异常**: ReAct 执行抛异常部分输出保存TaskStore 状态为 FAILEDerror 事件发射
- **EventQueue 订阅过滤**: 多个并发任务,每个 WebSocket 只收到自己 task_id 的事件
- **Integration**: 后台任务完成后conversation store 有 assistant 消息TaskStore 有 COMPLETED 记录
**Verification**: 启动后台任务后立即断开 WebSocket等待任务完成后检查 conversation store 和 TaskStore。
---
### U3. Layer 2: TaskStore 注册与状态追踪
**Goal**: 在 portal WebSocket 路径中为每个用户消息创建 TaskStore 记录,状态随任务进展更新。
**Requirements**: R3
**Dependencies**: U2
**Files**:
- `src/agentkit/server/routes/portal.py` — 集成 TaskStore
- `tests/unit/test_portal_ws_task_tracking.py` — 新建测试
**Approach**:
1. 在 portal.py 的 WebSocket 路径中,获取 `task_store` from `websocket.app.state.task_store`
2. 在用户消息处理开始时,调用 `task_store.create(task_id, agent_name, {"conversation_id": conv.id, "message": message_text})`
3. 在后台任务执行前,更新状态为 `RUNNING``started_at=now`)。
4. 在后台任务完成时,更新状态为 `COMPLETED``output_data={"output": response_text}, completed_at=now, progress=1.0`)。
5. 在后台任务失败时,更新状态为 `FAILED``error_message=str(e), completed_at=now`)。
6. DIRECT_CHAT 路径同样注册 TaskStore同步调用状态直接从 PENDING → COMPLETED
**Patterns to follow**:
- BackgroundRunner._run_task 的状态更新模式runner.py:89-92, 141-147, 153-157
- TaskStore.create/update_status 的调用签名task_store.py:127-163
**Test scenarios**:
- **Happy path**: 用户发送消息 → TaskStore 创建 PENDING 记录 → 任务执行中状态为 RUNNING → 完成后状态为 COMPLETEDoutput_data 有内容
- **任务失败**: ReAct 执行异常 → TaskStore 状态为 FAILEDerror_message 有内容
- **WebSocket 断开后查询**: WebSocket 断开,后台任务继续 → 通过 `GET /api/v1/tasks/{task_id}` 能查到 RUNNING 状态 → 任务完成后查到 COMPLETED
- **metadata 包含 conversation_id**: TaskStore 记录的 metadata 中有 conversation_id 字段
- **Integration**: 前端通过 `GET /api/v1/tasks?status=running` 能找到当前对话的运行中任务
**Verification**: 通过 REST API `GET /api/v1/tasks/{task_id}` 查询任务状态,验证状态流转正确。
---
### U4. Layer 3: 前端任务状态查询 API 客户端
**Goal**: 在前端 API 客户端中添加任务状态查询方法,支持按 status 过滤和按 task_id 查询。
**Requirements**: R5, R6
**Dependencies**: U3
**Files**:
- `src/agentkit/server/frontend/src/api/client.ts` — 添加任务 API 方法
- `src/agentkit/server/frontend/src/api/types.ts` — 添加任务类型定义
**Approach**:
1.`types.ts` 中添加 `ITaskRecord` 接口,对应后端 `TaskRecord.to_dict()` 的输出格式。
2.`client.ts` 中添加方法:
- `getTask(taskId: string): Promise<ITaskRecord>` — GET `/api/v1/tasks/{taskId}`注意tasks 路由前缀是 `/api/v1/tasks`,不是 `/api/v1/portal`
- `listTasks(status?: string): Promise<ITaskRecord[]>` — GET `/api/v1/tasks?status=running`
3. 由于 tasks 路由前缀不同(`/api/v1/tasks` vs `/api/v1/portal`),需要创建一个新的 ApiClient 实例或调整 BaseApiClient 的 baseUrl。
**Patterns to follow**:
- 现有 `getConversations()` / `getConversation(id)` 的方法签名模式client.ts:30-37
- `IConversation` 接口定义模式types.ts:51-57
**Test scenarios**:
- **Happy path**: 调用 `getTask(taskId)` 返回正确的任务记录
- **按状态过滤**: 调用 `listTasks("running")` 只返回运行中任务
- **任务不存在**: 调用 `getTask("invalid-id")` 抛出 404 错误
- **Integration**: 后台任务运行中,前端能通过 API 查询到 RUNNING 状态
**Verification**: TypeScript 编译通过(`npm run build:frontend`API 调用返回正确数据。
---
### U5. Layer 3: 前端 WebSocket 重连后的任务恢复
**Goal**: 前端 WebSocket 重连后,检查当前对话是否有未完成任务,恢复事件流或拉取已完成的结果。
**Requirements**: R5, R6
**Dependencies**: U4
**Files**:
- `src/agentkit/server/frontend/src/stores/chat.ts` — 添加重连恢复逻辑
- `src/agentkit/server/frontend/src/api/types.ts` — 扩展 WsClientMessage 类型
**Approach**:
1. **扩展 WebSocket 协议**:添加 `resume` 消息类型,前端重连后发送 `{type: "resume", task_id: "..."}` 订阅已有后台任务的事件流。
2. **后端处理 resume**:在 portal.py 的 WebSocket 路径中,处理 `resume` 消息类型——通过 task_id 订阅 EventQueue 事件流,转发给前端。
3. **前端重连恢复流程**(在 `connectWebSocket``onopen` 中):
- 检查 `currentConversationId` 是否有值
- 调用 `listTasks("running")` 查找运行中任务
- 匹配 `metadata.conversation_id === currentConversationId` 的任务
- 如果找到运行中任务:发送 `resume` 消息,设置 `isLoading = true`
- 如果无运行中任务:调用 `selectConversation(currentConversationId)` 重新加载消息(包含已完成的结果)
4. **后端 resume 处理**:接收到 `resume` 消息后,通过 `task_id` 订阅 EventQueue转发事件直到 `task.completed``task.failed`
**Technical design**directional guidance:
```typescript
// 前端重连恢复(伪代码)
socket.onopen = async () => {
isWsConnected.value = true
startHeartbeat()
// 重连恢复逻辑
if (currentConversationId.value) {
await recoverTask(currentConversationId.value)
}
}
async function recoverTask(convId: string) {
const tasks = await apiClient.listTasks('running')
const runningTask = tasks.find(
t => t.metadata?.conversation_id === convId
)
if (runningTask) {
// 恢复进行中任务的事件流
isLoading.value = true
ws.value.send(JSON.stringify({
type: 'resume',
task_id: runningTask.task_id
}))
} else {
// 无运行中任务,重新加载对话消息
await selectConversation(convId)
}
}
```
**Patterns to follow**:
- 现有 `connectWebSocket``onopen` / `onclose` 模式chat.ts:209-259
- `selectConversation` 的消息加载模式chat.ts:55-74
- `handleWsMessage` 的事件处理模式chat.ts:270-528
**Test scenarios**:
- **Happy path - 有运行中任务**: 重连后发现有运行中任务 → 发送 resume → 收到后续事件 → 任务完成显示结果
- **Happy path - 无运行中任务**: 重连后无运行中任务 → 重新加载对话消息 → 显示已完成的结果
- **任务在重连前已完成**: 重连时任务已 COMPLETED → listTasks("running") 返回空 → 重新加载对话消息 → 结果显示
- **多个运行中任务**: 有多个对话的运行中任务 → 只恢复当前对话的任务
- **resume 后任务立即完成**: resume 后立即收到 task.completed 事件 → 正确显示结果
- **Integration**: 刷新页面 → 重连 → 恢复任务 → 最终结果正确显示
**Verification**: 启动复杂任务 → 刷新页面 → 验证任务继续运行 → 验证结果最终显示。
---
## Risks & Dependencies
### Risks
1. **EventQueue 订阅过滤改造风险**:当前 `subscribe()` 无过滤,添加 task_id 过滤可能影响现有订阅者。缓解:使用可选参数 `subscribe(task_id=None)`,默认行为不变。
2. **后台任务泄漏风险**WebSocket 断开后后台任务继续运行,如果任务本身卡住(如 LLM 超时任务会一直占用资源。缓解ReAct 已有 `timeout_seconds` 配置,后台任务同样受此约束。
3. **并发任务事件混淆风险**多个对话同时执行任务EventQueue 事件可能混淆。缓解:每个任务有唯一 task_id订阅时按 task_id 过滤。
4. **前端重连时序风险**:重连后查询任务状态时,任务可能刚好从 RUNNING 变为 COMPLETED。缓解先查 running如果空则重新加载对话消息会包含已完成结果
### Dependencies
- U1 → U2 → U3 → U4 → U5顺序依赖
- EventQueue 已在 app.state 可用(无需新建)
- TaskStore 已在 app.state 可用(无需新建)
## System-Wide Impact
- **后端**portal.py WebSocket 路径重大重构,影响所有 WebSocket 聊天用户
- **前端**chat store 的 WebSocket 连接逻辑增强,影响所有聊天页面用户
- **API**:新增 `resume` WebSocket 消息类型,无 REST API 变更
- **兼容性**DIRECT_CHAT 路径和 REST API 路径不受影响