fischer-agentkit/docs/plans/2026-06-15-001-feat-autonom...

17 KiB
Raw Permalink Blame History

date status origin
2026-06-15 active docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md

Summary

打通 PlanExecEngine 和 TeamOrchestrator 的执行层,将模拟代码替换为真实的 Agent/ReActEngine 调用,集成 SharedWorkspace 实现步骤间状态传递,并添加 WebSocket 进度事件。用多步研究任务端到端验证闭环。

Problem Frame

AgentKit 的任务规划框架骨架完整(四种推理引擎 + TeamOrchestrator + PipelineEngine但执行层未跑通_execute_phase 返回模拟字符串,_LLMStepAgent 只做单次 LLM 调用不支持工具SharedWorkspace 未集成到执行层。用户提出复杂需求后 Agent 无法真正拆解执行。本计划将已有框架跑通,而非新建能力。


Key Technical Decisions

KTD-1. _LLMStepAgent 替换为 ReActStepExecutor。 现有 _LLMStepAgent 只做单次 llm_gateway.chat() 调用,不支持工具。新建 ReActStepExecutor 类,内部创建 ReActEngine 实例执行步骤,支持工具调用和多步推理。保留 _LLMStepAgent 作为无工具场景的轻量回退。

KTD-2. SharedWorkspace 直接复用。 现有 SharedWorkspacecore/shared_workspace.py)是通用 key-value 存储支持版本控制和分布式锁。PlanExecEngine 直接注入 SharedWorkspace 实例,步骤结果写入 plan:{plan_id}:step:{step_id}:result,无需新建状态管理。

KTD-3. TeamOrchestrator 通过 Expert.agent.execute() 执行。 Expert.agentConfigDrivenAgent 实例,其 execute(TaskMessage) 是 final 方法,内部根据 execution_mode 选择 ReAct/PlanExec/ReWOO/Reflexion 引擎。直接调用即可,无需手动创建 ReActEngine。

KTD-4. 进度事件通过 HandoffTransport -> WebSocket 桥接。 TeamOrchestrator 已通过 _broadcast_event 向 HandoffTransport 发送事件。在 Chat WebSocket handler 中注册 HandoffTransport handler将 team 事件转发为 WebSocket 消息。PlanExecEngine 的步骤事件通过回调函数注入。


Requirements Trace

R-ID Implementation Units
R1 U1, U2
R2 U3
R3 U4
R4 U5
R5 U4
R6 U6
R7 U2
R8 U7
R9 U8

High-Level Technical Design

flowchart TB
    subgraph User Request
        A[用户输入复杂任务] --> B[CostAwareRouter]
    end

    subgraph Routing
        B -->|complexity > 0.7| C[TEAM_COLLAB]
        B -->|0.3-0.7| D[SKILL_REACT / REACT]
    end

    subgraph PlanExecEngine Path
        D --> E[GoalPlanner]
        E --> F[ExecutionPlan]
        F --> G[ReActStepExecutor]
        G -->|per step| H[ReActEngine.execute]
        H --> I[Tool Calls]
        I --> J[SharedWorkspace.write]
        J -->|next step| G
    end

    subgraph TeamOrchestrator Path
        C --> K[ExpertTeam.form]
        K --> L[CollaborationPlan]
        L --> M[_execute_phase]
        M -->|real call| N[Expert.agent.execute]
        N --> O[TaskResult]
        O --> P[_merge_results]
    end

    subgraph Events
        H --> Q[StepEvent callback]
        M --> R[HandoffTransport broadcast]
        Q --> S[WebSocket emit]
        R --> S
    end

Implementation Units

U1. TeamOrchestrator._execute_phase 真实执行

Goal:_execute_phase 从模拟代码改为调用 Expert.agent.execute(TaskMessage) 执行真实任务

Requirements: R1

Dependencies: None

Files:

  • src/agentkit/experts/orchestrator.py — 修改 _execute_phase_run_competitor
  • tests/unit/experts/test_orchestrator.py — 新增/更新测试

Approach:

  1. _execute_phase 中,获取 expert = self._team._experts.get(phase.assigned_expert)
  2. 构建 TaskMessagetask_id=phase.phase_id, agent_name=expert.config.name, task_type="team_phase", input_data={"phase_name": phase.name, "phase_description": phase.description, "team_id": self.team_id}
  3. 从 SharedWorkspace 读取前置阶段结果,注入 input_data["dependency_results"]
  4. 调用 result = await expert.agent.execute(task_msg)
  5. 处理 TaskResult:成功则写入 SharedWorkspace 并广播 phase_completed,失败则广播 phase_failed
  6. 同样修改 _run_competitor,调用 expert.agent.execute() 替代模拟返回

Patterns to follow: BaseAgent.execute() 的 final 方法模式(core/base.pyTaskMessage/TaskResult 协议(core/protocol.py

Test scenarios:

  • Happy path: _execute_phase 调用 expert.agent.execute() 并返回 TaskResult
  • Expert not found: assigned_expert 不在 _experts 中时回退到 lead_expert
  • Execution failure: agent.execute() 返回 FAILED 状态时广播 phase_failed
  • Covers AE3: 两个 Expert 竞争执行,各自调用 agent.execute()

Verification: 单元测试通过mock Agent 返回 TaskResult验证 _execute_phase 正确处理成功/失败


U2. TeamOrchestrator 合并策略从真实结果选择

Goal: COMPETITIVE_PARALLEL 模式下合并策略BEST/VOTE/FUSION从真实 TaskResult 中选择/融合

Requirements: R1, R7

Dependencies: U1

Files:

  • src/agentkit/experts/orchestrator.py — 修改 _merge_results
  • tests/unit/experts/test_orchestrator.py

Approach:

  1. _merge_results 当前接收 list[dict],改为接收 list[tuple[Expert, TaskResult]]
  2. BEST 策略Lead Expert 的 LLM 评估各 TaskResult.output_data选择最佳
  3. VOTE 策略:每个 Expert 的 LLM 对其他结果评分,最高分胜出
  4. FUSION 策略Lead Expert 的 LLM 融合所有 output_data
  5. 无 LLM Gateway 时回退到当前简化逻辑(选择第一个结果)

Patterns to follow: PipelineReflector 的 LLM 调用模式(orchestrator/reflection.py

Test scenarios:

  • BEST: 3 个 Expert 结果Lead Expert 选择最佳
  • VOTE: 3 个 Expert 结果,投票选择
  • FUSION: 3 个 Expert 结果Lead Expert 融合
  • No LLM Gateway: 回退到选择第一个结果

Verification: 单元测试验证三种合并策略从真实 TaskResult 中选择


U3. ReActStepExecutor 替代 _LLMStepAgent

Goal: 新建 ReActStepExecutor内部使用 ReActEngine 执行步骤,支持工具调用和多步推理

Requirements: R2

Dependencies: None

Files:

  • src/agentkit/core/plan_exec_engine.py — 新增 ReActStepExecutor 类,修改 PlanExecutor 使用新执行器
  • tests/unit/core/test_plan_exec_engine.py — 新增测试

Approach:

  1. 新建 ReActStepExecutor 类,构造函数接收 llm_gateway, tools, max_steps=5, model="default", system_prompt=None
  2. execute(task_msg: TaskMessage) -> TaskResult 方法:
    • task_msg.input_data 提取 step_name, step_description, dependency_results
    • 构建 messages[{"role": "user", "content": step_description}]
    • 如有 dependency_results,追加到 content
    • 创建 ReActEngine(llm_gateway, max_steps=max_steps)
    • 调用 react_engine.execute(messages, tools, model, system_prompt)
    • ReActResult.output 包装为 TaskResult(output_data={"content": result.output, "steps": result.total_steps, "tokens": result.total_tokens})
  3. PlanExecutor 新增 step_executor_type 参数:"react"(默认)或 "llm"(回退到 _LLMStepAgent
  4. PlanExecutor._execute_step 根据 step_executor_type 选择执行器

Patterns to follow: ReActEngine.execute() 的签名和返回值(core/react.py_LLMStepAgent 的接口(plan_exec_engine.py

Test scenarios:

  • Happy path: ReActStepExecutor 调用 ReActEngine返回包含工具调用结果的 TaskResult
  • No tools: 无工具时回退到纯 LLM 调用
  • Multi-step: ReActEngine 执行 3 步 think-act-observe 循环
  • Tool failure: 工具调用异常时 ReActEngine 返回 partial status

Verification: 单元测试 mock ReActEngine验证 ReActStepExecutor 正确调用和包装结果


U4. SharedWorkspace 集成到执行层

Goal: PlanExecEngine 和 TeamOrchestrator 通过 SharedWorkspace 传递步骤间状态

Requirements: R3, R5

Dependencies: U1, U3

Files:

  • src/agentkit/core/plan_exec_engine.py — 注入 SharedWorkspace步骤结果写入/读取
  • src/agentkit/experts/orchestrator.py — 阶段结果写入/读取 SharedWorkspace
  • tests/unit/core/test_plan_exec_engine.py
  • tests/unit/experts/test_orchestrator.py

Approach:

  1. PlanExecutor 构造函数新增 workspace: SharedWorkspace | None = None 参数
  2. 步骤完成后:workspace.write(f"plan:{plan_id}:step:{step_id}:result", result_data, agent_id=step_id)
  3. 步骤执行前:从 workspace 读取依赖步骤结果,注入 input_data["dependency_results"]
  4. TeamOrchestrator 构造函数新增 workspace: SharedWorkspace | None = None,默认使用 team._workspace
  5. 阶段完成后写入 workspace.write(f"team:{team_id}:phase:{phase_id}:result", ...)
  6. 阶段执行前读取前置阶段结果

Patterns to follow: ExpertTeam._workspace 的使用模式(experts/team.pySharedWorkspace.write/read APIcore/shared_workspace.py

Test scenarios:

  • PlanExecEngine: 步骤 A 完成后结果写入 workspace步骤 B 执行前从 workspace 读取
  • TeamOrchestrator: 阶段 A 结果写入 workspace阶段 B 读取
  • No workspace: workspace=None 时回退到原有 dependency_results 机制
  • Concurrent write: 两个并行步骤同时写入 workspace版本号递增

Verification: 单元测试验证 workspace 读写和依赖传递


U5. GoalPlanner prompt 调优

Goal: 提升 GoalPlanner 的任务分解质量,确保子任务可执行、依赖关系正确

Requirements: R4

Dependencies: None

Files:

  • src/agentkit/core/goal_planner.py — 优化 LLM prompt 和规则分解逻辑
  • tests/unit/core/test_goal_planner.py

Approach:

  1. 优化 _llm_decompose 的 prompt明确要求输出 JSON 格式,包含 step_id/name/description/dependencies/required_tools 字段
  2. 添加 few-shot 示例:展示"分析竞品并生成报告"的标准分解(搜索→分析→生成)
  3. 规则分解增强:识别"搜索/查找/分析/生成/报告/对比"等常见任务动词,映射到标准步骤模板
  4. 添加分解质量自检LLM 分解后,用第二次 LLM 调用验证步骤是否完整、依赖是否合理
  5. 添加 required_tools 字段到 PlanStep指定步骤需要的工具如搜索步骤需要 web_search

Patterns to follow: 现有 _rule_based_decompose_llm_decompose 模式

Test scenarios:

  • "分析竞品并生成报告" → 3 步分解(搜索→分析→生成),依赖关系正确
  • "搜索最新AI论文" → 1 步分解required_tools=["web_search"]
  • "对比A和B的优缺点" → 2 步分解(分别搜索→对比分析)
  • LLM 分解失败 → 回退到规则分解

Verification: 单元测试验证分解质量和依赖关系


U6. PlanExecEngine 失败重规划集成

Goal: 步骤执行失败时,集成 PipelineReflector/PipelineReplanner 触发自动重规划

Requirements: R6

Dependencies: U3

Files:

  • src/agentkit/core/plan_exec_engine.py — 修改 _execute_plan 失败处理逻辑
  • tests/unit/core/test_plan_exec_engine.py

Approach:

  1. PlanExecutor 已有 _plan_to_pipeline / _pipeline_to_plan 桥接方法plan_exec_engine.py 第549-664行
  2. _execute_plan 的步骤失败分支中:
    • 调用 reflector.reflect(pipeline, pipeline_result, replan_count) 获取 ReflectionReport
    • 调用 replanner.replan(pipeline, pipeline_result, reflection_report) 获取修正后的 Pipeline
    • 将修正后的 Pipeline 转回 ExecutionPlan
    • _merge_completed_results 保留已完成步骤的结果
    • 继续执行修正后的计划
  3. 添加 max_replan_attempts 参数(默认 2超过后回退到单 Agent 模式
  4. 广播 replanning 事件,包含失败原因和修正计划

Patterns to follow: PipelineEngine 的反思-重规划闭环(orchestrator/pipeline_engine.py),现有 _plan_to_pipeline 桥接

Test scenarios:

  • Covers AE2: 搜索步骤失败 → Reflector 分析原因 → Replanner 生成修正计划 → 重新执行成功
  • Max replan exceeded: 连续 2 次重规划仍失败 → 回退到单 Agent
  • Partial completion: 3 步中第 2 步失败,重规划后保留第 1 步结果

Verification: 单元测试 mock Reflector/Replanner验证重规划流程


U7. 多步研究任务端到端验证

Goal: 用"分析竞品并生成报告"场景验证完整闭环

Requirements: R8

Dependencies: U1, U3, U4, U5, U6

Files:

  • tests/integration/test_autonomous_research_task.py — 新增集成测试
  • src/agentkit/core/plan_exec_engine.py — 确保 ReActStepExecutor 与搜索工具集成
  • configs/skills/research.yaml — 新增研究任务 Skill 配置

Approach:

  1. 创建 research Skill 配置,绑定 web_search + web_crawl + ask_human 工具
  2. 集成测试mock LLM Gateway 返回预设响应mock 搜索工具返回预设结果
  3. 验证流程:用户输入 → GoalPlanner 分解 → PlanExecEngine 执行 → SharedWorkspace 状态传递 → 最终报告
  4. 验证步骤间依赖:搜索步骤结果被分析步骤读取
  5. 验证失败重规划:搜索工具返回空结果时触发重规划

Test scenarios:

  • Covers AE1: "分析飞书和钉钉的竞品对比" → 搜索→分析→生成完整报告
  • Tool integration: ReActStepExecutor 调用 web_search 工具
  • Dependency chain: 搜索结果传递到分析步骤
  • Failure recovery: 搜索失败 → 重规划 → 换关键词重新搜索

Verification: 集成测试通过,端到端输出包含搜索结果和分析报告


U8. WebSocket 进度事件

Goal: 执行过程通过 WebSocket 实时推送进度事件

Requirements: R9

Dependencies: U1, U3

Files:

  • src/agentkit/server/routes/chat.py — 注册 HandoffTransport handler转发 team 事件
  • src/agentkit/core/plan_exec_engine.py — 添加 step_event_callback 参数
  • src/agentkit/server/routes/portal.py — 添加 plan/step 事件类型
  • tests/unit/server/test_chat_ws_events.py — 新增测试

Approach:

  1. PlanExecEngine 新增 step_event_callback: Callable[[str, dict], Awaitable[None]] | None 参数
  2. 步骤状态变更时调用 callbackplan_created, step_started, step_completed, step_failed, plan_completed, replanning
  3. Chat WebSocket handler 中,当 ExpertTeam 模式激活时,注册 HandoffTransport handler 将 team 事件转发为 WebSocket 消息
  4. Portal WebSocket 添加新事件类型:plan_step(步骤进度)和 plan_update(计划变更)
  5. 前端 WsServerMessage 类型添加 plan_stepplan_update 事件支持

Patterns to follow: 现有 emit_team_event 模式(server/routes/chat.pyPortal WebSocket 事件格式

Test scenarios:

  • PlanExecEngine: 步骤开始/完成时 callback 被调用,事件类型正确
  • TeamOrchestrator: HandoffTransport 事件转发到 WebSocket
  • Portal: plan_step 事件包含 step_id, step_name, status
  • No callback: callback=None 时不影响执行

Verification: 单元测试验证事件回调被正确调用


Scope Boundaries

In scope:

  • 打通 PlanExecEngine 和 TeamOrchestrator 执行层
  • SharedWorkspace 集成
  • GoalPlanner prompt 调优
  • 失败重规划集成
  • WebSocket 进度事件
  • 多步研究任务验证

Deferred to follow-up work:

  • 执行持久化与断点恢复
  • 自适应执行监控token 预算、耗时趋势)
  • 人机协作规划 UI
  • 计划模板库
  • 前端进度可视化组件

Outside this scope:

  • 动态工具发现与运行时组合
  • 跨任务长期记忆
  • 多层级嵌套计划

Risks & Mitigations

Risk Impact Mitigation
ReActEngine 步骤级执行 token 消耗高 每步骤可能消耗大量 token ReActStepExecutor 默认 max_steps=5限制循环次数
GoalPlanner 分解质量不稳定 复杂任务可能分解不合理 添加分解质量自检 + few-shot 示例
SharedWorkspace 并发写入冲突 并行步骤同时写入可能冲突 SharedWorkspace 内置版本控制和分布式锁
HandoffTransport -> WebSocket 桥接延迟 事件转发可能增加延迟 InProcess 模式下延迟极低asyncio.Queue

Open Questions

  • Deferred to implementation: ReActStepExecutor 的 system_prompt 是否需要根据步骤类型动态生成(如搜索步骤 vs 分析步骤)
  • Deferred to implementation: 前端 WsServerMessage 类型更新是否需要同步修改 chat store 的事件处理逻辑

Sources & Research

  • src/agentkit/core/react.py — ReActEngine 完整实现
  • src/agentkit/core/plan_exec_engine.py — PlanExecEngine 和 _LLMStepAgent
  • src/agentkit/experts/orchestrator.py — TeamOrchestrator mock 实现
  • src/agentkit/experts/team.py — ExpertTeam 和 SharedWorkspace
  • src/agentkit/core/shared_workspace.py — SharedWorkspace API
  • src/agentkit/orchestrator/reflection.py — PipelineReflector / PipelineReplanner
  • src/agentkit/core/goal_planner.py — GoalPlanner
  • src/agentkit/core/protocol.py — TaskMessage / TaskResult 协议
  • src/agentkit/server/routes/chat.py — Chat WebSocket 和 emit_team_event