--- title: "feat: AgentKit Phase 8 — 对话式 Agent 与自适应编排" status: completed created: 2026-06-07 plan_type: feat depth: deep origin: Phase 1-7 完成后架构能力评估 — 对话模式(20%)、多Agent协作(70%)、自适应规划(10%)三大差距 branch: feat/agentkit-phase8-chat-adaptive --- # AgentKit Phase 8 — 对话式 Agent 与自适应编排 ## Summary Phase 8 将 AgentKit 从"任务执行框架"演进为"对话式 Agent 平台",补齐三大核心差距:Chat 模式(多轮对话 + Human-in-the-Loop)、自适应编排(反思-重规划闭环)、高级多 Agent 协作(Agent 间直接通信 + 动态角色协商)。分三个阶段交付:先建立对话基础设施(U1-U3),再实现自适应编排(U4-U5),最后补齐多 Agent 协作(U6-U8)。 ## Problem Frame Phase 1-7 构建了完整的 Agent 执行框架:ReAct 引擎、三层记忆、Pipeline 编排、MCP 协议、上下文压缩、自进化系统。但当前架构是 **"计划-执行"** 模式,存在三大差距: 1. **对话能力缺失(20%)**:无会话管理、无多轮对话、无 Human-in-the-Loop。`conversation_id` 字段存在但从未使用。每次任务提交是独立的,ReAct 引擎不保留对话历史。 2. **自适应规划缺失(10%)**:Pipeline 定义是静态的,执行失败后无"分析原因→修改计划→重新执行"闭环。Orchestrator 分解一次执行一次,无迭代优化。 3. **多 Agent 高级协作不足(70%)**:Worker 之间只能通过 SharedWorkspace 间接传递数据,无直接通信。角色固定,无运行时协商。无 Supervisor 监控。 ## Requirements - R1: 支持多轮对话:用户可在同一会话中持续与 Agent 交互,对话历史自动持久化 - R2: 支持 Human-in-the-Loop:Agent 可主动向用户提问并等待回复,用户可在执行中追问/打断 - R3: 支持流式 LLM 输出:ReAct 引擎的 LLM 调用支持 token-by-token streaming - R4: 支持自适应编排:Pipeline 执行失败后可触发反思→重规划→重新执行闭环 - R5: 支持 Agent 间直接通信:Worker Agent 之间可发送消息,无需通过 SharedWorkspace 间接传递 - R6: 支持动态角色协商:Agent 可在运行时协商分工,而非固定角色分配 - R7: 所有新功能向后兼容,现有 Task 模式不受影响 - R8: Chat 模式和 Task 模式可共存,同一 Agent 实例可同时服务两种模式 --- ## Key Technical Decisions ### KTD-1: 会话模型设计 — Session + Message 双层模型 **决策**:采用 `Session`(会话)+ `Message`(消息)双层模型,Session 持有对话元数据和 Agent 绑定,Message 持有单条消息内容和角色。 **理由**: - Session 管理生命周期(创建/活跃/暂停/关闭),支持超时自动归档 - Message 按 role 分类(user/assistant/tool/system),与 ReAct 引擎的 messages 列表自然映射 - Session 可关联多个 Agent(Orchestrator 模式下),Message 可标注来源 Agent **替代方案**: - 单层 Conversation 模型:简单但无法表达多 Agent 参与、会话状态管理 - 纯 Redis 存储:高性能但无持久化保证,重启丢失 ### KTD-2: Human-in-the-Loop 实现 — AskHumanTool + WebSocket 双向通信 **决策**:注册 `AskHumanTool` 作为 ReAct 可调用工具,Agent 调用时通过 WebSocket 向客户端推送问题并等待回复。 **理由**: - 工具化实现与现有 ReAct 循环无缝集成,无需修改引擎核心 - WebSocket 已有基础设施,只需扩展为双向通信 - 非对话模式下 AskHumanTool 不注册,零侵入 **替代方案**: - 特殊消息类型:需要修改 ReAct 引擎核心循环,侵入性高 - HTTP 轮询:延迟高,用户体验差 ### KTD-3: 流式 LLM 输出 — LLMGateway.chat_stream() + ReActEvent 扩展 **决策**:在 LLMGateway 新增 `chat_stream()` 方法返回 `AsyncIterator[str]`,ReAct 引擎在 `execute_stream()` 中消费并包装为 `token` 类型 ReActEvent。 **理由**: - 流式输出是 Chat 模式的基本要求,用户需要实时看到 Agent 的"思考过程" - 在 LLM Gateway 层实现流式,所有 Provider 统一接口 - ReActEvent 已有 streaming 机制,只需新增 `token` 事件类型 ### KTD-4: 自适应编排 — Reflect-then-Replan 模式 **决策**:在 PipelineEngine 执行失败时,触发 LLM 反思分析失败原因,生成修正后的 Pipeline 重新执行,最多重试 N 次。 **理由**: - 与现有 PipelineEngine 的重试机制互补:重试处理瞬态错误,反思处理结构性错误 - LLM 反思可利用执行上下文(哪步失败、错误信息、已完成步骤的输出)做出更智能的调整 - 可配置开关,默认关闭,不影响现有 Pipeline 行为 ### KTD-5: Agent 间通信 — MessageBus 抽象层 **决策**:新增 `MessageBus` 抽象层,基于 Redis Streams 实现,支持 Agent 间点对点和广播通信。 **理由**: - Redis Streams 比 Pub/Sub 可靠(消息持久化 + 消费者组),比 Queue 灵活(多消费者) - 抽象层解耦底层实现,未来可替换为 Kafka/NATS - 与现有 SharedWorkspace 互补:Workspace 用于共享状态,MessageBus 用于事件通知 --- ## High-Level Technical Design ```mermaid flowchart TB subgraph Chat["Chat 模式(新增)"] Client[客户端] <-->|WebSocket 双向| WS[WS Handler] WS -->|发送消息| SM[SessionManager] SM -->|加载会话| SP[Session + Messages] SM -->|调用 Agent| Agent[ReAct Engine] Agent -->|AskHumanTool| WS Agent -->|token streaming| WS end subgraph Task["Task 模式(现有)"] API[REST API] -->|提交任务| Dispatcher[TaskDispatcher] Dispatcher -->|分发| Agent end subgraph Adaptive["自适应编排(新增)"] PE[PipelineEngine] -->|执行失败| Reflector[LLM Reflector] Reflector -->|分析原因| Replanner[Replanner] Replanner -->|生成新计划| PE end subgraph MultiAgent["多 Agent 协作(增强)"] Orch[Orchestrator] -->|分配任务| W1[Worker 1] Orch -->|分配任务| W2[Worker 2] W1 <-->|MessageBus| W2 Orch -->|动态协商| Negotiator[Role Negotiator] end Agent --> PE Agent --> Orch style Chat fill:#c8e6c9,color:#1a5e20 style Adaptive fill:#bbdefb,color:#0d47a1 style MultiAgent fill:#fff3e0,color:#e65100 ``` --- ## Scope Boundaries ### In Scope - Session/Message 模型与持久化 - Chat API(REST + WebSocket 双向通信) - AskHumanTool 实现 - LLMGateway.chat_stream() 流式输出 - ReAct 引擎 token streaming - PipelineEngine 反思-重规划闭环 - MessageBus 抽象层与 Redis Streams 实现 - Agent 间点对点通信 ### Deferred to Follow-Up Work - 多租户隔离与用户认证(OAuth/JWT) - Agent 沙箱与工具权限控制 - 声明式 Pipeline YAML 定义 - Pipeline 运行时监控/可视化 - 跨 Agent 记忆共享 - 记忆主动遗忘/压缩策略 - 性能基准测试与混沌测试 - 动态角色协商(Phase 8 仅实现基础通信,协商逻辑留待 Phase 9) --- ## Implementation Units ### U1. Session/Message 模型与持久化 **Goal:** 建立对话基础设施 — Session 生命周期管理 + Message 持久化存储 **Requirements:** R1, R7 **Dependencies:** 无 **Files:** - `src/agentkit/session/__init__.py` (新建) - `src/agentkit/session/models.py` (新建) — Session + Message 数据模型 - `src/agentkit/session/manager.py` (新建) — SessionManager 会话管理器 - `src/agentkit/session/store.py` (新建) — 会话存储(InMemory + Redis 双后端) - `tests/unit/test_session_models.py` (新建) - `tests/unit/test_session_manager.py` (新建) - `tests/unit/test_session_store.py` (新建) **Approach:** - `Session` 模型:`session_id`, `agent_name`, `status` (active/paused/closed), `metadata`, `created_at`, `updated_at`, `ttl` - `Message` 模型:`message_id`, `session_id`, `role` (user/assistant/tool/system), `content`, `tool_call_id` (可选), `agent_name` (可选,多 Agent 场景), `created_at` - `SessionManager`:创建/获取/暂停/关闭会话,追加消息,加载对话历史,超时归档 - `SessionStore`:InMemory(测试用)+ Redis(生产用)双后端,接口统一 - Session 与 Agent 绑定:创建时指定 `agent_name`,后续消息自动路由到该 Agent - 对话历史加载:`get_messages(session_id)` 返回完整消息列表,可直接传入 ReAct 引擎 **Patterns to follow:** `TaskStore` 的 InMemory + Redis 双后端模式(`src/agentkit/server/task_store.py`) **Test scenarios:** - 创建 Session 返回有效 session_id,状态为 active - 追加 Message 后通过 get_messages 获取完整历史 - Session 状态转换:active → paused → active → closed - 关闭的 Session 不接受新 Message - InMemory 和 Redis Store 行为一致 - Session TTL 过期后自动归档为 closed - 获取不存在的 Session 返回 None - 大量消息的会话加载性能(1000+ messages) **Verification:** Session 模型可创建、持久化、加载,Message 可追加和查询,状态转换正确 --- ### U2. Chat API 与 WebSocket 双向通信 **Goal:** 暴露 Chat 模式的 REST + WebSocket API,支持多轮对话和实时交互 **Requirements:** R1, R2, R7, R8 **Dependencies:** U1 **Files:** - `src/agentkit/server/routes/chat.py` (新建) — Chat REST + WebSocket 路由 - `src/agentkit/server/app.py` (修改) — 注册 Chat 路由,注入 SessionManager - `src/agentkit/server/config.py` (修改) — 新增 session 配置 - `tests/unit/test_chat_routes.py` (新建) - `tests/integration/test_chat_e2e.py` (新建) **Approach:** - REST 端点: - `POST /api/v1/chat/sessions` — 创建会话 - `GET /api/v1/chat/sessions/{id}` — 获取会话信息 - `POST /api/v1/chat/sessions/{id}/messages` — 发送消息(同步模式,等待 Agent 完整回复) - `GET /api/v1/chat/sessions/{id}/messages` — 获取对话历史 - `DELETE /api/v1/chat/sessions/{id}` — 关闭会话 - WebSocket 端点:`/ws/chat/{session_id}` — 双向通信 - 客户端 → 服务端:`{"type": "message", "content": "..."}`, `{"type": "cancel"}` - 服务端 → 客户端:`{"type": "token", "content": "..."}`, `{"type": "tool_call", ...}`, `{"type": "ask_human", "question": "..."}`, `{"type": "final_answer", ...}` - Chat 路由与现有 Task 路由共存,同一 Agent 实例可同时服务两种模式 - WebSocket handler 接收用户消息后,加载 Session 历史 + 新消息,调用 Agent.execute_stream() **Patterns to follow:** 现有 `routes/tasks.py` 的 stream 端点 + `routes/ws.py` 的 WebSocket handler **Test scenarios:** - POST /chat/sessions 创建会话返回 session_id - POST /chat/sessions/{id}/messages 发送消息获得 Agent 回复 - GET /chat/sessions/{id}/messages 返回完整对话历史 - WebSocket 连接后发送消息收到 token 流式事件 - WebSocket 发送 cancel 消息取消 Agent 执行 - 关闭的 Session 发送消息返回 400 错误 - 不存在的 Session 返回 404 - 同一 Agent 同时服务 Chat 和 Task 模式 **Verification:** Chat API 端到端可用,WebSocket 双向通信正常,多轮对话历史持久化 --- ### U3. AskHumanTool + 流式 LLM 输出 **Goal:** 实现 Human-in-the-Loop 和 token-by-token 流式输出 **Requirements:** R2, R3, R7 **Dependencies:** U1, U2 **Files:** - `src/agentkit/tools/ask_human.py` (新建) — AskHumanTool - `src/agentkit/tools/__init__.py` (修改) — 导出 AskHumanTool - `src/agentkit/llm/gateway.py` (修改) — 新增 chat_stream() 方法 - `src/agentkit/llm/protocol.py` (修改) — 新增 LLMStreamResponse - `src/agentkit/core/react.py` (修改) — execute_stream() 支持 token 事件 - `src/agentkit/core/config_driven.py` (修改) — Chat 模式下注册 AskHumanTool - `tests/unit/test_ask_human_tool.py` (新建) - `tests/unit/test_llm_streaming.py` (新建) - `tests/unit/test_react_token_streaming.py` (新建) **Approach:** - `AskHumanTool`: - 参数:`question: str`, `options: list[str] | None` - 执行时通过 `context` 获取 WebSocket 连接,向客户端推送问题 - 使用 `asyncio.Future` 等待用户回复,设置超时(默认 60s) - 超时返回默认回复或取消当前 ReAct 循环 - 仅在 Chat 模式下注册(通过 ConfigDrivenAgent 的 mode 参数判断) - `LLMGateway.chat_stream()`: - 新增方法,签名与 `chat()` 一致,返回 `AsyncIterator[LLMStreamChunk]` - `LLMStreamChunk`:`content: str`, `finish_reason: str | None`, `usage: dict | None` - 各 Provider 实现 `_stream_chat()` 方法,调用底层 SDK 的 stream API - 优先实现 OpenAI 和 Anthropic 的流式(覆盖 80% 用例) - ReAct token streaming: - `execute_stream()` 中调用 `llm_gateway.chat_stream()` 替代 `chat()` - 新增 `ReActEvent` 类型:`type="token", content=str` - token 事件直接透传到 WebSocket 客户端 **Patterns to follow:** `HeadroomRetrieveTool` 的条件注册模式;现有 Provider 的 `chat()` 实现模式 **Test scenarios:** - AskHumanTool 执行后向客户端推送问题 - AskHumanTool 收到用户回复后返回结果 - AskHumanTool 超时后返回默认回复 - 非 Chat 模式下 AskHumanTool 不注册 - LLMGateway.chat_stream() 返回 AsyncIterator - 流式输出的内容拼接后与完整输出一致 - ReAct execute_stream() 产出 token 类型事件 - 流式输出中断(取消)时正确清理资源 **Verification:** Agent 可在执行中向用户提问并等待回复,LLM 输出以 token 粒度流式推送 --- ### U4. PipelineEngine 反思-重规划闭环 **Goal:** Pipeline 执行失败后自动反思分析原因,生成修正计划重新执行 **Requirements:** R4, R7 **Dependencies:** 无(独立于 U1-U3) **Files:** - `src/agentkit/orchestrator/reflection.py` (新建) — PipelineReflector + PipelineReplanner - `src/agentkit/orchestrator/pipeline_engine.py` (修改) — 集成反思-重规划 - `src/agentkit/orchestrator/pipeline_schema.py` (修改) — 新增 AdaptiveConfig - `tests/unit/test_pipeline_reflection.py` (新建) - `tests/integration/test_adaptive_pipeline.py` (新建) **Approach:** - `PipelineReflector`: - 输入:失败的 Pipeline + 执行上下文(哪步失败、错误信息、已完成步骤输出) - 调用 LLM 分析失败原因,输出结构化反思报告(failure_type, root_cause, suggested_fix) - failure_type 分类:`input_error`(输入问题)、`resource_error`(资源不可用)、`logic_error`(步骤逻辑错误)、`timeout`(超时) - `PipelineReplanner`: - 输入:原始 Pipeline + 反思报告 - 调用 LLM 生成修正后的 Pipeline(调整步骤顺序、替换失败步骤、增加前置检查) - 保留已完成步骤的结果,仅重新执行失败及后续步骤 - `AdaptiveConfig`: - `enabled: bool = False` — 默认关闭 - `max_reflections: int = 3` — 最大反思次数 - `reflection_model: str = "default"` — 反思使用的 LLM 模型 - `skip_stages: list[str] = []` — 不参与反思的步骤 - PipelineEngine 集成: - 执行失败时检查 AdaptiveConfig.enabled - 启用则触发反思→重规划→重新执行循环 - 每次反思记录到 PipelineResult.metadata **Patterns to follow:** `StepRetryPolicy` 的配置模式;`SagaOrchestrator` 的补偿追踪模式 **Test scenarios:** - Pipeline 执行失败且 adaptive 未启用时,行为与现有一致 - Pipeline 执行失败且 adaptive 启用时,触发反思 - 反思报告包含 failure_type 和 root_cause - 重规划生成的新 Pipeline 保留已完成步骤结果 - 达到 max_reflections 后停止重试 - 反思-重规划成功后 Pipeline 最终完成 - 连续反思仍失败时返回最终失败结果 - 集成测试:模拟资源错误→反思→调整→成功 **Verification:** Pipeline 执行失败后可自动反思并重规划,最终完成或达到最大重试次数 --- ### U5. Orchestrator 自适应任务分解 **Goal:** Orchestrator 支持迭代式任务分解 — 执行→评估→再分解 **Requirements:** R4, R7 **Dependencies:** U4 **Files:** - `src/agentkit/core/orchestrator.py` (修改) — 新增 execute_adaptive() 方法 - `tests/unit/test_orchestrator_adaptive.py` (新建) - `tests/integration/test_orchestrator_adaptive.py` (新建) **Approach:** - `execute_adaptive()` 方法: - 第一轮:与现有 execute() 一致,分解任务并执行 - 评估:LLM 评估子任务结果质量(0-1 评分 + 改进建议) - 如果评估不通过且未达 max_iterations: - 基于评估反馈重新分解未达标的子任务 - 保留已完成的子任务结果 - 执行新分解的子任务 - 如果评估通过或达到 max_iterations:汇总结果返回 - 新增 `OrchestratorConfig`: - `adaptive: bool = False` - `max_iterations: int = 3` - `quality_threshold: float = 0.7` **Patterns to follow:** `DynamicPipeline.execute_loop()` 的迭代模式 **Test scenarios:** - adaptive=False 时行为与现有 execute() 一致 - 第一轮评估通过时直接返回结果 - 第一轮评估不通过时触发再分解 - 再分解保留已完成子任务结果 - 达到 max_iterations 时返回当前最佳结果 - 评估反馈正确传递给 LLM 用于再分解 - 集成测试:复杂任务经两轮分解后完成 **Verification:** Orchestrator 可根据执行结果迭代优化任务分解 --- ### U6. MessageBus 抽象层与 Redis Streams 实现 **Goal:** 新增 Agent 间通信基础设施 — MessageBus 抽象 + Redis Streams 实现 **Requirements:** R5, R7 **Dependencies:** 无(独立于 U1-U5) **Files:** - `src/agentkit/bus/__init__.py` (新建) - `src/agentkit/bus/protocol.py` (新建) — MessageBus Protocol 定义 - `src/agentkit/bus/message.py` (新建) — AgentMessage 消息模型 - `src/agentkit/bus/redis_bus.py` (新建) — Redis Streams 实现 - `src/agentkit/bus/memory_bus.py` (新建) — InMemory 实现(测试用) - `tests/unit/test_bus_protocol.py` (新建) - `tests/unit/test_redis_bus.py` (新建) - `tests/unit/test_memory_bus.py` (新建) **Approach:** - `AgentMessage` 模型: - `message_id: str` — UUID - `sender: str` — 发送者 Agent 名称 - `recipient: str | None` — 接收者(None 为广播) - `topic: str` — 消息主题(如 "task.result", "agent.status") - `payload: dict[str, Any]` — 消息内容 - `timestamp: datetime` — 发送时间 - `correlation_id: str | None` — 关联 ID(请求-响应模式) - `MessageBus` Protocol: - `publish(message: AgentMessage) -> None` — 发布消息 - `subscribe(agent_name: str, handler: Callable) -> None` — 订阅消息 - `unsubscribe(agent_name: str) -> None` — 取消订阅 - `request(message: AgentMessage, timeout: float) -> AgentMessage` — 请求-响应模式 - `broadcast(message: AgentMessage) -> None` — 广播消息 - `RedisMessageBus`: - 使用 Redis Streams(XADD/XREADGROUP)实现 - 每个 Agent 一个 Consumer Group,支持多消费者 - 消息确认机制(XACK),防止消息丢失 - 死信队列:超过重试次数的消息转入死信 - `InMemoryMessageBus`: - asyncio.Queue 实现,测试用 - 行为与 Redis 实现一致 **Patterns to follow:** `SharedWorkspace` 的 Redis + InMemory 双模式;`TaskDispatcher` 的 Redis Queue 模式 **Test scenarios:** - 点对点消息:Agent A 发送消息给 Agent B,B 收到 - 广播消息:Agent A 广播,所有订阅者收到 - 请求-响应:Agent A 发送请求,Agent B 回复,A 收到响应 - 请求超时:Agent A 发送请求,无响应,超时后抛出异常 - 取消订阅后不再收到消息 - InMemory 和 Redis 实现行为一致 - 消息确认:消费者处理完消息后 XACK - 死信队列:消息重试 3 次后转入死信 **Verification:** Agent 间可通过 MessageBus 点对点和广播通信,请求-响应模式正常工作 --- ### U7. Orchestrator 集成 MessageBus **Goal:** Orchestrator 通过 MessageBus 协调 Worker,支持 Worker 间直接通信 **Requirements:** R5, R7 **Dependencies:** U6 **Files:** - `src/agentkit/core/orchestrator.py` (修改) — 注入 MessageBus,Worker 间通信 - `src/agentkit/core/agent_pool.py` (修改) — Agent 注册到 MessageBus - `src/agentkit/server/app.py` (修改) — 初始化 MessageBus - `tests/unit/test_orchestrator_bus.py` (新建) - `tests/integration/test_multi_agent_communication.py` (新建) **Approach:** - Orchestrator 注入 MessageBus: - 创建 Orchestrator 时传入可选的 MessageBus - Worker 执行子任务时,通过 MessageBus 发布进度和中间结果 - Worker 之间可直接通信(如 Agent A 请求 Agent B 的中间结果) - AgentPool 集成: - Agent 创建时自动注册到 MessageBus(订阅自己的消息) - Agent 销毁时取消订阅 - Agent 的 `handle_message()` 方法处理收到的消息 - 消息路由: - 点对点:`recipient="agent_name"` - 广播:`topic="task.progress"` - Orchestrator 订阅所有 Worker 的进度和结果 **Patterns to follow:** `MCPManager` 的生命周期管理模式 **Test scenarios:** - Worker 通过 MessageBus 发布进度,Orchestrator 收到 - Worker A 直接请求 Worker B 的中间结果 - Agent 创建时自动注册到 MessageBus - Agent 销毁时取消订阅 - 无 MessageBus 时 Orchestrator 行为不变(向后兼容) - 集成测试:两 Worker 协作完成需要中间结果交换的任务 **Verification:** Orchestrator 可通过 MessageBus 协调 Worker,Worker 间可直接通信 --- ### U8. Chat 模式集成测试与文档更新 **Goal:** 端到端验证 Chat 模式 + 自适应编排 + 多 Agent 通信的集成 **Requirements:** R1-R8 **Dependencies:** U1-U7 **Files:** - `tests/integration/test_chat_adaptive_e2e.py` (新建) — Chat + 自适应编排 E2E - `tests/integration/test_chat_multi_agent_e2e.py` (新建) — Chat + 多 Agent 协作 E2E - `configs/llm_config.yaml` (修改) — 新增 session/bus/adaptive 配置段 **Approach:** - Chat + 自适应 E2E: - 创建会话 → 发送复杂任务 → Agent 执行 Pipeline → 失败触发反思 → 重规划 → 成功 → 多轮对话继续 - Chat + 多 Agent E2E: - 创建会话 → 发送多 Agent 任务 → Orchestrator 分解 → Worker 通过 MessageBus 协作 → 结果汇总 → 用户追问 - 配置更新: - `session` 段:`backend: redis`, `ttl: 3600` - `bus` 段:`backend: redis_streams` - `adaptive` 段:`enabled: true`, `max_reflections: 3` **Test scenarios:** - Chat + 自适应:多轮对话中 Pipeline 失败后自动反思重规划 - Chat + 多 Agent:用户通过 Chat 模式触发多 Agent 协作 - AskHumanTool:Agent 在执行中向用户提问,用户回复后继续 - 流式输出:Chat 模式下 token-by-token 推送 - 配置加载:新配置段正确解析 **Verification:** 所有新功能端到端可用,配置正确加载 --- ## Risks & Mitigations | 风险 | 影响 | 缓解 | |------|------|------| | WebSocket 双向通信复杂度高 | 中 | 先实现 REST 同步模式,WebSocket 作为增强 | | LLM 流式输出 Provider 适配工作量大 | 中 | 优先适配 OpenAI/Anthropic,其余 Provider 降级为非流式 | | 反思-重规划 LLM 调用增加成本 | 低 | 默认关闭,可配置;反思使用低成本模型 | | Redis Streams 运维复杂度 | 低 | InMemory 实现可用于开发/测试,生产用 Redis | | Session 大量消息加载性能 | 中 | 分页加载 + 摘要压缩(长期消息自动摘要) | ## Open Questions - Q1: Session 消息的分页策略?默认按时间倒序分页,还是按 ReAct 循环分页? - Q2: AskHumanTool 超时后的默认行为?返回默认回复还是抛出异常让 ReAct 处理? - Q3: 反思-重规划是否需要人工确认?自动执行还是需要用户审批后才能重规划? ## System-Wide Impact - **API 层**:新增 5 个 REST 端点 + 1 个 WebSocket 端点 - **ReAct 引擎**:execute_stream() 扩展 token 事件类型,非破坏性变更 - **LLM Gateway**:新增 chat_stream() 方法,所有 Provider 需实现(可降级为非流式) - **Orchestrator**:新增 execute_adaptive(),现有 execute() 不变 - **PipelineEngine**:反思-重规划为可选增强,默认关闭 - **新模块**:`session/`, `bus/` — 两个新子包 - **配置**:新增 session/bus/adaptive 配置段