fischer-agentkit/docs/plans/2026-06-07-015-feat-agentki...

551 lines
24 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: "feat: AgentKit Phase 8 — 对话式 Agent 与自适应编排"
status: completed
created: 2026-06-07
plan_type: feat
depth: deep
origin: Phase 1-7 完成后架构能力评估 — 对话模式(20%)、多Agent协作(70%)、自适应规划(10%)三大差距
branch: feat/agentkit-phase8-chat-adaptive
---
# AgentKit Phase 8 — 对话式 Agent 与自适应编排
## Summary
Phase 8 将 AgentKit 从"任务执行框架"演进为"对话式 Agent 平台"补齐三大核心差距Chat 模式(多轮对话 + Human-in-the-Loop、自适应编排反思-重规划闭环)、高级多 Agent 协作Agent 间直接通信 + 动态角色协商。分三个阶段交付先建立对话基础设施U1-U3再实现自适应编排U4-U5最后补齐多 Agent 协作U6-U8
## Problem Frame
Phase 1-7 构建了完整的 Agent 执行框架ReAct 引擎、三层记忆、Pipeline 编排、MCP 协议、上下文压缩、自进化系统。但当前架构是 **"计划-执行"** 模式,存在三大差距:
1. **对话能力缺失20%**:无会话管理、无多轮对话、无 Human-in-the-Loop。`conversation_id` 字段存在但从未使用。每次任务提交是独立的ReAct 引擎不保留对话历史。
2. **自适应规划缺失10%**Pipeline 定义是静态的,执行失败后无"分析原因→修改计划→重新执行"闭环。Orchestrator 分解一次执行一次,无迭代优化。
3. **多 Agent 高级协作不足70%**Worker 之间只能通过 SharedWorkspace 间接传递数据,无直接通信。角色固定,无运行时协商。无 Supervisor 监控。
## Requirements
- R1: 支持多轮对话:用户可在同一会话中持续与 Agent 交互,对话历史自动持久化
- R2: 支持 Human-in-the-LoopAgent 可主动向用户提问并等待回复,用户可在执行中追问/打断
- R3: 支持流式 LLM 输出ReAct 引擎的 LLM 调用支持 token-by-token streaming
- R4: 支持自适应编排Pipeline 执行失败后可触发反思→重规划→重新执行闭环
- R5: 支持 Agent 间直接通信Worker Agent 之间可发送消息,无需通过 SharedWorkspace 间接传递
- R6: 支持动态角色协商Agent 可在运行时协商分工,而非固定角色分配
- R7: 所有新功能向后兼容,现有 Task 模式不受影响
- R8: Chat 模式和 Task 模式可共存,同一 Agent 实例可同时服务两种模式
---
## Key Technical Decisions
### KTD-1: 会话模型设计 — Session + Message 双层模型
**决策**:采用 `Session`(会话)+ `Message`消息双层模型Session 持有对话元数据和 Agent 绑定Message 持有单条消息内容和角色。
**理由**
- Session 管理生命周期(创建/活跃/暂停/关闭),支持超时自动归档
- Message 按 role 分类user/assistant/tool/system与 ReAct 引擎的 messages 列表自然映射
- Session 可关联多个 AgentOrchestrator 模式下Message 可标注来源 Agent
**替代方案**
- 单层 Conversation 模型:简单但无法表达多 Agent 参与、会话状态管理
- 纯 Redis 存储:高性能但无持久化保证,重启丢失
### KTD-2: Human-in-the-Loop 实现 — AskHumanTool + WebSocket 双向通信
**决策**:注册 `AskHumanTool` 作为 ReAct 可调用工具Agent 调用时通过 WebSocket 向客户端推送问题并等待回复。
**理由**
- 工具化实现与现有 ReAct 循环无缝集成,无需修改引擎核心
- WebSocket 已有基础设施,只需扩展为双向通信
- 非对话模式下 AskHumanTool 不注册,零侵入
**替代方案**
- 特殊消息类型:需要修改 ReAct 引擎核心循环,侵入性高
- HTTP 轮询:延迟高,用户体验差
### KTD-3: 流式 LLM 输出 — LLMGateway.chat_stream() + ReActEvent 扩展
**决策**:在 LLMGateway 新增 `chat_stream()` 方法返回 `AsyncIterator[str]`ReAct 引擎在 `execute_stream()` 中消费并包装为 `token` 类型 ReActEvent。
**理由**
- 流式输出是 Chat 模式的基本要求,用户需要实时看到 Agent 的"思考过程"
- 在 LLM Gateway 层实现流式,所有 Provider 统一接口
- ReActEvent 已有 streaming 机制,只需新增 `token` 事件类型
### KTD-4: 自适应编排 — Reflect-then-Replan 模式
**决策**:在 PipelineEngine 执行失败时,触发 LLM 反思分析失败原因,生成修正后的 Pipeline 重新执行,最多重试 N 次。
**理由**
- 与现有 PipelineEngine 的重试机制互补:重试处理瞬态错误,反思处理结构性错误
- LLM 反思可利用执行上下文(哪步失败、错误信息、已完成步骤的输出)做出更智能的调整
- 可配置开关,默认关闭,不影响现有 Pipeline 行为
### KTD-5: Agent 间通信 — MessageBus 抽象层
**决策**:新增 `MessageBus` 抽象层,基于 Redis Streams 实现,支持 Agent 间点对点和广播通信。
**理由**
- Redis Streams 比 Pub/Sub 可靠(消息持久化 + 消费者组),比 Queue 灵活(多消费者)
- 抽象层解耦底层实现,未来可替换为 Kafka/NATS
- 与现有 SharedWorkspace 互补Workspace 用于共享状态MessageBus 用于事件通知
---
## High-Level Technical Design
```mermaid
flowchart TB
subgraph Chat["Chat 模式(新增)"]
Client[客户端] <-->|WebSocket 双向| WS[WS Handler]
WS -->|发送消息| SM[SessionManager]
SM -->|加载会话| SP[Session + Messages]
SM -->|调用 Agent| Agent[ReAct Engine]
Agent -->|AskHumanTool| WS
Agent -->|token streaming| WS
end
subgraph Task["Task 模式(现有)"]
API[REST API] -->|提交任务| Dispatcher[TaskDispatcher]
Dispatcher -->|分发| Agent
end
subgraph Adaptive["自适应编排(新增)"]
PE[PipelineEngine] -->|执行失败| Reflector[LLM Reflector]
Reflector -->|分析原因| Replanner[Replanner]
Replanner -->|生成新计划| PE
end
subgraph MultiAgent["多 Agent 协作(增强)"]
Orch[Orchestrator] -->|分配任务| W1[Worker 1]
Orch -->|分配任务| W2[Worker 2]
W1 <-->|MessageBus| W2
Orch -->|动态协商| Negotiator[Role Negotiator]
end
Agent --> PE
Agent --> Orch
style Chat fill:#c8e6c9,color:#1a5e20
style Adaptive fill:#bbdefb,color:#0d47a1
style MultiAgent fill:#fff3e0,color:#e65100
```
---
## Scope Boundaries
### In Scope
- Session/Message 模型与持久化
- Chat APIREST + WebSocket 双向通信)
- AskHumanTool 实现
- LLMGateway.chat_stream() 流式输出
- ReAct 引擎 token streaming
- PipelineEngine 反思-重规划闭环
- MessageBus 抽象层与 Redis Streams 实现
- Agent 间点对点通信
### Deferred to Follow-Up Work
- 多租户隔离与用户认证OAuth/JWT
- Agent 沙箱与工具权限控制
- 声明式 Pipeline YAML 定义
- Pipeline 运行时监控/可视化
- 跨 Agent 记忆共享
- 记忆主动遗忘/压缩策略
- 性能基准测试与混沌测试
- 动态角色协商Phase 8 仅实现基础通信,协商逻辑留待 Phase 9
---
## Implementation Units
### U1. Session/Message 模型与持久化
**Goal:** 建立对话基础设施 — Session 生命周期管理 + Message 持久化存储
**Requirements:** R1, R7
**Dependencies:**
**Files:**
- `src/agentkit/session/__init__.py` (新建)
- `src/agentkit/session/models.py` (新建) — Session + Message 数据模型
- `src/agentkit/session/manager.py` (新建) — SessionManager 会话管理器
- `src/agentkit/session/store.py` (新建) — 会话存储InMemory + Redis 双后端)
- `tests/unit/test_session_models.py` (新建)
- `tests/unit/test_session_manager.py` (新建)
- `tests/unit/test_session_store.py` (新建)
**Approach:**
- `Session` 模型:`session_id`, `agent_name`, `status` (active/paused/closed), `metadata`, `created_at`, `updated_at`, `ttl`
- `Message` 模型:`message_id`, `session_id`, `role` (user/assistant/tool/system), `content`, `tool_call_id` (可选), `agent_name` (可选,多 Agent 场景), `created_at`
- `SessionManager`:创建/获取/暂停/关闭会话,追加消息,加载对话历史,超时归档
- `SessionStore`InMemory测试用+ Redis生产用双后端接口统一
- Session 与 Agent 绑定:创建时指定 `agent_name`,后续消息自动路由到该 Agent
- 对话历史加载:`get_messages(session_id)` 返回完整消息列表,可直接传入 ReAct 引擎
**Patterns to follow:** `TaskStore` 的 InMemory + Redis 双后端模式(`src/agentkit/server/task_store.py`
**Test scenarios:**
- 创建 Session 返回有效 session_id状态为 active
- 追加 Message 后通过 get_messages 获取完整历史
- Session 状态转换active → paused → active → closed
- 关闭的 Session 不接受新 Message
- InMemory 和 Redis Store 行为一致
- Session TTL 过期后自动归档为 closed
- 获取不存在的 Session 返回 None
- 大量消息的会话加载性能1000+ messages
**Verification:** Session 模型可创建、持久化、加载Message 可追加和查询,状态转换正确
---
### U2. Chat API 与 WebSocket 双向通信
**Goal:** 暴露 Chat 模式的 REST + WebSocket API支持多轮对话和实时交互
**Requirements:** R1, R2, R7, R8
**Dependencies:** U1
**Files:**
- `src/agentkit/server/routes/chat.py` (新建) — Chat REST + WebSocket 路由
- `src/agentkit/server/app.py` (修改) — 注册 Chat 路由,注入 SessionManager
- `src/agentkit/server/config.py` (修改) — 新增 session 配置
- `tests/unit/test_chat_routes.py` (新建)
- `tests/integration/test_chat_e2e.py` (新建)
**Approach:**
- REST 端点:
- `POST /api/v1/chat/sessions` — 创建会话
- `GET /api/v1/chat/sessions/{id}` — 获取会话信息
- `POST /api/v1/chat/sessions/{id}/messages` — 发送消息(同步模式,等待 Agent 完整回复)
- `GET /api/v1/chat/sessions/{id}/messages` — 获取对话历史
- `DELETE /api/v1/chat/sessions/{id}` — 关闭会话
- WebSocket 端点:`/ws/chat/{session_id}` — 双向通信
- 客户端 → 服务端:`{"type": "message", "content": "..."}`, `{"type": "cancel"}`
- 服务端 → 客户端:`{"type": "token", "content": "..."}`, `{"type": "tool_call", ...}`, `{"type": "ask_human", "question": "..."}`, `{"type": "final_answer", ...}`
- Chat 路由与现有 Task 路由共存,同一 Agent 实例可同时服务两种模式
- WebSocket handler 接收用户消息后,加载 Session 历史 + 新消息,调用 Agent.execute_stream()
**Patterns to follow:** 现有 `routes/tasks.py` 的 stream 端点 + `routes/ws.py` 的 WebSocket handler
**Test scenarios:**
- POST /chat/sessions 创建会话返回 session_id
- POST /chat/sessions/{id}/messages 发送消息获得 Agent 回复
- GET /chat/sessions/{id}/messages 返回完整对话历史
- WebSocket 连接后发送消息收到 token 流式事件
- WebSocket 发送 cancel 消息取消 Agent 执行
- 关闭的 Session 发送消息返回 400 错误
- 不存在的 Session 返回 404
- 同一 Agent 同时服务 Chat 和 Task 模式
**Verification:** Chat API 端到端可用WebSocket 双向通信正常,多轮对话历史持久化
---
### U3. AskHumanTool + 流式 LLM 输出
**Goal:** 实现 Human-in-the-Loop 和 token-by-token 流式输出
**Requirements:** R2, R3, R7
**Dependencies:** U1, U2
**Files:**
- `src/agentkit/tools/ask_human.py` (新建) — AskHumanTool
- `src/agentkit/tools/__init__.py` (修改) — 导出 AskHumanTool
- `src/agentkit/llm/gateway.py` (修改) — 新增 chat_stream() 方法
- `src/agentkit/llm/protocol.py` (修改) — 新增 LLMStreamResponse
- `src/agentkit/core/react.py` (修改) — execute_stream() 支持 token 事件
- `src/agentkit/core/config_driven.py` (修改) — Chat 模式下注册 AskHumanTool
- `tests/unit/test_ask_human_tool.py` (新建)
- `tests/unit/test_llm_streaming.py` (新建)
- `tests/unit/test_react_token_streaming.py` (新建)
**Approach:**
- `AskHumanTool`
- 参数:`question: str`, `options: list[str] | None`
- 执行时通过 `context` 获取 WebSocket 连接,向客户端推送问题
- 使用 `asyncio.Future` 等待用户回复,设置超时(默认 60s
- 超时返回默认回复或取消当前 ReAct 循环
- 仅在 Chat 模式下注册(通过 ConfigDrivenAgent 的 mode 参数判断)
- `LLMGateway.chat_stream()`
- 新增方法,签名与 `chat()` 一致,返回 `AsyncIterator[LLMStreamChunk]`
- `LLMStreamChunk``content: str`, `finish_reason: str | None`, `usage: dict | None`
- 各 Provider 实现 `_stream_chat()` 方法,调用底层 SDK 的 stream API
- 优先实现 OpenAI 和 Anthropic 的流式(覆盖 80% 用例)
- ReAct token streaming
- `execute_stream()` 中调用 `llm_gateway.chat_stream()` 替代 `chat()`
- 新增 `ReActEvent` 类型:`type="token", content=str`
- token 事件直接透传到 WebSocket 客户端
**Patterns to follow:** `HeadroomRetrieveTool` 的条件注册模式;现有 Provider 的 `chat()` 实现模式
**Test scenarios:**
- AskHumanTool 执行后向客户端推送问题
- AskHumanTool 收到用户回复后返回结果
- AskHumanTool 超时后返回默认回复
- 非 Chat 模式下 AskHumanTool 不注册
- LLMGateway.chat_stream() 返回 AsyncIterator
- 流式输出的内容拼接后与完整输出一致
- ReAct execute_stream() 产出 token 类型事件
- 流式输出中断(取消)时正确清理资源
**Verification:** Agent 可在执行中向用户提问并等待回复LLM 输出以 token 粒度流式推送
---
### U4. PipelineEngine 反思-重规划闭环
**Goal:** Pipeline 执行失败后自动反思分析原因,生成修正计划重新执行
**Requirements:** R4, R7
**Dependencies:** 无(独立于 U1-U3
**Files:**
- `src/agentkit/orchestrator/reflection.py` (新建) — PipelineReflector + PipelineReplanner
- `src/agentkit/orchestrator/pipeline_engine.py` (修改) — 集成反思-重规划
- `src/agentkit/orchestrator/pipeline_schema.py` (修改) — 新增 AdaptiveConfig
- `tests/unit/test_pipeline_reflection.py` (新建)
- `tests/integration/test_adaptive_pipeline.py` (新建)
**Approach:**
- `PipelineReflector`
- 输入:失败的 Pipeline + 执行上下文(哪步失败、错误信息、已完成步骤输出)
- 调用 LLM 分析失败原因输出结构化反思报告failure_type, root_cause, suggested_fix
- failure_type 分类:`input_error`(输入问题)、`resource_error`(资源不可用)、`logic_error`(步骤逻辑错误)、`timeout`(超时)
- `PipelineReplanner`
- 输入:原始 Pipeline + 反思报告
- 调用 LLM 生成修正后的 Pipeline调整步骤顺序、替换失败步骤、增加前置检查
- 保留已完成步骤的结果,仅重新执行失败及后续步骤
- `AdaptiveConfig`
- `enabled: bool = False` — 默认关闭
- `max_reflections: int = 3` — 最大反思次数
- `reflection_model: str = "default"` — 反思使用的 LLM 模型
- `skip_stages: list[str] = []` — 不参与反思的步骤
- PipelineEngine 集成:
- 执行失败时检查 AdaptiveConfig.enabled
- 启用则触发反思→重规划→重新执行循环
- 每次反思记录到 PipelineResult.metadata
**Patterns to follow:** `StepRetryPolicy` 的配置模式;`SagaOrchestrator` 的补偿追踪模式
**Test scenarios:**
- Pipeline 执行失败且 adaptive 未启用时,行为与现有一致
- Pipeline 执行失败且 adaptive 启用时,触发反思
- 反思报告包含 failure_type 和 root_cause
- 重规划生成的新 Pipeline 保留已完成步骤结果
- 达到 max_reflections 后停止重试
- 反思-重规划成功后 Pipeline 最终完成
- 连续反思仍失败时返回最终失败结果
- 集成测试:模拟资源错误→反思→调整→成功
**Verification:** Pipeline 执行失败后可自动反思并重规划,最终完成或达到最大重试次数
---
### U5. Orchestrator 自适应任务分解
**Goal:** Orchestrator 支持迭代式任务分解 — 执行→评估→再分解
**Requirements:** R4, R7
**Dependencies:** U4
**Files:**
- `src/agentkit/core/orchestrator.py` (修改) — 新增 execute_adaptive() 方法
- `tests/unit/test_orchestrator_adaptive.py` (新建)
- `tests/integration/test_orchestrator_adaptive.py` (新建)
**Approach:**
- `execute_adaptive()` 方法:
- 第一轮:与现有 execute() 一致,分解任务并执行
- 评估LLM 评估子任务结果质量0-1 评分 + 改进建议)
- 如果评估不通过且未达 max_iterations
- 基于评估反馈重新分解未达标的子任务
- 保留已完成的子任务结果
- 执行新分解的子任务
- 如果评估通过或达到 max_iterations汇总结果返回
- 新增 `OrchestratorConfig`
- `adaptive: bool = False`
- `max_iterations: int = 3`
- `quality_threshold: float = 0.7`
**Patterns to follow:** `DynamicPipeline.execute_loop()` 的迭代模式
**Test scenarios:**
- adaptive=False 时行为与现有 execute() 一致
- 第一轮评估通过时直接返回结果
- 第一轮评估不通过时触发再分解
- 再分解保留已完成子任务结果
- 达到 max_iterations 时返回当前最佳结果
- 评估反馈正确传递给 LLM 用于再分解
- 集成测试:复杂任务经两轮分解后完成
**Verification:** Orchestrator 可根据执行结果迭代优化任务分解
---
### U6. MessageBus 抽象层与 Redis Streams 实现
**Goal:** 新增 Agent 间通信基础设施 — MessageBus 抽象 + Redis Streams 实现
**Requirements:** R5, R7
**Dependencies:** 无(独立于 U1-U5
**Files:**
- `src/agentkit/bus/__init__.py` (新建)
- `src/agentkit/bus/protocol.py` (新建) — MessageBus Protocol 定义
- `src/agentkit/bus/message.py` (新建) — AgentMessage 消息模型
- `src/agentkit/bus/redis_bus.py` (新建) — Redis Streams 实现
- `src/agentkit/bus/memory_bus.py` (新建) — InMemory 实现(测试用)
- `tests/unit/test_bus_protocol.py` (新建)
- `tests/unit/test_redis_bus.py` (新建)
- `tests/unit/test_memory_bus.py` (新建)
**Approach:**
- `AgentMessage` 模型:
- `message_id: str` — UUID
- `sender: str` — 发送者 Agent 名称
- `recipient: str | None` — 接收者None 为广播)
- `topic: str` — 消息主题(如 "task.result", "agent.status"
- `payload: dict[str, Any]` — 消息内容
- `timestamp: datetime` — 发送时间
- `correlation_id: str | None` — 关联 ID请求-响应模式)
- `MessageBus` Protocol
- `publish(message: AgentMessage) -> None` — 发布消息
- `subscribe(agent_name: str, handler: Callable) -> None` — 订阅消息
- `unsubscribe(agent_name: str) -> None` — 取消订阅
- `request(message: AgentMessage, timeout: float) -> AgentMessage` — 请求-响应模式
- `broadcast(message: AgentMessage) -> None` — 广播消息
- `RedisMessageBus`
- 使用 Redis StreamsXADD/XREADGROUP实现
- 每个 Agent 一个 Consumer Group支持多消费者
- 消息确认机制XACK防止消息丢失
- 死信队列:超过重试次数的消息转入死信
- `InMemoryMessageBus`
- asyncio.Queue 实现,测试用
- 行为与 Redis 实现一致
**Patterns to follow:** `SharedWorkspace` 的 Redis + InMemory 双模式;`TaskDispatcher` 的 Redis Queue 模式
**Test scenarios:**
- 点对点消息Agent A 发送消息给 Agent BB 收到
- 广播消息Agent A 广播,所有订阅者收到
- 请求-响应Agent A 发送请求Agent B 回复A 收到响应
- 请求超时Agent A 发送请求,无响应,超时后抛出异常
- 取消订阅后不再收到消息
- InMemory 和 Redis 实现行为一致
- 消息确认:消费者处理完消息后 XACK
- 死信队列:消息重试 3 次后转入死信
**Verification:** Agent 间可通过 MessageBus 点对点和广播通信,请求-响应模式正常工作
---
### U7. Orchestrator 集成 MessageBus
**Goal:** Orchestrator 通过 MessageBus 协调 Worker支持 Worker 间直接通信
**Requirements:** R5, R7
**Dependencies:** U6
**Files:**
- `src/agentkit/core/orchestrator.py` (修改) — 注入 MessageBusWorker 间通信
- `src/agentkit/core/agent_pool.py` (修改) — Agent 注册到 MessageBus
- `src/agentkit/server/app.py` (修改) — 初始化 MessageBus
- `tests/unit/test_orchestrator_bus.py` (新建)
- `tests/integration/test_multi_agent_communication.py` (新建)
**Approach:**
- Orchestrator 注入 MessageBus
- 创建 Orchestrator 时传入可选的 MessageBus
- Worker 执行子任务时,通过 MessageBus 发布进度和中间结果
- Worker 之间可直接通信(如 Agent A 请求 Agent B 的中间结果)
- AgentPool 集成:
- Agent 创建时自动注册到 MessageBus订阅自己的消息
- Agent 销毁时取消订阅
- Agent 的 `handle_message()` 方法处理收到的消息
- 消息路由:
- 点对点:`recipient="agent_name"`
- 广播:`topic="task.progress"`
- Orchestrator 订阅所有 Worker 的进度和结果
**Patterns to follow:** `MCPManager` 的生命周期管理模式
**Test scenarios:**
- Worker 通过 MessageBus 发布进度Orchestrator 收到
- Worker A 直接请求 Worker B 的中间结果
- Agent 创建时自动注册到 MessageBus
- Agent 销毁时取消订阅
- 无 MessageBus 时 Orchestrator 行为不变(向后兼容)
- 集成测试:两 Worker 协作完成需要中间结果交换的任务
**Verification:** Orchestrator 可通过 MessageBus 协调 WorkerWorker 间可直接通信
---
### U8. Chat 模式集成测试与文档更新
**Goal:** 端到端验证 Chat 模式 + 自适应编排 + 多 Agent 通信的集成
**Requirements:** R1-R8
**Dependencies:** U1-U7
**Files:**
- `tests/integration/test_chat_adaptive_e2e.py` (新建) — Chat + 自适应编排 E2E
- `tests/integration/test_chat_multi_agent_e2e.py` (新建) — Chat + 多 Agent 协作 E2E
- `configs/llm_config.yaml` (修改) — 新增 session/bus/adaptive 配置段
**Approach:**
- Chat + 自适应 E2E
- 创建会话 → 发送复杂任务 → Agent 执行 Pipeline → 失败触发反思 → 重规划 → 成功 → 多轮对话继续
- Chat + 多 Agent E2E
- 创建会话 → 发送多 Agent 任务 → Orchestrator 分解 → Worker 通过 MessageBus 协作 → 结果汇总 → 用户追问
- 配置更新:
- `session` 段:`backend: redis`, `ttl: 3600`
- `bus` 段:`backend: redis_streams`
- `adaptive` 段:`enabled: true`, `max_reflections: 3`
**Test scenarios:**
- Chat + 自适应:多轮对话中 Pipeline 失败后自动反思重规划
- Chat + 多 Agent用户通过 Chat 模式触发多 Agent 协作
- AskHumanToolAgent 在执行中向用户提问,用户回复后继续
- 流式输出Chat 模式下 token-by-token 推送
- 配置加载:新配置段正确解析
**Verification:** 所有新功能端到端可用,配置正确加载
---
## Risks & Mitigations
| 风险 | 影响 | 缓解 |
|------|------|------|
| WebSocket 双向通信复杂度高 | 中 | 先实现 REST 同步模式WebSocket 作为增强 |
| LLM 流式输出 Provider 适配工作量大 | 中 | 优先适配 OpenAI/Anthropic其余 Provider 降级为非流式 |
| 反思-重规划 LLM 调用增加成本 | 低 | 默认关闭,可配置;反思使用低成本模型 |
| Redis Streams 运维复杂度 | 低 | InMemory 实现可用于开发/测试,生产用 Redis |
| Session 大量消息加载性能 | 中 | 分页加载 + 摘要压缩(长期消息自动摘要) |
## Open Questions
- Q1: Session 消息的分页策略?默认按时间倒序分页,还是按 ReAct 循环分页?
- Q2: AskHumanTool 超时后的默认行为?返回默认回复还是抛出异常让 ReAct 处理?
- Q3: 反思-重规划是否需要人工确认?自动执行还是需要用户审批后才能重规划?
## System-Wide Impact
- **API 层**:新增 5 个 REST 端点 + 1 个 WebSocket 端点
- **ReAct 引擎**execute_stream() 扩展 token 事件类型,非破坏性变更
- **LLM Gateway**:新增 chat_stream() 方法,所有 Provider 需实现(可降级为非流式)
- **Orchestrator**:新增 execute_adaptive(),现有 execute() 不变
- **PipelineEngine**:反思-重规划为可选增强,默认关闭
- **新模块**`session/`, `bus/` — 两个新子包
- **配置**:新增 session/bus/adaptive 配置段