434 lines
24 KiB
Markdown
434 lines
24 KiB
Markdown
---
|
||
title: "fix: WebSocket 断开后任务结果丢失与断线恢复"
|
||
status: completed
|
||
created: 2026-06-17
|
||
type: fix
|
||
origin: in-session investigation
|
||
---
|
||
|
||
# fix: WebSocket 断开后任务结果丢失与断线恢复
|
||
|
||
## Summary
|
||
|
||
当用户在复杂任务执行过程中刷新页面时,WebSocket 断开导致 ReAct 任务中断、已收集的输出丢失、且无法恢复。本计划通过三层防御彻底解决:Layer 1 确保部分结果持久化,Layer 2 将任务后台化解耦 WebSocket 生命周期,Layer 3 前端断线后恢复进行中任务。
|
||
|
||
## Problem Frame
|
||
|
||
**症状**:用户布置复杂任务后刷新界面,操作停止且返回为空。
|
||
|
||
**根因**(经代码调查确认):
|
||
|
||
1. **结果丢失**:portal.py 的 ReAct 流式路径中,assistant 回复保存在 `async for` 循环之后(portal.py:1064),WebSocket 断开时该行永远不执行,`collected_output` 全部丢失。
|
||
2. **任务中断**:ReAct 任务通过 `async for event in react_engine.execute_stream(...)` 直接在 WebSocket 协程中执行,未使用 BackgroundRunner 后台化,WebSocket 断开 = 任务取消。
|
||
3. **无状态追踪**:portal WebSocket 路径不使用 TaskStore,`task_id` 仅用于 EventQueue 事件发射,任务状态无法查询。
|
||
4. **无恢复机制**:前端 WebSocket 重连后(3 秒自动重连),不检查是否有未完成任务,不恢复进行中的任务状态。
|
||
|
||
## Requirements
|
||
|
||
- **R1**: WebSocket 断开时,已收集的 ReAct 输出必须持久化到 conversation store
|
||
- **R2**: ReAct 任务必须后台执行,与 WebSocket 连接生命周期解耦
|
||
- **R3**: 每个任务必须在 TaskStore 中注册,状态可查询
|
||
- **R4**: 后台任务的事件必须通过 EventQueue 分发,WebSocket 可订阅
|
||
- **R5**: 前端 WebSocket 重连后,必须检查当前对话是否有未完成任务并恢复
|
||
- **R6**: 已完成的任务结果必须能从 conversation store 恢复显示
|
||
- **R7**: 不破坏现有 DIRECT_CHAT 路径和 REST API 路径
|
||
|
||
## Key Technical Decisions
|
||
|
||
### KTD1: 复用 EventQueue 作为任务事件总线
|
||
|
||
**决策**:使用现有的 `EventQueue`(`core/event_queue.py`)分发后台任务事件,而非新建 EventBus 模块。
|
||
|
||
**理由**:EventQueue 已具备所需能力——多订阅者广播(行 161-175)、缓冲回放(deque 100,行 153)、原子订阅(行 193-202)、哨兵关闭模式(行 230-243)。它已被 portal.py 的 `_emit_event_safe` 使用,且文件头注释明确说明设计目标包括 Portal WebSocket 订阅。
|
||
|
||
### KTD2: 任务后台化使用 asyncio.create_task + EventQueue 订阅
|
||
|
||
**决策**:在 portal.py 中,将 ReAct 执行包装为 `asyncio.create_task`,WebSocket 协程通过 `event_queue.subscribe()` 订阅事件流。任务在后台独立运行,WebSocket 仅转发事件给前端。
|
||
|
||
**理由**:BackgroundRunner(`server/runner.py`)虽然存在,但它调用 `agent.execute()`(非流式),不支持事件流。改造 BackgroundRunner 支持流式会侵入核心执行路径。直接在 portal.py 中用 `asyncio.create_task` 更聚焦、风险更低。
|
||
|
||
### KTD3: 任务状态通过 TaskStore 追踪
|
||
|
||
**决策**:在 portal.py WebSocket 路径中,为每个用户消息创建 TaskStore 记录,状态随任务进展更新(PENDING → RUNNING → COMPLETED/FAILED)。
|
||
|
||
**理由**:TaskStore 已在 `app.state.task_store` 可用,`/api/v1/tasks/{task_id}` 端点已存在。前端可通过现有 REST API 查询任务状态,无需新建端点。
|
||
|
||
### KTD4: 前端通过 conversation_id 关联任务
|
||
|
||
**决策**:在 TaskStore 记录的 `metadata` 中存储 `conversation_id`,前端重连后通过 `GET /api/v1/tasks?status=running` 查找当前对话的未完成任务。
|
||
|
||
**理由**:避免新建专门的"按 conversation 查任务"端点。前端遍历 running 任务,匹配 metadata 中的 conversation_id 即可。
|
||
|
||
## High-Level Technical Design
|
||
|
||
```
|
||
┌──────────────────────────────────────────────────────────────────┐
|
||
│ WebSocket 连接存活 │
|
||
│ ┌────────────┐ subscribe(task_id) ┌──────────────────┐ │
|
||
│ │ WebSocket │ ◄────────────────────── │ EventQueue │ │
|
||
│ │ 协程 │ 转发事件给前端 │ (缓冲回放+广播) │ │
|
||
│ └────────────┘ └────────┬─────────┘ │
|
||
│ │ emit │
|
||
├──────────────────────────────────────────────────┼─────────────┤
|
||
│ WebSocket 断开 │ │
|
||
│ ┌────────────┐ │ │
|
||
│ │ WebSocket │ ✗ 断开 │ │
|
||
│ │ 协程 │ │ │
|
||
│ └────────────┘ │ │
|
||
│ ▼ │
|
||
│ ┌─────────────────────────────────────────────────────────┐ │
|
||
│ │ 后台任务 (asyncio.create_task) │ │
|
||
│ │ ┌─────────────┐ ┌──────────────┐ ┌────────────────┐ │ │
|
||
│ │ │ ReAct 执行 │─►│ EventQueue │ │ TaskStore │ │ │
|
||
│ │ │ execute_ │ │ .emit(event) │ │ .update_status │ │ │
|
||
│ │ │ stream() │ │ │ │ │ │ │
|
||
│ │ └─────────────┘ └──────────────┘ └────────────────┘ │ │
|
||
│ │ │ │ │
|
||
│ │ ▼ │ │
|
||
│ │ ┌────────────────────────────────────────────────┐ │ │
|
||
│ │ │ conversation_store.add_message(assistant, ...) │ │ │
|
||
│ │ │ (无论 WebSocket 是否存活,结果都持久化) │ │ │
|
||
│ │ └────────────────────────────────────────────────┘ │ │
|
||
│ └─────────────────────────────────────────────────────────┘ │
|
||
│ │
|
||
│ 前端重连后: │
|
||
│ 1. GET /api/v1/tasks?status=running → 查找未完成任务 │
|
||
│ 2. 匹配 metadata.conversation_id → 当前对话的任务 │
|
||
│ 3. event_queue.subscribe(task_id) → 恢复事件流 │
|
||
│ 4. 或 GET /conversations/{id} → 拉取已完成的结果 │
|
||
└──────────────────────────────────────────────────────────────────┘
|
||
```
|
||
|
||
## Scope Boundaries
|
||
|
||
### In Scope
|
||
|
||
- portal.py WebSocket 路径的 ReAct 任务后台化
|
||
- WebSocket 断开时的部分结果持久化
|
||
- TaskStore 注册和状态更新
|
||
- EventQueue 事件分发集成
|
||
- 前端 WebSocket 重连后的任务恢复
|
||
- 前端任务状态查询 API 客户端
|
||
|
||
### Out of Scope
|
||
|
||
- BackgroundRunner 改造支持流式(风险过高,单独处理)
|
||
- REST API 路径(`/portal/chat`)的改造(REST 已是同步调用,无此问题)
|
||
- SSE 路径(`/portal/chat/stream`)的改造
|
||
- Expert Team 协作模式的后台化(单独处理)
|
||
- WebSocket 重连退避策略优化(独立改进)
|
||
|
||
### Deferred to Follow-Up Work
|
||
|
||
- SubmissionQueue 接入(目前完全闲置,可后续用于任务队列化)
|
||
- Redis 分布式任务恢复(当前 InMemoryTaskStore 足够,分布式场景后续处理)
|
||
- 任务进度百分比反馈(当前 ReAct 无细粒度进度概念)
|
||
|
||
---
|
||
|
||
## Implementation Units
|
||
|
||
### U1. Layer 1: WebSocket 断开时持久化已收集的输出
|
||
|
||
**Goal**: 确保 WebSocket 在 ReAct 流式过程中断开时,已收集的 `collected_output` 保存到 conversation store。
|
||
|
||
**Requirements**: R1
|
||
|
||
**Dependencies**: 无
|
||
|
||
**Files**:
|
||
- `src/agentkit/server/routes/portal.py` — 修改 ReAct 流式路径的异常处理
|
||
- `tests/unit/test_portal_ws_persistence.py` — 新建测试
|
||
|
||
**Approach**:
|
||
|
||
在 portal.py 的 `portal_websocket` 函数中,ReAct 流式路径(约 1014-1064 行)的异常处理需要增强:
|
||
|
||
1. 在 `except Exception as e:` 块(约 1049 行)中,在发送 error 之前,将 `collected_output` 保存到 `_conversation_store`。使用 `_ensure_non_empty` 处理空输出。
|
||
2. 在外层 `except WebSocketDisconnect:` 块(约 1102 行)中,检查是否有未保存的 `collected_output`,如果有则保存。
|
||
3. 在外层 `except Exception as e:` 块(约 1104 行)中,同样处理。
|
||
4. 需要将 `collected_output`、`conv`、`task_id` 声明提升到外层 try 作用域,使 except 块可以访问。
|
||
|
||
**Patterns to follow**:
|
||
- `_ensure_non_empty()` 函数(portal.py:58-62)处理空输出
|
||
- `_conversation_store.add_message()` 已有调用模式(portal.py:871, 952, 1064)
|
||
- `_emit_event_safe()` 的异常吞咽模式(portal.py:65-95)——保存操作失败不应阻断后续清理
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**: ReAct 正常完成,`collected_output` 有内容,WebSocket 未断开 → 结果保存到 conversation store(验证现有行为不被破坏)
|
||
- **WebSocket 断开(final_answer 前)**: ReAct 流式中 WebSocket 断开,`collected_output` 为空 → conversation store 不写入空消息(或写入 EMPTY_LLM_RESPONSE)
|
||
- **WebSocket 断开(final_answer 后)**: ReAct 已产出 final_answer,WebSocket 在后续步骤断开,`collected_output` 有部分内容 → 部分内容保存到 conversation store
|
||
- **异常路径**: ReAct 执行抛出异常,`collected_output` 有部分内容 → 部分内容保存,error 事件正常发射
|
||
- **Integration**: 保存操作自身失败(conversation_store 异常)→ 不阻断 error 事件发射和后续清理
|
||
|
||
**Verification**: 模拟 WebSocket 断开场景,检查 SQLite conversation store 中是否有 assistant 消息记录。
|
||
|
||
---
|
||
|
||
### U2. Layer 2: ReAct 任务后台化与 EventQueue 事件分发
|
||
|
||
**Goal**: 将 ReAct 执行从 WebSocket 协程中解耦,后台执行并通过 EventQueue 分发事件。WebSocket 仅订阅事件流并转发给前端。
|
||
|
||
**Requirements**: R2, R4
|
||
|
||
**Dependencies**: U1
|
||
|
||
**Files**:
|
||
- `src/agentkit/server/routes/portal.py` — 重构 ReAct 流式路径
|
||
- `src/agentkit/core/event_queue.py` — 可能需要扩展(添加 task_id 过滤订阅)
|
||
- `tests/unit/test_portal_ws_background_task.py` — 新建测试
|
||
|
||
**Approach**:
|
||
|
||
1. **提取后台执行函数**:创建 `_execute_react_background()` 协程,接收所有必要参数(messages, tools, model, agent_name, system_prompt, timeout_seconds, conv_id, task_id, event_queue, conversation_store)。该函数:
|
||
- 执行 `react_engine.execute_stream()`
|
||
- 每个事件通过 `event_queue.emit()` 分发
|
||
- `final_answer` 事件时累积输出
|
||
- 正常结束后保存到 `conversation_store`
|
||
- 异常时保存部分输出并发射 error 事件
|
||
- 无论成功失败,都发射 `task.completed` 或 `task.failed` 事件
|
||
|
||
2. **WebSocket 协程改为订阅者**:在 portal.py 的 ReAct 路径中:
|
||
- 创建 task_id 并注册到 TaskStore(PENDING → RUNNING)
|
||
- `asyncio.create_task(_execute_react_background(...))` 启动后台任务
|
||
- `async for event in event_queue.subscribe(task_id)` 订阅事件
|
||
- 将事件通过 `websocket.send_json()` 转发给前端
|
||
- WebSocket 断开时,`async for` 循环退出,后台任务继续运行
|
||
|
||
3. **EventQueue 扩展**:当前 `subscribe()` 返回所有事件。需要添加按 `task_id` 过滤的订阅能力,或在前端转发时过滤。优先选择在 EventQueue 中添加 `subscribe(task_id=None)` 参数过滤。
|
||
|
||
4. **DIRECT_CHAT 路径**:DIRECT_CHAT 是同步 LLM 调用(非流式),保持现有逻辑不变,但同样在 TaskStore 注册。
|
||
|
||
**Technical design**(directional guidance):
|
||
|
||
```python
|
||
# 后台执行函数(伪代码)
|
||
async def _execute_react_background(
|
||
react_engine, messages, tools, model, agent_name,
|
||
system_prompt, timeout_seconds,
|
||
conv_id, task_id, event_queue, conversation_store
|
||
):
|
||
collected_output = []
|
||
try:
|
||
async for event in react_engine.execute_stream(...):
|
||
if event.event_type == "final_answer":
|
||
collected_output.append(event.data.get("output", ""))
|
||
await event_queue.emit(Event.create(
|
||
event_type=event.event_type,
|
||
task_id=task_id,
|
||
session_id=conv_id,
|
||
data={"step": event.step, "data": event.data, ...}
|
||
))
|
||
# 正常完成
|
||
response_text = _ensure_non_empty("".join(collected_output))
|
||
await conversation_store.add_message(conv_id, "assistant", response_text)
|
||
await event_queue.emit(Event.create(
|
||
event_type="task.completed", task_id=task_id, ...
|
||
))
|
||
except Exception as e:
|
||
# 保存部分输出
|
||
if collected_output:
|
||
partial = _ensure_non_empty("".join(collected_output))
|
||
await conversation_store.add_message(conv_id, "assistant", partial)
|
||
await event_queue.emit(Event.create(
|
||
event_type="task.failed", task_id=task_id,
|
||
data={"error": str(e)}
|
||
))
|
||
|
||
# WebSocket 协程(伪代码)
|
||
task_id = str(uuid.uuid4())
|
||
task_store.create(task_id, agent_name, {"conversation_id": conv.id})
|
||
asyncio.create_task(_execute_react_background(...))
|
||
|
||
# WebSocket 订阅事件并转发
|
||
async for event in event_queue.subscribe(task_id=task_id):
|
||
if event.event_type in ("task.completed", "task.failed"):
|
||
await websocket.send_json({"type": "result", ...})
|
||
break
|
||
await websocket.send_json({"type": "step", "data": ...})
|
||
# WebSocket 断开时 async for 退出,后台任务继续
|
||
```
|
||
|
||
**Patterns to follow**:
|
||
- EventQueue 的 `subscribe()` + 哨兵关闭模式(event_queue.py:193-243)
|
||
- `_emit_event_safe()` 的异常吞咽模式(portal.py:65-95)
|
||
- BackgroundRunner 的 `asyncio.create_task` + `_on_done` 回调模式(runner.py:55-73)
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**: 后台任务正常完成,事件通过 EventQueue 分发,WebSocket 收到所有事件和最终结果
|
||
- **WebSocket 断开**: 后台任务继续运行,结果保存到 conversation store,TaskStore 状态更新为 COMPLETED
|
||
- **后台任务异常**: ReAct 执行抛异常,部分输出保存,TaskStore 状态为 FAILED,error 事件发射
|
||
- **EventQueue 订阅过滤**: 多个并发任务,每个 WebSocket 只收到自己 task_id 的事件
|
||
- **Integration**: 后台任务完成后,conversation store 有 assistant 消息,TaskStore 有 COMPLETED 记录
|
||
|
||
**Verification**: 启动后台任务后立即断开 WebSocket,等待任务完成后检查 conversation store 和 TaskStore。
|
||
|
||
---
|
||
|
||
### U3. Layer 2: TaskStore 注册与状态追踪
|
||
|
||
**Goal**: 在 portal WebSocket 路径中为每个用户消息创建 TaskStore 记录,状态随任务进展更新。
|
||
|
||
**Requirements**: R3
|
||
|
||
**Dependencies**: U2
|
||
|
||
**Files**:
|
||
- `src/agentkit/server/routes/portal.py` — 集成 TaskStore
|
||
- `tests/unit/test_portal_ws_task_tracking.py` — 新建测试
|
||
|
||
**Approach**:
|
||
|
||
1. 在 portal.py 的 WebSocket 路径中,获取 `task_store` from `websocket.app.state.task_store`。
|
||
2. 在用户消息处理开始时,调用 `task_store.create(task_id, agent_name, {"conversation_id": conv.id, "message": message_text})`。
|
||
3. 在后台任务执行前,更新状态为 `RUNNING`(`started_at=now`)。
|
||
4. 在后台任务完成时,更新状态为 `COMPLETED`(`output_data={"output": response_text}, completed_at=now, progress=1.0`)。
|
||
5. 在后台任务失败时,更新状态为 `FAILED`(`error_message=str(e), completed_at=now`)。
|
||
6. DIRECT_CHAT 路径同样注册 TaskStore(同步调用,状态直接从 PENDING → COMPLETED)。
|
||
|
||
**Patterns to follow**:
|
||
- BackgroundRunner._run_task 的状态更新模式(runner.py:89-92, 141-147, 153-157)
|
||
- TaskStore.create/update_status 的调用签名(task_store.py:127-163)
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**: 用户发送消息 → TaskStore 创建 PENDING 记录 → 任务执行中状态为 RUNNING → 完成后状态为 COMPLETED,output_data 有内容
|
||
- **任务失败**: ReAct 执行异常 → TaskStore 状态为 FAILED,error_message 有内容
|
||
- **WebSocket 断开后查询**: WebSocket 断开,后台任务继续 → 通过 `GET /api/v1/tasks/{task_id}` 能查到 RUNNING 状态 → 任务完成后查到 COMPLETED
|
||
- **metadata 包含 conversation_id**: TaskStore 记录的 metadata 中有 conversation_id 字段
|
||
- **Integration**: 前端通过 `GET /api/v1/tasks?status=running` 能找到当前对话的运行中任务
|
||
|
||
**Verification**: 通过 REST API `GET /api/v1/tasks/{task_id}` 查询任务状态,验证状态流转正确。
|
||
|
||
---
|
||
|
||
### U4. Layer 3: 前端任务状态查询 API 客户端
|
||
|
||
**Goal**: 在前端 API 客户端中添加任务状态查询方法,支持按 status 过滤和按 task_id 查询。
|
||
|
||
**Requirements**: R5, R6
|
||
|
||
**Dependencies**: U3
|
||
|
||
**Files**:
|
||
- `src/agentkit/server/frontend/src/api/client.ts` — 添加任务 API 方法
|
||
- `src/agentkit/server/frontend/src/api/types.ts` — 添加任务类型定义
|
||
|
||
**Approach**:
|
||
|
||
1. 在 `types.ts` 中添加 `ITaskRecord` 接口,对应后端 `TaskRecord.to_dict()` 的输出格式。
|
||
2. 在 `client.ts` 中添加方法:
|
||
- `getTask(taskId: string): Promise<ITaskRecord>` — GET `/api/v1/tasks/{taskId}`(注意:tasks 路由前缀是 `/api/v1/tasks`,不是 `/api/v1/portal`)
|
||
- `listTasks(status?: string): Promise<ITaskRecord[]>` — GET `/api/v1/tasks?status=running`
|
||
3. 由于 tasks 路由前缀不同(`/api/v1/tasks` vs `/api/v1/portal`),需要创建一个新的 ApiClient 实例或调整 BaseApiClient 的 baseUrl。
|
||
|
||
**Patterns to follow**:
|
||
- 现有 `getConversations()` / `getConversation(id)` 的方法签名模式(client.ts:30-37)
|
||
- `IConversation` 接口定义模式(types.ts:51-57)
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**: 调用 `getTask(taskId)` 返回正确的任务记录
|
||
- **按状态过滤**: 调用 `listTasks("running")` 只返回运行中任务
|
||
- **任务不存在**: 调用 `getTask("invalid-id")` 抛出 404 错误
|
||
- **Integration**: 后台任务运行中,前端能通过 API 查询到 RUNNING 状态
|
||
|
||
**Verification**: TypeScript 编译通过(`npm run build:frontend`),API 调用返回正确数据。
|
||
|
||
---
|
||
|
||
### U5. Layer 3: 前端 WebSocket 重连后的任务恢复
|
||
|
||
**Goal**: 前端 WebSocket 重连后,检查当前对话是否有未完成任务,恢复事件流或拉取已完成的结果。
|
||
|
||
**Requirements**: R5, R6
|
||
|
||
**Dependencies**: U4
|
||
|
||
**Files**:
|
||
- `src/agentkit/server/frontend/src/stores/chat.ts` — 添加重连恢复逻辑
|
||
- `src/agentkit/server/frontend/src/api/types.ts` — 扩展 WsClientMessage 类型
|
||
|
||
**Approach**:
|
||
|
||
1. **扩展 WebSocket 协议**:添加 `resume` 消息类型,前端重连后发送 `{type: "resume", task_id: "..."}` 订阅已有后台任务的事件流。
|
||
2. **后端处理 resume**:在 portal.py 的 WebSocket 路径中,处理 `resume` 消息类型——通过 task_id 订阅 EventQueue 事件流,转发给前端。
|
||
3. **前端重连恢复流程**(在 `connectWebSocket` 的 `onopen` 中):
|
||
- 检查 `currentConversationId` 是否有值
|
||
- 调用 `listTasks("running")` 查找运行中任务
|
||
- 匹配 `metadata.conversation_id === currentConversationId` 的任务
|
||
- 如果找到运行中任务:发送 `resume` 消息,设置 `isLoading = true`
|
||
- 如果无运行中任务:调用 `selectConversation(currentConversationId)` 重新加载消息(包含已完成的结果)
|
||
4. **后端 resume 处理**:接收到 `resume` 消息后,通过 `task_id` 订阅 EventQueue,转发事件直到 `task.completed` 或 `task.failed`。
|
||
|
||
**Technical design**(directional guidance):
|
||
|
||
```typescript
|
||
// 前端重连恢复(伪代码)
|
||
socket.onopen = async () => {
|
||
isWsConnected.value = true
|
||
startHeartbeat()
|
||
|
||
// 重连恢复逻辑
|
||
if (currentConversationId.value) {
|
||
await recoverTask(currentConversationId.value)
|
||
}
|
||
}
|
||
|
||
async function recoverTask(convId: string) {
|
||
const tasks = await apiClient.listTasks('running')
|
||
const runningTask = tasks.find(
|
||
t => t.metadata?.conversation_id === convId
|
||
)
|
||
|
||
if (runningTask) {
|
||
// 恢复进行中任务的事件流
|
||
isLoading.value = true
|
||
ws.value.send(JSON.stringify({
|
||
type: 'resume',
|
||
task_id: runningTask.task_id
|
||
}))
|
||
} else {
|
||
// 无运行中任务,重新加载对话消息
|
||
await selectConversation(convId)
|
||
}
|
||
}
|
||
```
|
||
|
||
**Patterns to follow**:
|
||
- 现有 `connectWebSocket` 的 `onopen` / `onclose` 模式(chat.ts:209-259)
|
||
- `selectConversation` 的消息加载模式(chat.ts:55-74)
|
||
- `handleWsMessage` 的事件处理模式(chat.ts:270-528)
|
||
|
||
**Test scenarios**:
|
||
- **Happy path - 有运行中任务**: 重连后发现有运行中任务 → 发送 resume → 收到后续事件 → 任务完成显示结果
|
||
- **Happy path - 无运行中任务**: 重连后无运行中任务 → 重新加载对话消息 → 显示已完成的结果
|
||
- **任务在重连前已完成**: 重连时任务已 COMPLETED → listTasks("running") 返回空 → 重新加载对话消息 → 结果显示
|
||
- **多个运行中任务**: 有多个对话的运行中任务 → 只恢复当前对话的任务
|
||
- **resume 后任务立即完成**: resume 后立即收到 task.completed 事件 → 正确显示结果
|
||
- **Integration**: 刷新页面 → 重连 → 恢复任务 → 最终结果正确显示
|
||
|
||
**Verification**: 启动复杂任务 → 刷新页面 → 验证任务继续运行 → 验证结果最终显示。
|
||
|
||
---
|
||
|
||
## Risks & Dependencies
|
||
|
||
### Risks
|
||
|
||
1. **EventQueue 订阅过滤改造风险**:当前 `subscribe()` 无过滤,添加 task_id 过滤可能影响现有订阅者。缓解:使用可选参数 `subscribe(task_id=None)`,默认行为不变。
|
||
2. **后台任务泄漏风险**:WebSocket 断开后后台任务继续运行,如果任务本身卡住(如 LLM 超时),任务会一直占用资源。缓解:ReAct 已有 `timeout_seconds` 配置,后台任务同样受此约束。
|
||
3. **并发任务事件混淆风险**:多个对话同时执行任务,EventQueue 事件可能混淆。缓解:每个任务有唯一 task_id,订阅时按 task_id 过滤。
|
||
4. **前端重连时序风险**:重连后查询任务状态时,任务可能刚好从 RUNNING 变为 COMPLETED。缓解:先查 running,如果空则重新加载对话消息(会包含已完成结果)。
|
||
|
||
### Dependencies
|
||
|
||
- U1 → U2 → U3 → U4 → U5(顺序依赖)
|
||
- EventQueue 已在 app.state 可用(无需新建)
|
||
- TaskStore 已在 app.state 可用(无需新建)
|
||
|
||
## System-Wide Impact
|
||
|
||
- **后端**:portal.py WebSocket 路径重大重构,影响所有 WebSocket 聊天用户
|
||
- **前端**:chat store 的 WebSocket 连接逻辑增强,影响所有聊天页面用户
|
||
- **API**:新增 `resume` WebSocket 消息类型,无 REST API 变更
|
||
- **兼容性**:DIRECT_CHAT 路径和 REST API 路径不受影响
|