403 lines
17 KiB
Markdown
403 lines
17 KiB
Markdown
---
|
||
date: 2026-06-15
|
||
status: active
|
||
origin: docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md
|
||
---
|
||
|
||
## Summary
|
||
|
||
打通 PlanExecEngine 和 TeamOrchestrator 的执行层,将模拟代码替换为真实的 Agent/ReActEngine 调用,集成 SharedWorkspace 实现步骤间状态传递,并添加 WebSocket 进度事件。用多步研究任务端到端验证闭环。
|
||
|
||
## Problem Frame
|
||
|
||
AgentKit 的任务规划框架骨架完整(四种推理引擎 + TeamOrchestrator + PipelineEngine),但执行层未跑通:`_execute_phase` 返回模拟字符串,`_LLMStepAgent` 只做单次 LLM 调用不支持工具,SharedWorkspace 未集成到执行层。用户提出复杂需求后 Agent 无法真正拆解执行。本计划将已有框架跑通,而非新建能力。
|
||
|
||
---
|
||
|
||
## Key Technical Decisions
|
||
|
||
**KTD-1. _LLMStepAgent 替换为 ReActStepExecutor。** 现有 `_LLMStepAgent` 只做单次 `llm_gateway.chat()` 调用,不支持工具。新建 `ReActStepExecutor` 类,内部创建 `ReActEngine` 实例执行步骤,支持工具调用和多步推理。保留 `_LLMStepAgent` 作为无工具场景的轻量回退。
|
||
|
||
**KTD-2. SharedWorkspace 直接复用。** 现有 `SharedWorkspace`(`core/shared_workspace.py`)是通用 key-value 存储,支持版本控制和分布式锁。PlanExecEngine 直接注入 SharedWorkspace 实例,步骤结果写入 `plan:{plan_id}:step:{step_id}:result`,无需新建状态管理。
|
||
|
||
**KTD-3. TeamOrchestrator 通过 Expert.agent.execute() 执行。** `Expert.agent` 是 `ConfigDrivenAgent` 实例,其 `execute(TaskMessage)` 是 final 方法,内部根据 execution_mode 选择 ReAct/PlanExec/ReWOO/Reflexion 引擎。直接调用即可,无需手动创建 ReActEngine。
|
||
|
||
**KTD-4. 进度事件通过 HandoffTransport -> WebSocket 桥接。** TeamOrchestrator 已通过 `_broadcast_event` 向 HandoffTransport 发送事件。在 Chat WebSocket handler 中注册 HandoffTransport handler,将 team 事件转发为 WebSocket 消息。PlanExecEngine 的步骤事件通过回调函数注入。
|
||
|
||
---
|
||
|
||
## Requirements Trace
|
||
|
||
| R-ID | Implementation Units |
|
||
|------|---------------------|
|
||
| R1 | U1, U2 |
|
||
| R2 | U3 |
|
||
| R3 | U4 |
|
||
| R4 | U5 |
|
||
| R5 | U4 |
|
||
| R6 | U6 |
|
||
| R7 | U2 |
|
||
| R8 | U7 |
|
||
| R9 | U8 |
|
||
|
||
---
|
||
|
||
## High-Level Technical Design
|
||
|
||
```mermaid
|
||
flowchart TB
|
||
subgraph User Request
|
||
A[用户输入复杂任务] --> B[CostAwareRouter]
|
||
end
|
||
|
||
subgraph Routing
|
||
B -->|complexity > 0.7| C[TEAM_COLLAB]
|
||
B -->|0.3-0.7| D[SKILL_REACT / REACT]
|
||
end
|
||
|
||
subgraph PlanExecEngine Path
|
||
D --> E[GoalPlanner]
|
||
E --> F[ExecutionPlan]
|
||
F --> G[ReActStepExecutor]
|
||
G -->|per step| H[ReActEngine.execute]
|
||
H --> I[Tool Calls]
|
||
I --> J[SharedWorkspace.write]
|
||
J -->|next step| G
|
||
end
|
||
|
||
subgraph TeamOrchestrator Path
|
||
C --> K[ExpertTeam.form]
|
||
K --> L[CollaborationPlan]
|
||
L --> M[_execute_phase]
|
||
M -->|real call| N[Expert.agent.execute]
|
||
N --> O[TaskResult]
|
||
O --> P[_merge_results]
|
||
end
|
||
|
||
subgraph Events
|
||
H --> Q[StepEvent callback]
|
||
M --> R[HandoffTransport broadcast]
|
||
Q --> S[WebSocket emit]
|
||
R --> S
|
||
end
|
||
```
|
||
|
||
---
|
||
|
||
## Implementation Units
|
||
|
||
### U1. TeamOrchestrator._execute_phase 真实执行
|
||
|
||
**Goal:** 将 `_execute_phase` 从模拟代码改为调用 `Expert.agent.execute(TaskMessage)` 执行真实任务
|
||
|
||
**Requirements:** R1
|
||
|
||
**Dependencies:** None
|
||
|
||
**Files:**
|
||
- `src/agentkit/experts/orchestrator.py` — 修改 `_execute_phase` 和 `_run_competitor`
|
||
- `tests/unit/experts/test_orchestrator.py` — 新增/更新测试
|
||
|
||
**Approach:**
|
||
1. 在 `_execute_phase` 中,获取 `expert = self._team._experts.get(phase.assigned_expert)`
|
||
2. 构建 `TaskMessage`:`task_id=phase.phase_id`, `agent_name=expert.config.name`, `task_type="team_phase"`, `input_data={"phase_name": phase.name, "phase_description": phase.description, "team_id": self.team_id}`
|
||
3. 从 SharedWorkspace 读取前置阶段结果,注入 `input_data["dependency_results"]`
|
||
4. 调用 `result = await expert.agent.execute(task_msg)`
|
||
5. 处理 `TaskResult`:成功则写入 SharedWorkspace 并广播 `phase_completed`,失败则广播 `phase_failed`
|
||
6. 同样修改 `_run_competitor`,调用 `expert.agent.execute()` 替代模拟返回
|
||
|
||
**Patterns to follow:** `BaseAgent.execute()` 的 final 方法模式(`core/base.py`),`TaskMessage`/`TaskResult` 协议(`core/protocol.py`)
|
||
|
||
**Test scenarios:**
|
||
- Happy path: _execute_phase 调用 expert.agent.execute() 并返回 TaskResult
|
||
- Expert not found: assigned_expert 不在 _experts 中时回退到 lead_expert
|
||
- Execution failure: agent.execute() 返回 FAILED 状态时广播 phase_failed
|
||
- Covers AE3: 两个 Expert 竞争执行,各自调用 agent.execute()
|
||
|
||
**Verification:** 单元测试通过,mock Agent 返回 TaskResult,验证 _execute_phase 正确处理成功/失败
|
||
|
||
---
|
||
|
||
### U2. TeamOrchestrator 合并策略从真实结果选择
|
||
|
||
**Goal:** COMPETITIVE_PARALLEL 模式下,合并策略(BEST/VOTE/FUSION)从真实 TaskResult 中选择/融合
|
||
|
||
**Requirements:** R1, R7
|
||
|
||
**Dependencies:** U1
|
||
|
||
**Files:**
|
||
- `src/agentkit/experts/orchestrator.py` — 修改 `_merge_results`
|
||
- `tests/unit/experts/test_orchestrator.py`
|
||
|
||
**Approach:**
|
||
1. `_merge_results` 当前接收 `list[dict]`,改为接收 `list[tuple[Expert, TaskResult]]`
|
||
2. BEST 策略:Lead Expert 的 LLM 评估各 TaskResult.output_data,选择最佳
|
||
3. VOTE 策略:每个 Expert 的 LLM 对其他结果评分,最高分胜出
|
||
4. FUSION 策略:Lead Expert 的 LLM 融合所有 output_data
|
||
5. 无 LLM Gateway 时回退到当前简化逻辑(选择第一个结果)
|
||
|
||
**Patterns to follow:** `PipelineReflector` 的 LLM 调用模式(`orchestrator/reflection.py`)
|
||
|
||
**Test scenarios:**
|
||
- BEST: 3 个 Expert 结果,Lead Expert 选择最佳
|
||
- VOTE: 3 个 Expert 结果,投票选择
|
||
- FUSION: 3 个 Expert 结果,Lead Expert 融合
|
||
- No LLM Gateway: 回退到选择第一个结果
|
||
|
||
**Verification:** 单元测试验证三种合并策略从真实 TaskResult 中选择
|
||
|
||
---
|
||
|
||
### U3. ReActStepExecutor 替代 _LLMStepAgent
|
||
|
||
**Goal:** 新建 ReActStepExecutor,内部使用 ReActEngine 执行步骤,支持工具调用和多步推理
|
||
|
||
**Requirements:** R2
|
||
|
||
**Dependencies:** None
|
||
|
||
**Files:**
|
||
- `src/agentkit/core/plan_exec_engine.py` — 新增 `ReActStepExecutor` 类,修改 `PlanExecutor` 使用新执行器
|
||
- `tests/unit/core/test_plan_exec_engine.py` — 新增测试
|
||
|
||
**Approach:**
|
||
1. 新建 `ReActStepExecutor` 类,构造函数接收 `llm_gateway`, `tools`, `max_steps=5`, `model="default"`, `system_prompt=None`
|
||
2. `execute(task_msg: TaskMessage) -> TaskResult` 方法:
|
||
- 从 `task_msg.input_data` 提取 `step_name`, `step_description`, `dependency_results`
|
||
- 构建 messages:`[{"role": "user", "content": step_description}]`
|
||
- 如有 `dependency_results`,追加到 content
|
||
- 创建 `ReActEngine(llm_gateway, max_steps=max_steps)`
|
||
- 调用 `react_engine.execute(messages, tools, model, system_prompt)`
|
||
- 将 `ReActResult.output` 包装为 `TaskResult(output_data={"content": result.output, "steps": result.total_steps, "tokens": result.total_tokens})`
|
||
3. `PlanExecutor` 新增 `step_executor_type` 参数:`"react"`(默认)或 `"llm"`(回退到 _LLMStepAgent)
|
||
4. `PlanExecutor._execute_step` 根据 `step_executor_type` 选择执行器
|
||
|
||
**Patterns to follow:** `ReActEngine.execute()` 的签名和返回值(`core/react.py`),`_LLMStepAgent` 的接口(`plan_exec_engine.py`)
|
||
|
||
**Test scenarios:**
|
||
- Happy path: ReActStepExecutor 调用 ReActEngine,返回包含工具调用结果的 TaskResult
|
||
- No tools: 无工具时回退到纯 LLM 调用
|
||
- Multi-step: ReActEngine 执行 3 步 think-act-observe 循环
|
||
- Tool failure: 工具调用异常时 ReActEngine 返回 partial status
|
||
|
||
**Verification:** 单元测试 mock ReActEngine,验证 ReActStepExecutor 正确调用和包装结果
|
||
|
||
---
|
||
|
||
### U4. SharedWorkspace 集成到执行层
|
||
|
||
**Goal:** PlanExecEngine 和 TeamOrchestrator 通过 SharedWorkspace 传递步骤间状态
|
||
|
||
**Requirements:** R3, R5
|
||
|
||
**Dependencies:** U1, U3
|
||
|
||
**Files:**
|
||
- `src/agentkit/core/plan_exec_engine.py` — 注入 SharedWorkspace,步骤结果写入/读取
|
||
- `src/agentkit/experts/orchestrator.py` — 阶段结果写入/读取 SharedWorkspace
|
||
- `tests/unit/core/test_plan_exec_engine.py`
|
||
- `tests/unit/experts/test_orchestrator.py`
|
||
|
||
**Approach:**
|
||
1. `PlanExecutor` 构造函数新增 `workspace: SharedWorkspace | None = None` 参数
|
||
2. 步骤完成后:`workspace.write(f"plan:{plan_id}:step:{step_id}:result", result_data, agent_id=step_id)`
|
||
3. 步骤执行前:从 workspace 读取依赖步骤结果,注入 `input_data["dependency_results"]`
|
||
4. `TeamOrchestrator` 构造函数新增 `workspace: SharedWorkspace | None = None`,默认使用 `team._workspace`
|
||
5. 阶段完成后写入 `workspace.write(f"team:{team_id}:phase:{phase_id}:result", ...)`
|
||
6. 阶段执行前读取前置阶段结果
|
||
|
||
**Patterns to follow:** `ExpertTeam._workspace` 的使用模式(`experts/team.py`),`SharedWorkspace.write/read` API(`core/shared_workspace.py`)
|
||
|
||
**Test scenarios:**
|
||
- PlanExecEngine: 步骤 A 完成后结果写入 workspace,步骤 B 执行前从 workspace 读取
|
||
- TeamOrchestrator: 阶段 A 结果写入 workspace,阶段 B 读取
|
||
- No workspace: workspace=None 时回退到原有 dependency_results 机制
|
||
- Concurrent write: 两个并行步骤同时写入 workspace,版本号递增
|
||
|
||
**Verification:** 单元测试验证 workspace 读写和依赖传递
|
||
|
||
---
|
||
|
||
### U5. GoalPlanner prompt 调优
|
||
|
||
**Goal:** 提升 GoalPlanner 的任务分解质量,确保子任务可执行、依赖关系正确
|
||
|
||
**Requirements:** R4
|
||
|
||
**Dependencies:** None
|
||
|
||
**Files:**
|
||
- `src/agentkit/core/goal_planner.py` — 优化 LLM prompt 和规则分解逻辑
|
||
- `tests/unit/core/test_goal_planner.py`
|
||
|
||
**Approach:**
|
||
1. 优化 `_llm_decompose` 的 prompt:明确要求输出 JSON 格式,包含 step_id/name/description/dependencies/required_tools 字段
|
||
2. 添加 few-shot 示例:展示"分析竞品并生成报告"的标准分解(搜索→分析→生成)
|
||
3. 规则分解增强:识别"搜索/查找/分析/生成/报告/对比"等常见任务动词,映射到标准步骤模板
|
||
4. 添加分解质量自检:LLM 分解后,用第二次 LLM 调用验证步骤是否完整、依赖是否合理
|
||
5. 添加 `required_tools` 字段到 PlanStep,指定步骤需要的工具(如搜索步骤需要 web_search)
|
||
|
||
**Patterns to follow:** 现有 `_rule_based_decompose` 和 `_llm_decompose` 模式
|
||
|
||
**Test scenarios:**
|
||
- "分析竞品并生成报告" → 3 步分解(搜索→分析→生成),依赖关系正确
|
||
- "搜索最新AI论文" → 1 步分解,required_tools=["web_search"]
|
||
- "对比A和B的优缺点" → 2 步分解(分别搜索→对比分析)
|
||
- LLM 分解失败 → 回退到规则分解
|
||
|
||
**Verification:** 单元测试验证分解质量和依赖关系
|
||
|
||
---
|
||
|
||
### U6. PlanExecEngine 失败重规划集成
|
||
|
||
**Goal:** 步骤执行失败时,集成 PipelineReflector/PipelineReplanner 触发自动重规划
|
||
|
||
**Requirements:** R6
|
||
|
||
**Dependencies:** U3
|
||
|
||
**Files:**
|
||
- `src/agentkit/core/plan_exec_engine.py` — 修改 `_execute_plan` 失败处理逻辑
|
||
- `tests/unit/core/test_plan_exec_engine.py`
|
||
|
||
**Approach:**
|
||
1. `PlanExecutor` 已有 `_plan_to_pipeline` / `_pipeline_to_plan` 桥接方法(plan_exec_engine.py 第549-664行)
|
||
2. 在 `_execute_plan` 的步骤失败分支中:
|
||
- 调用 `reflector.reflect(pipeline, pipeline_result, replan_count)` 获取 ReflectionReport
|
||
- 调用 `replanner.replan(pipeline, pipeline_result, reflection_report)` 获取修正后的 Pipeline
|
||
- 将修正后的 Pipeline 转回 ExecutionPlan
|
||
- 用 `_merge_completed_results` 保留已完成步骤的结果
|
||
- 继续执行修正后的计划
|
||
3. 添加 `max_replan_attempts` 参数(默认 2),超过后回退到单 Agent 模式
|
||
4. 广播 `replanning` 事件,包含失败原因和修正计划
|
||
|
||
**Patterns to follow:** `PipelineEngine` 的反思-重规划闭环(`orchestrator/pipeline_engine.py`),现有 `_plan_to_pipeline` 桥接
|
||
|
||
**Test scenarios:**
|
||
- Covers AE2: 搜索步骤失败 → Reflector 分析原因 → Replanner 生成修正计划 → 重新执行成功
|
||
- Max replan exceeded: 连续 2 次重规划仍失败 → 回退到单 Agent
|
||
- Partial completion: 3 步中第 2 步失败,重规划后保留第 1 步结果
|
||
|
||
**Verification:** 单元测试 mock Reflector/Replanner,验证重规划流程
|
||
|
||
---
|
||
|
||
### U7. 多步研究任务端到端验证
|
||
|
||
**Goal:** 用"分析竞品并生成报告"场景验证完整闭环
|
||
|
||
**Requirements:** R8
|
||
|
||
**Dependencies:** U1, U3, U4, U5, U6
|
||
|
||
**Files:**
|
||
- `tests/integration/test_autonomous_research_task.py` — 新增集成测试
|
||
- `src/agentkit/core/plan_exec_engine.py` — 确保 ReActStepExecutor 与搜索工具集成
|
||
- `configs/skills/research.yaml` — 新增研究任务 Skill 配置
|
||
|
||
**Approach:**
|
||
1. 创建 `research` Skill 配置,绑定 `web_search` + `web_crawl` + `ask_human` 工具
|
||
2. 集成测试:mock LLM Gateway 返回预设响应,mock 搜索工具返回预设结果
|
||
3. 验证流程:用户输入 → GoalPlanner 分解 → PlanExecEngine 执行 → SharedWorkspace 状态传递 → 最终报告
|
||
4. 验证步骤间依赖:搜索步骤结果被分析步骤读取
|
||
5. 验证失败重规划:搜索工具返回空结果时触发重规划
|
||
|
||
**Test scenarios:**
|
||
- Covers AE1: "分析飞书和钉钉的竞品对比" → 搜索→分析→生成完整报告
|
||
- Tool integration: ReActStepExecutor 调用 web_search 工具
|
||
- Dependency chain: 搜索结果传递到分析步骤
|
||
- Failure recovery: 搜索失败 → 重规划 → 换关键词重新搜索
|
||
|
||
**Verification:** 集成测试通过,端到端输出包含搜索结果和分析报告
|
||
|
||
---
|
||
|
||
### U8. WebSocket 进度事件
|
||
|
||
**Goal:** 执行过程通过 WebSocket 实时推送进度事件
|
||
|
||
**Requirements:** R9
|
||
|
||
**Dependencies:** U1, U3
|
||
|
||
**Files:**
|
||
- `src/agentkit/server/routes/chat.py` — 注册 HandoffTransport handler,转发 team 事件
|
||
- `src/agentkit/core/plan_exec_engine.py` — 添加 step_event_callback 参数
|
||
- `src/agentkit/server/routes/portal.py` — 添加 plan/step 事件类型
|
||
- `tests/unit/server/test_chat_ws_events.py` — 新增测试
|
||
|
||
**Approach:**
|
||
1. PlanExecEngine 新增 `step_event_callback: Callable[[str, dict], Awaitable[None]] | None` 参数
|
||
2. 步骤状态变更时调用 callback:`plan_created`, `step_started`, `step_completed`, `step_failed`, `plan_completed`, `replanning`
|
||
3. Chat WebSocket handler 中,当 ExpertTeam 模式激活时,注册 HandoffTransport handler 将 team 事件转发为 WebSocket 消息
|
||
4. Portal WebSocket 添加新事件类型:`plan_step`(步骤进度)和 `plan_update`(计划变更)
|
||
5. 前端 `WsServerMessage` 类型添加 `plan_step` 和 `plan_update` 事件支持
|
||
|
||
**Patterns to follow:** 现有 `emit_team_event` 模式(`server/routes/chat.py`),Portal WebSocket 事件格式
|
||
|
||
**Test scenarios:**
|
||
- PlanExecEngine: 步骤开始/完成时 callback 被调用,事件类型正确
|
||
- TeamOrchestrator: HandoffTransport 事件转发到 WebSocket
|
||
- Portal: plan_step 事件包含 step_id, step_name, status
|
||
- No callback: callback=None 时不影响执行
|
||
|
||
**Verification:** 单元测试验证事件回调被正确调用
|
||
|
||
---
|
||
|
||
## Scope Boundaries
|
||
|
||
**In scope:**
|
||
- 打通 PlanExecEngine 和 TeamOrchestrator 执行层
|
||
- SharedWorkspace 集成
|
||
- GoalPlanner prompt 调优
|
||
- 失败重规划集成
|
||
- WebSocket 进度事件
|
||
- 多步研究任务验证
|
||
|
||
**Deferred to follow-up work:**
|
||
- 执行持久化与断点恢复
|
||
- 自适应执行监控(token 预算、耗时趋势)
|
||
- 人机协作规划 UI
|
||
- 计划模板库
|
||
- 前端进度可视化组件
|
||
|
||
**Outside this scope:**
|
||
- 动态工具发现与运行时组合
|
||
- 跨任务长期记忆
|
||
- 多层级嵌套计划
|
||
|
||
---
|
||
|
||
## Risks & Mitigations
|
||
|
||
| Risk | Impact | Mitigation |
|
||
|------|--------|-----------|
|
||
| ReActEngine 步骤级执行 token 消耗高 | 每步骤可能消耗大量 token | ReActStepExecutor 默认 max_steps=5,限制循环次数 |
|
||
| GoalPlanner 分解质量不稳定 | 复杂任务可能分解不合理 | 添加分解质量自检 + few-shot 示例 |
|
||
| SharedWorkspace 并发写入冲突 | 并行步骤同时写入可能冲突 | SharedWorkspace 内置版本控制和分布式锁 |
|
||
| HandoffTransport -> WebSocket 桥接延迟 | 事件转发可能增加延迟 | InProcess 模式下延迟极低(asyncio.Queue) |
|
||
|
||
---
|
||
|
||
## Open Questions
|
||
|
||
- **Deferred to implementation:** ReActStepExecutor 的 system_prompt 是否需要根据步骤类型动态生成(如搜索步骤 vs 分析步骤)
|
||
- **Deferred to implementation:** 前端 WsServerMessage 类型更新是否需要同步修改 chat store 的事件处理逻辑
|
||
|
||
---
|
||
|
||
## Sources & Research
|
||
|
||
- `src/agentkit/core/react.py` — ReActEngine 完整实现
|
||
- `src/agentkit/core/plan_exec_engine.py` — PlanExecEngine 和 _LLMStepAgent
|
||
- `src/agentkit/experts/orchestrator.py` — TeamOrchestrator mock 实现
|
||
- `src/agentkit/experts/team.py` — ExpertTeam 和 SharedWorkspace
|
||
- `src/agentkit/core/shared_workspace.py` — SharedWorkspace API
|
||
- `src/agentkit/orchestrator/reflection.py` — PipelineReflector / PipelineReplanner
|
||
- `src/agentkit/core/goal_planner.py` — GoalPlanner
|
||
- `src/agentkit/core/protocol.py` — TaskMessage / TaskResult 协议
|
||
- `src/agentkit/server/routes/chat.py` — Chat WebSocket 和 emit_team_event
|