fischer-agentkit/docs/plans/2026-06-17-002-fix-ws-task-...

24 KiB
Raw Permalink Blame History

title status created type origin
fix: WebSocket 断开后任务结果丢失与断线恢复 completed 2026-06-17 fix in-session investigation

fix: WebSocket 断开后任务结果丢失与断线恢复

Summary

当用户在复杂任务执行过程中刷新页面时WebSocket 断开导致 ReAct 任务中断、已收集的输出丢失、且无法恢复。本计划通过三层防御彻底解决Layer 1 确保部分结果持久化Layer 2 将任务后台化解耦 WebSocket 生命周期Layer 3 前端断线后恢复进行中任务。

Problem Frame

症状:用户布置复杂任务后刷新界面,操作停止且返回为空。

根因(经代码调查确认):

  1. 结果丢失portal.py 的 ReAct 流式路径中assistant 回复保存在 async for 循环之后portal.py:1064WebSocket 断开时该行永远不执行,collected_output 全部丢失。
  2. 任务中断ReAct 任务通过 async for event in react_engine.execute_stream(...) 直接在 WebSocket 协程中执行,未使用 BackgroundRunner 后台化WebSocket 断开 = 任务取消。
  3. 无状态追踪portal WebSocket 路径不使用 TaskStoretask_id 仅用于 EventQueue 事件发射,任务状态无法查询。
  4. 无恢复机制:前端 WebSocket 重连后3 秒自动重连),不检查是否有未完成任务,不恢复进行中的任务状态。

Requirements

  • R1: WebSocket 断开时,已收集的 ReAct 输出必须持久化到 conversation store
  • R2: ReAct 任务必须后台执行,与 WebSocket 连接生命周期解耦
  • R3: 每个任务必须在 TaskStore 中注册,状态可查询
  • R4: 后台任务的事件必须通过 EventQueue 分发WebSocket 可订阅
  • R5: 前端 WebSocket 重连后,必须检查当前对话是否有未完成任务并恢复
  • R6: 已完成的任务结果必须能从 conversation store 恢复显示
  • R7: 不破坏现有 DIRECT_CHAT 路径和 REST API 路径

Key Technical Decisions

KTD1: 复用 EventQueue 作为任务事件总线

决策:使用现有的 EventQueuecore/event_queue.py)分发后台任务事件,而非新建 EventBus 模块。

理由EventQueue 已具备所需能力——多订阅者广播(行 161-175、缓冲回放deque 100行 153、原子订阅行 193-202、哨兵关闭模式行 230-243。它已被 portal.py 的 _emit_event_safe 使用,且文件头注释明确说明设计目标包括 Portal WebSocket 订阅。

KTD2: 任务后台化使用 asyncio.create_task + EventQueue 订阅

决策:在 portal.py 中,将 ReAct 执行包装为 asyncio.create_taskWebSocket 协程通过 event_queue.subscribe() 订阅事件流。任务在后台独立运行WebSocket 仅转发事件给前端。

理由BackgroundRunnerserver/runner.py)虽然存在,但它调用 agent.execute()(非流式),不支持事件流。改造 BackgroundRunner 支持流式会侵入核心执行路径。直接在 portal.py 中用 asyncio.create_task 更聚焦、风险更低。

KTD3: 任务状态通过 TaskStore 追踪

决策:在 portal.py WebSocket 路径中,为每个用户消息创建 TaskStore 记录状态随任务进展更新PENDING → RUNNING → COMPLETED/FAILED

理由TaskStore 已在 app.state.task_store 可用,/api/v1/tasks/{task_id} 端点已存在。前端可通过现有 REST API 查询任务状态,无需新建端点。

KTD4: 前端通过 conversation_id 关联任务

决策:在 TaskStore 记录的 metadata 中存储 conversation_id,前端重连后通过 GET /api/v1/tasks?status=running 查找当前对话的未完成任务。

理由:避免新建专门的"按 conversation 查任务"端点。前端遍历 running 任务,匹配 metadata 中的 conversation_id 即可。

High-Level Technical Design

┌──────────────────────────────────────────────────────────────────┐
│                     WebSocket 连接存活                            │
│  ┌────────────┐    subscribe(task_id)    ┌──────────────────┐   │
│  │  WebSocket │ ◄────────────────────── │   EventQueue     │   │
│  │  协程      │    转发事件给前端         │  (缓冲回放+广播)  │   │
│  └────────────┘                          └────────┬─────────┘   │
│                                                  │ emit         │
├──────────────────────────────────────────────────┼─────────────┤
│                     WebSocket 断开                │             │
│  ┌────────────┐                                 │             │
│  │  WebSocket │ ✗ 断开                          │             │
│  │  协程      │                                 │             │
│  └────────────┘                                 │             │
│                                                  ▼             │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │            后台任务 (asyncio.create_task)                │   │
│  │  ┌─────────────┐  ┌──────────────┐  ┌────────────────┐ │   │
│  │  │ ReAct 执行  │─►│ EventQueue   │  │ TaskStore      │ │   │
│  │  │ execute_    │  │ .emit(event) │  │ .update_status │ │   │
│  │  │ stream()    │  │              │  │                │ │   │
│  │  └─────────────┘  └──────────────┘  └────────────────┘ │   │
│  │         │                                            │   │
│  │         ▼                                            │   │
│  │  ┌────────────────────────────────────────────────┐  │   │
│  │  │ conversation_store.add_message(assistant, ...)  │  │   │
│  │  │ (无论 WebSocket 是否存活,结果都持久化)          │  │   │
│  │  └────────────────────────────────────────────────┘  │   │
│  └─────────────────────────────────────────────────────────┘   │
│                                                                │
│  前端重连后:                                                   │
│  1. GET /api/v1/tasks?status=running → 查找未完成任务          │
│  2. 匹配 metadata.conversation_id → 当前对话的任务              │
│  3. event_queue.subscribe(task_id) → 恢复事件流                │
│  4. 或 GET /conversations/{id} → 拉取已完成的结果              │
└──────────────────────────────────────────────────────────────────┘

Scope Boundaries

In Scope

  • portal.py WebSocket 路径的 ReAct 任务后台化
  • WebSocket 断开时的部分结果持久化
  • TaskStore 注册和状态更新
  • EventQueue 事件分发集成
  • 前端 WebSocket 重连后的任务恢复
  • 前端任务状态查询 API 客户端

Out of Scope

  • BackgroundRunner 改造支持流式(风险过高,单独处理)
  • REST API 路径(/portal/chat的改造REST 已是同步调用,无此问题)
  • SSE 路径(/portal/chat/stream)的改造
  • Expert Team 协作模式的后台化(单独处理)
  • WebSocket 重连退避策略优化(独立改进)

Deferred to Follow-Up Work

  • SubmissionQueue 接入(目前完全闲置,可后续用于任务队列化)
  • Redis 分布式任务恢复(当前 InMemoryTaskStore 足够,分布式场景后续处理)
  • 任务进度百分比反馈(当前 ReAct 无细粒度进度概念)

Implementation Units

U1. Layer 1: WebSocket 断开时持久化已收集的输出

Goal: 确保 WebSocket 在 ReAct 流式过程中断开时,已收集的 collected_output 保存到 conversation store。

Requirements: R1

Dependencies: 无

Files:

  • src/agentkit/server/routes/portal.py — 修改 ReAct 流式路径的异常处理
  • tests/unit/test_portal_ws_persistence.py — 新建测试

Approach:

在 portal.py 的 portal_websocket 函数中ReAct 流式路径(约 1014-1064 行)的异常处理需要增强:

  1. except Exception as e: 块(约 1049 行)中,在发送 error 之前,将 collected_output 保存到 _conversation_store。使用 _ensure_non_empty 处理空输出。
  2. 在外层 except WebSocketDisconnect: 块(约 1102 行)中,检查是否有未保存的 collected_output,如果有则保存。
  3. 在外层 except Exception as e: 块(约 1104 行)中,同样处理。
  4. 需要将 collected_outputconvtask_id 声明提升到外层 try 作用域,使 except 块可以访问。

Patterns to follow:

  • _ensure_non_empty() 函数portal.py:58-62处理空输出
  • _conversation_store.add_message() 已有调用模式portal.py:871, 952, 1064
  • _emit_event_safe() 的异常吞咽模式portal.py:65-95——保存操作失败不应阻断后续清理

Test scenarios:

  • Happy path: ReAct 正常完成,collected_output 有内容WebSocket 未断开 → 结果保存到 conversation store验证现有行为不被破坏
  • WebSocket 断开final_answer 前): ReAct 流式中 WebSocket 断开,collected_output 为空 → conversation store 不写入空消息(或写入 EMPTY_LLM_RESPONSE
  • WebSocket 断开final_answer 后): ReAct 已产出 final_answerWebSocket 在后续步骤断开,collected_output 有部分内容 → 部分内容保存到 conversation store
  • 异常路径: ReAct 执行抛出异常,collected_output 有部分内容 → 部分内容保存error 事件正常发射
  • Integration: 保存操作自身失败conversation_store 异常)→ 不阻断 error 事件发射和后续清理

Verification: 模拟 WebSocket 断开场景,检查 SQLite conversation store 中是否有 assistant 消息记录。


U2. Layer 2: ReAct 任务后台化与 EventQueue 事件分发

Goal: 将 ReAct 执行从 WebSocket 协程中解耦,后台执行并通过 EventQueue 分发事件。WebSocket 仅订阅事件流并转发给前端。

Requirements: R2, R4

Dependencies: U1

Files:

  • src/agentkit/server/routes/portal.py — 重构 ReAct 流式路径
  • src/agentkit/core/event_queue.py — 可能需要扩展(添加 task_id 过滤订阅)
  • tests/unit/test_portal_ws_background_task.py — 新建测试

Approach:

  1. 提取后台执行函数:创建 _execute_react_background() 协程接收所有必要参数messages, tools, model, agent_name, system_prompt, timeout_seconds, conv_id, task_id, event_queue, conversation_store。该函数

    • 执行 react_engine.execute_stream()
    • 每个事件通过 event_queue.emit() 分发
    • final_answer 事件时累积输出
    • 正常结束后保存到 conversation_store
    • 异常时保存部分输出并发射 error 事件
    • 无论成功失败,都发射 task.completedtask.failed 事件
  2. WebSocket 协程改为订阅者:在 portal.py 的 ReAct 路径中:

    • 创建 task_id 并注册到 TaskStorePENDING → RUNNING
    • asyncio.create_task(_execute_react_background(...)) 启动后台任务
    • async for event in event_queue.subscribe(task_id) 订阅事件
    • 将事件通过 websocket.send_json() 转发给前端
    • WebSocket 断开时,async for 循环退出,后台任务继续运行
  3. EventQueue 扩展:当前 subscribe() 返回所有事件。需要添加按 task_id 过滤的订阅能力,或在前端转发时过滤。优先选择在 EventQueue 中添加 subscribe(task_id=None) 参数过滤。

  4. DIRECT_CHAT 路径DIRECT_CHAT 是同步 LLM 调用(非流式),保持现有逻辑不变,但同样在 TaskStore 注册。

Technical designdirectional guidance:

# 后台执行函数(伪代码)
async def _execute_react_background(
    react_engine, messages, tools, model, agent_name,
    system_prompt, timeout_seconds,
    conv_id, task_id, event_queue, conversation_store
):
    collected_output = []
    try:
        async for event in react_engine.execute_stream(...):
            if event.event_type == "final_answer":
                collected_output.append(event.data.get("output", ""))
            await event_queue.emit(Event.create(
                event_type=event.event_type,
                task_id=task_id,
                session_id=conv_id,
                data={"step": event.step, "data": event.data, ...}
            ))
        # 正常完成
        response_text = _ensure_non_empty("".join(collected_output))
        await conversation_store.add_message(conv_id, "assistant", response_text)
        await event_queue.emit(Event.create(
            event_type="task.completed", task_id=task_id, ...
        ))
    except Exception as e:
        # 保存部分输出
        if collected_output:
            partial = _ensure_non_empty("".join(collected_output))
            await conversation_store.add_message(conv_id, "assistant", partial)
        await event_queue.emit(Event.create(
            event_type="task.failed", task_id=task_id,
            data={"error": str(e)}
        ))

# WebSocket 协程(伪代码)
task_id = str(uuid.uuid4())
task_store.create(task_id, agent_name, {"conversation_id": conv.id})
asyncio.create_task(_execute_react_background(...))

# WebSocket 订阅事件并转发
async for event in event_queue.subscribe(task_id=task_id):
    if event.event_type in ("task.completed", "task.failed"):
        await websocket.send_json({"type": "result", ...})
        break
    await websocket.send_json({"type": "step", "data": ...})
    # WebSocket 断开时 async for 退出,后台任务继续

Patterns to follow:

  • EventQueue 的 subscribe() + 哨兵关闭模式event_queue.py:193-243
  • _emit_event_safe() 的异常吞咽模式portal.py:65-95
  • BackgroundRunner 的 asyncio.create_task + _on_done 回调模式runner.py:55-73

Test scenarios:

  • Happy path: 后台任务正常完成,事件通过 EventQueue 分发WebSocket 收到所有事件和最终结果
  • WebSocket 断开: 后台任务继续运行,结果保存到 conversation storeTaskStore 状态更新为 COMPLETED
  • 后台任务异常: ReAct 执行抛异常部分输出保存TaskStore 状态为 FAILEDerror 事件发射
  • EventQueue 订阅过滤: 多个并发任务,每个 WebSocket 只收到自己 task_id 的事件
  • Integration: 后台任务完成后conversation store 有 assistant 消息TaskStore 有 COMPLETED 记录

Verification: 启动后台任务后立即断开 WebSocket等待任务完成后检查 conversation store 和 TaskStore。


U3. Layer 2: TaskStore 注册与状态追踪

Goal: 在 portal WebSocket 路径中为每个用户消息创建 TaskStore 记录,状态随任务进展更新。

Requirements: R3

Dependencies: U2

Files:

  • src/agentkit/server/routes/portal.py — 集成 TaskStore
  • tests/unit/test_portal_ws_task_tracking.py — 新建测试

Approach:

  1. 在 portal.py 的 WebSocket 路径中,获取 task_store from websocket.app.state.task_store
  2. 在用户消息处理开始时,调用 task_store.create(task_id, agent_name, {"conversation_id": conv.id, "message": message_text})
  3. 在后台任务执行前,更新状态为 RUNNINGstarted_at=now)。
  4. 在后台任务完成时,更新状态为 COMPLETEDoutput_data={"output": response_text}, completed_at=now, progress=1.0)。
  5. 在后台任务失败时,更新状态为 FAILEDerror_message=str(e), completed_at=now)。
  6. DIRECT_CHAT 路径同样注册 TaskStore同步调用状态直接从 PENDING → COMPLETED

Patterns to follow:

  • BackgroundRunner._run_task 的状态更新模式runner.py:89-92, 141-147, 153-157
  • TaskStore.create/update_status 的调用签名task_store.py:127-163

Test scenarios:

  • Happy path: 用户发送消息 → TaskStore 创建 PENDING 记录 → 任务执行中状态为 RUNNING → 完成后状态为 COMPLETEDoutput_data 有内容
  • 任务失败: ReAct 执行异常 → TaskStore 状态为 FAILEDerror_message 有内容
  • WebSocket 断开后查询: WebSocket 断开,后台任务继续 → 通过 GET /api/v1/tasks/{task_id} 能查到 RUNNING 状态 → 任务完成后查到 COMPLETED
  • metadata 包含 conversation_id: TaskStore 记录的 metadata 中有 conversation_id 字段
  • Integration: 前端通过 GET /api/v1/tasks?status=running 能找到当前对话的运行中任务

Verification: 通过 REST API GET /api/v1/tasks/{task_id} 查询任务状态,验证状态流转正确。


U4. Layer 3: 前端任务状态查询 API 客户端

Goal: 在前端 API 客户端中添加任务状态查询方法,支持按 status 过滤和按 task_id 查询。

Requirements: R5, R6

Dependencies: U3

Files:

  • src/agentkit/server/frontend/src/api/client.ts — 添加任务 API 方法
  • src/agentkit/server/frontend/src/api/types.ts — 添加任务类型定义

Approach:

  1. types.ts 中添加 ITaskRecord 接口,对应后端 TaskRecord.to_dict() 的输出格式。
  2. client.ts 中添加方法:
    • getTask(taskId: string): Promise<ITaskRecord> — GET /api/v1/tasks/{taskId}注意tasks 路由前缀是 /api/v1/tasks,不是 /api/v1/portal
    • listTasks(status?: string): Promise<ITaskRecord[]> — GET /api/v1/tasks?status=running
  3. 由于 tasks 路由前缀不同(/api/v1/tasks vs /api/v1/portal),需要创建一个新的 ApiClient 实例或调整 BaseApiClient 的 baseUrl。

Patterns to follow:

  • 现有 getConversations() / getConversation(id) 的方法签名模式client.ts:30-37
  • IConversation 接口定义模式types.ts:51-57

Test scenarios:

  • Happy path: 调用 getTask(taskId) 返回正确的任务记录
  • 按状态过滤: 调用 listTasks("running") 只返回运行中任务
  • 任务不存在: 调用 getTask("invalid-id") 抛出 404 错误
  • Integration: 后台任务运行中,前端能通过 API 查询到 RUNNING 状态

Verification: TypeScript 编译通过(npm run build:frontendAPI 调用返回正确数据。


U5. Layer 3: 前端 WebSocket 重连后的任务恢复

Goal: 前端 WebSocket 重连后,检查当前对话是否有未完成任务,恢复事件流或拉取已完成的结果。

Requirements: R5, R6

Dependencies: U4

Files:

  • src/agentkit/server/frontend/src/stores/chat.ts — 添加重连恢复逻辑
  • src/agentkit/server/frontend/src/api/types.ts — 扩展 WsClientMessage 类型

Approach:

  1. 扩展 WebSocket 协议:添加 resume 消息类型,前端重连后发送 {type: "resume", task_id: "..."} 订阅已有后台任务的事件流。
  2. 后端处理 resume:在 portal.py 的 WebSocket 路径中,处理 resume 消息类型——通过 task_id 订阅 EventQueue 事件流,转发给前端。
  3. 前端重连恢复流程(在 connectWebSocketonopen 中):
    • 检查 currentConversationId 是否有值
    • 调用 listTasks("running") 查找运行中任务
    • 匹配 metadata.conversation_id === currentConversationId 的任务
    • 如果找到运行中任务:发送 resume 消息,设置 isLoading = true
    • 如果无运行中任务:调用 selectConversation(currentConversationId) 重新加载消息(包含已完成的结果)
  4. 后端 resume 处理:接收到 resume 消息后,通过 task_id 订阅 EventQueue转发事件直到 task.completedtask.failed

Technical designdirectional guidance:

// 前端重连恢复(伪代码)
socket.onopen = async () => {
  isWsConnected.value = true
  startHeartbeat()

  // 重连恢复逻辑
  if (currentConversationId.value) {
    await recoverTask(currentConversationId.value)
  }
}

async function recoverTask(convId: string) {
  const tasks = await apiClient.listTasks('running')
  const runningTask = tasks.find(
    t => t.metadata?.conversation_id === convId
  )

  if (runningTask) {
    // 恢复进行中任务的事件流
    isLoading.value = true
    ws.value.send(JSON.stringify({
      type: 'resume',
      task_id: runningTask.task_id
    }))
  } else {
    // 无运行中任务,重新加载对话消息
    await selectConversation(convId)
  }
}

Patterns to follow:

  • 现有 connectWebSocketonopen / onclose 模式chat.ts:209-259
  • selectConversation 的消息加载模式chat.ts:55-74
  • handleWsMessage 的事件处理模式chat.ts:270-528

Test scenarios:

  • Happy path - 有运行中任务: 重连后发现有运行中任务 → 发送 resume → 收到后续事件 → 任务完成显示结果
  • Happy path - 无运行中任务: 重连后无运行中任务 → 重新加载对话消息 → 显示已完成的结果
  • 任务在重连前已完成: 重连时任务已 COMPLETED → listTasks("running") 返回空 → 重新加载对话消息 → 结果显示
  • 多个运行中任务: 有多个对话的运行中任务 → 只恢复当前对话的任务
  • resume 后任务立即完成: resume 后立即收到 task.completed 事件 → 正确显示结果
  • Integration: 刷新页面 → 重连 → 恢复任务 → 最终结果正确显示

Verification: 启动复杂任务 → 刷新页面 → 验证任务继续运行 → 验证结果最终显示。


Risks & Dependencies

Risks

  1. EventQueue 订阅过滤改造风险:当前 subscribe() 无过滤,添加 task_id 过滤可能影响现有订阅者。缓解:使用可选参数 subscribe(task_id=None),默认行为不变。
  2. 后台任务泄漏风险WebSocket 断开后后台任务继续运行,如果任务本身卡住(如 LLM 超时任务会一直占用资源。缓解ReAct 已有 timeout_seconds 配置,后台任务同样受此约束。
  3. 并发任务事件混淆风险多个对话同时执行任务EventQueue 事件可能混淆。缓解:每个任务有唯一 task_id订阅时按 task_id 过滤。
  4. 前端重连时序风险:重连后查询任务状态时,任务可能刚好从 RUNNING 变为 COMPLETED。缓解先查 running如果空则重新加载对话消息会包含已完成结果

Dependencies

  • U1 → U2 → U3 → U4 → U5顺序依赖
  • EventQueue 已在 app.state 可用(无需新建)
  • TaskStore 已在 app.state 可用(无需新建)

System-Wide Impact

  • 后端portal.py WebSocket 路径重大重构,影响所有 WebSocket 聊天用户
  • 前端chat store 的 WebSocket 连接逻辑增强,影响所有聊天页面用户
  • API:新增 resume WebSocket 消息类型,无 REST API 变更
  • 兼容性DIRECT_CHAT 路径和 REST API 路径不受影响