fischer-agentkit/docs/plans/2026-06-24-004-feat-long-ho...

522 lines
32 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: "feat: 长程任务可靠性优化 — 中间件管道、循环检测、并发限制、检查点、状态卸载"
status: active
date: 2026-06-24
type: feat
origin: "DeerFlow 2.0 SuperAgent Harness 架构对比分析2026-06-24 对话)"
---
## Summary
基于 DeerFlow 2.0(字节跳动开源 SuperAgent Harness74K+ Stars的架构对比分析为 AgentKit 补齐长程任务(持续数分钟到数小时)可靠性的 5 个关键缺口:
1. **循环检测LoopDetection** — ReAct 循环内滑动窗口 hash 检测重复工具调用,消除 ~30% 的 token 浪费。
2. **子代理并发限制SubagentLimit** — Expert Team 同层并行阶段加 Semaphore避免 LLM 限流洪峰。
3. **主动压缩触发Headroom 压缩)** — 基于 token 用量预测主动触发压缩,避免单次请求超限。
4. **SharedWorkspace Redis 化 + 状态卸载** — 阶段输出主动卸载到 Redis/磁盘,上下文只保留摘要,长程任务 token 降 50%+。
5. **中间件管道架构** — 统一中间件协议(洋葱模型),将散落的横切关注点(压缩/计量/安全/循环检测)集中化,并行接入验证后移除旧路径。
6. **Pipeline 检查点与断点续跑** — 阶段级 checkpoint崩溃后可从最后完成阶段恢复。
7. **技能加载审计** — 确认 disclosure_level 运行时是否真正按需加载,补齐渐进式加载缺口(若存在)。
明确**不**做:全盘迁移 LangGraph自研架构保持灵活、重写现有编排逻辑、Docker 沙箱默认引入仅文档化边界、IM 原生集成标记为可选插件、ACP 集成MCP 已覆盖)。
## Problem Frame
DeerFlow 2.0 的核心能力是让 Agent 自主执行持续数小时的复杂工作流而不"迷失"。其工程基础是 14 层有序中间件管道(洋葱模型)+ 文件系统优先的状态管理 + 沙箱隔离 + 循环检测 + 子代理并发限制。
对照 AgentKit 现状:
| DeerFlow 能力 | AgentKit 现状 | 差距 |
|---|---|---|
| 14 层有序中间件管道 | 横切关注点硬编码在 `ReActEngine._execute_loop``execute_stream` 内,无统一管道 | **缺失** |
| LoopDetectionMiddleware滑动窗口 hash | ReAct 循环无循环检测,只有 `max_steps` 兜底 | **缺失** |
| SubagentLimitMiddlewaremax 3/轮) | `asyncio.gather` 同层全部并行,无 Semaphore | **缺失** |
| SummarizationMiddleware主动触发 | 压缩在 ReAct 内被动调用,阈值 8000 tokens 硬编码 | **部分**(被动 vs 主动) |
| 文件系统优先状态管理 | SharedWorkspace 默认内存 dict不持久化 | **缺失** |
| Checkpoint节点级断点续跑 | PipelineStateManager 只写不读,无 resume 逻辑 | **缺失** |
| 技能渐进式加载(先元数据,触发时加载完整) | disclosure_level 字段存在但运行时不使用,启动时一次性全量加载 | **部分**(字段有,逻辑无) |
关键洞察AgentKit 的自研编排(拓扑排序 + Board 辩论 + 4 层记忆 + 自演化)比 DeerFlow 更丰富,但**长程任务护栏**(循环检测、并发限制、统一中间件、检查点)是明显短板。补齐这些护栏是让 AgentKit 也能支撑"数小时自主执行"的关键。
## Requirements
- **R1**ReAct 循环(非流式 `_execute_loop` + 流式 `execute_stream`)内加入循环检测,滑动窗口检测最近 N 步的 `(tool_name, arguments_hash)` 重复,触发时注入"你正在重复调用 X请改变策略"系统消息而非直接中断。
- **R2**`TeamOrchestrator` 同层并行阶段(`asyncio.gather`)加 `asyncio.Semaphore`,默认 max 3 并发,可配置;辩论阶段并行同样加限制。
- **R3**`ContextCompressor` 包装为基于 headroom 预测的主动压缩——当 token 用量 / 模型上限 > 80% 时主动触发压缩,而非固定阈值 8000。
- **R4**`SharedWorkspace` 在 ExpertTeam 创建时传入 Redis client复用 `app.state.working_redis_client`),阶段输出主动写入 Redis上下文中只保留摘要 + 引用路径。
- **R5**:新增 `core/middleware.py`,定义 `Middleware` 协议(`before`/`after`+ `MiddlewareChain`(洋葱模型,有序执行);将现有 ContextCompressor、终端安全、循环检测包装为中间件并行接入新旧共存验证后移除旧路径。
- **R6**:新增 `orchestrator/checkpoint.py`,实现 `PipelineCheckpoint`阶段级Redis 存储,键 `agentkit:pipeline:checkpoint:{plan_id}:{phase_id}`TTL 7 天);`TeamOrchestrator` 在阶段完成后 save checkpoint新增 `resume(plan_id)` 方法从 checkpoint 恢复。
- **R7**:审计 `SkillRegistry` 运行时加载路径,确认 `disclosure_level` 是否真正按需加载;若非,在 `build_skill_system_prompt` 中实现 Level 0 概要注入 + ReAct 循环内 `skill_search` 工具按需加载完整内容。
- **R8**:所有优化项配置化,新增 `agentkit.yaml``pipeline` 节(`checkpoint.enabled`、`max_concurrent_phases`、`loop_detection.window_size` 等),遵循 `ServerConfig.from_dict` 模式。
- **R9**每个优化项附最小自检测试ponytail 规则),参考 `test_pipeline_state.py``TestPipelineStateRedis` 类模式AsyncMock + 直接注入 mock
## Key Technical Decisions
### KTD1中间件管道并行接入不直接替换现有请求流
**决策**:中间件管道与现有 `ReActEngine._execute_loop` / `execute_stream` 内的横切逻辑并行运行,通过 feature flag 控制(`pipeline.middleware.enabled`),验证稳定后再移除旧路径。
**理由**:直接替换风险高——`_execute_loop`(行 281-831`execute_stream`(行 833-1504各有 500+ 行横切逻辑压缩、trace、memory、telemetry深度交织。并行接入允许灰度验证单点故障不影响现有功能。
**代价**:短期内代码重复(新旧路径共存),工作量翻倍。可接受——长程任务可靠性是核心能力,不值得为省工作量冒回归风险。
### KTD2循环检测不直接中断注入纠正消息
**决策**:循环检测触发时,不抛异常或 break 循环,而是向 conversation 注入系统消息"你正在重复调用 {tool_name},请改变策略",给 LLM 一次自我纠正机会。连续 2 次检测仍重复则强制中断。
**理由**DeerFlow 的 LoopDetectionMiddleware 也是先警告后中断。直接中断会丢失已有上下文,且 LLM 可能只是"卡顿"而非真正死循环(如等待外部资源)。纠正消息是最低成本的恢复机制。
**代价**:多消耗 1-2 轮 LLM 调用。可接受——比无限循环烧 token 好。
### KTD3检查点粒度为阶段级Phase-level不做节点级
**决策**`PipelineCheckpoint` 只在 `PlanPhase` 完成后保存(`_execute_execution_phase` 行 633 后),保存 `plan_id`、`phase_id`、`phase.result`、`plan.topological_sort()` 当前进度。不做 ReAct 循环内的节点级 checkpoint。
**理由**:节点级 checkpoint类似 LangGraph需要状态模式强约束与 AgentKit 的 `dict[str, Any]` 上下文模型冲突,实现复杂度高。阶段级已满足"崩溃后从最后完成阶段恢复"的核心需求,覆盖 90% 的长程任务恢复场景。
**代价**:崩溃在阶段中间时,该阶段需重跑。可接受——单阶段通常 1-5 分钟,重跑成本低于全量重跑。
### KTD4SharedWorkspace Redis 化,但不引入磁盘文件系统
**决策**`ExpertTeam` 创建时传入 Redis client复用 `app.state.working_redis_client`),阶段输出写入 Redis。不引入 DeerFlow 式的磁盘文件系统(`/mnt/skills/`、`/workspace/`)。
**理由**AgentKit 已有 Redis 基础设施(`PipelineStateRedis` 的 `_safe_redis_call` + Memory fallback 模式成熟),复用成本最低。磁盘文件系统需要容器化部署配合,违反 ponytail"不引入新依赖")。
**代价**Redis 存储有 TTL 限制7 天),不适合永久归档。可接受——阶段输出是中间产物,永久归档应由 EpisodicMemoryPG+pgvector负责。
### KTD5技能渐进式加载——先审计按需实现
**决策**U5 首先是审计单元——确认 `disclosure_level` 在运行时是否被使用。若已按需加载,该单元无代码产出(仅审计报告)。若未按需加载,在 `build_skill_system_prompt` 实现 Level 0 概要注入 + ReAct 循环内新增 `skill_search` 工具。
**理由**:研究显示 `disclosure_level` 字段存在但运行时默认为 1全量加载疑似未真正按需加载。但需审计确认后再决定是否实现避免过度工程。
**代价**U5 可能无代码产出。可接受——审计结论本身有价值(确认差距是否存在)。
---
## Implementation Units
### U1. 循环检测LoopDetection
**Goal**:在 ReAct 循环内加入滑动窗口 hash 检测,识别重复工具调用模式,触发纠正消息。
**Requirements**R1
**Dependencies**:无(独立优化项)
**Files**
- `src/agentkit/core/react.py`(修改:`_execute_loop` 行 388-392 + `execute_stream` 行 948-952 插入检测逻辑)
- `tests/unit/test_react_engine.py`(修改:新增 `TestLoopDetection` 类)
**Approach**
-`ReActEngine.__init__` 增加 `_loop_window: deque = deque(maxlen=5)``_loop_threshold: int = 2`
- 检测逻辑:每步工具调用后,计算 `(tool_name, json.dumps(arguments, sort_keys=True))` 的 hash存入 `_loop_window`
- 若窗口内同一 hash 出现次数 ≥ `_loop_threshold`,向 conversation 注入系统消息
- 连续 2 次检测仍重复,抛 `LoopDetectedError`(新增异常,继承 `TaskCancelledError`
- 插入点:`react.py` 行 388`step += 1` 后)、行 948流式对应位置
- 配置化:`loop_detection.window_size`、`loop_detection.threshold`agentkit.yaml `pipeline` 节)
**Patterns to follow**`cancellation_token.check()`(行 391的协作式检查模式——检测但不阻塞给 LLM 纠正机会。
**Test scenarios**
- **Happy path**正常工具调用不触发检测5 步内不同工具调用,无系统消息注入
- **Edge case**:连续 3 次相同工具 + 相同参数 → 第 2 次注入纠正消息,第 3 次抛 `LoopDetectedError`
- **Edge case**:相同工具不同参数 → 不触发检测(参数 hash 不同)
- **Error path**`LoopDetectedError` 被 `_execute_loop` 的 try/except 捕获,返回 `ReActResult``error` 字段
- **Integration**:流式路径 `execute_stream` 同样触发检测,通过 SSE 事件 `step` 广播纠正消息
**Verification**:运行 `python3 -m pytest tests/unit/test_react_engine.py::TestLoopDetection -x -q` 全部通过;模拟重复调用场景验证检测生效。
---
### U2. 子代理并发限制SubagentLimit
**Goal**Expert Team 同层并行阶段加 Semaphore避免 LLM 限流洪峰。
**Requirements**R2
**Dependencies**:无(独立优化项)
**Files**
- `src/agentkit/experts/orchestrator.py`(修改:`__init__` 行 75-84 + `execute` 行 205-208 + `_execute_debate_phase` 行 949-955
- `tests/unit/experts/test_team_orchestrator.py`(修改:新增 `TestConcurrencyLimit` 类)
**Approach**
- `TeamOrchestrator.__init__` 增加 `max_concurrent_phases: int = 3`,创建 `self._phase_semaphore = asyncio.Semaphore(max_concurrent_phases)`
- 行 205-208 的 `asyncio.gather` 改为包裹 `_bounded_execute(phase)`
```
async def _bounded_execute(phase):
async with self._phase_semaphore:
return await self._execute_phase(phase, plan)
```
- 辩论阶段行 949-955 同样包裹 `_bounded_debate_arg(expert)`
- 配置化:`pipeline.max_concurrent_phases`agentkit.yaml默认 3
- `MAX_PHASES = 10`(行 67保持不变作为阶段总数上限
**Patterns to follow**`chat.py` 行 761 `_MAX_CONCURRENT_TASKS = 4` 的 per-session 并发限制模式。
**Test scenarios**
- **Happy path**3 个阶段同层全部并行执行semaphore 不阻塞)
- **Edge case**5 个阶段同层,验证最多 3 个同时执行(用 `asyncio.Event` 同步验证并发数)
- **Edge case**`max_concurrent_phases=1` 时退化为串行执行
- **Error path**:某阶段失败不影响 semaphore 释放(`async with` 保证释放)
- **Integration**:辩论阶段 4 个专家并行,验证 semaphore 限制同样生效
**Verification**:运行 `python3 -m pytest tests/unit/experts/test_team_orchestrator.py::TestConcurrencyLimit -x -q` 全部通过。
---
### U3. 主动压缩触发Headroom 压缩)
**Goal**:基于 token 用量预测主动触发压缩,避免单次请求超限。
**Requirements**R3
**Dependencies**:无(独立优化项)
**Files**
- `src/agentkit/core/compressor.py`(修改:新增 `HeadroomCompressionMiddleware` 类)
- `src/agentkit/core/react.py`(修改:`_should_compress` 行 1652-1663 改为基于 headroom 预测)
- `tests/unit/test_react_engine.py`(修改:新增 `TestHeadroomCompression` 类)
**Approach**
- `ContextCompressor` 新增 `model_context_limit: int` 参数(默认 128000从 LLMConfig 读取)
- `_should_compress` 改为:`estimate_tokens(conversation) / model_context_limit > 0.8`
- 保留固定阈值 8000 作为下限(小模型场景)
- 压缩触发后,将压缩前的 conversation checkpoint 到 SharedWorkspace与 U4 协同)
- 配置化:`pipeline.compression.headroom_threshold`(默认 0.8)、`pipeline.compression.min_tokens`(默认 8000
**Patterns to follow**:现有 `_should_compress`(行 1652-1663的阈值判断模式只是阈值来源从硬编码改为 headroom 比例。
**Test scenarios**
- **Happy path**conversation 100K tokensmodel_limit 128K → 触发压缩100K/128K = 0.78 > 0.8? 否,不触发;调整测试数据为 110K → 0.86 > 0.8 触发)
- **Edge case**conversation 5K tokensmodel_limit 128K → 不触发(低于 min_tokens 8000
- **Edge case**model_limit 8K小模型conversation 7K → 触发7K/8K = 0.875 > 0.8
- **Error path**:压缩器不可用(`is_available()=False`)→ 跳过压缩,记录 warning
**Verification**:运行 `python3 -m pytest tests/unit/test_react_engine.py::TestHeadroomCompression -x -q` 全部通过。
---
### U4. SharedWorkspace Redis 化 + 状态卸载
**Goal**ExpertTeam 创建时传入 Redis client阶段输出主动写入 Redis上下文只保留摘要。
**Requirements**R4
**Dependencies**:无(独立优化项,但与 U6 中间件管道协同)
**Files**
- `src/agentkit/experts/team.py`(修改:`__init__` 行 67 接收 `redis_client` 参数)
- `src/agentkit/server/routes/chat.py`(修改:`_execute_team_collab` 行 407-410 传入 `app.state.working_redis_client`
- `src/agentkit/experts/orchestrator.py`(修改:`_execute_execution_phase` 行 633 后增加上下文摘要替换)
- `tests/unit/test_orchestrator.py`(修改:新增 `TestSharedWorkspaceRedis` 类)
**Approach**
- `ExpertTeam.__init__` 增加 `redis_client: Any = None` 参数,传给 `SharedWorkspace(redis_client=redis_client)`
- `chat.py` 行 407 `ExpertTeam(...)` 传入 `redis_client=app_state.working_redis_client`
- `orchestrator.py` 行 633 `workspace.write` 后,将 `phase.result` 的完整内容写入 Redis上下文中只保留摘要 + 引用路径:
```
summary = result.get("content", "")[:500] + "..."
ref_key = f"{plan.id}/phase/{phase.id}/output"
# 后续阶段读取依赖输出时,从 Redis 读取完整内容
```
- `orchestrator.py` 行 496-501 读取依赖输出的逻辑改为:先从内存 `plan.phases` 读,若不存在从 Redis `workspace.read`
**Patterns to follow**`PipelineStateRedis._safe_redis_call`(行 183-223的 Redis + Memory fallback 模式——Redis 失败时降级到内存 dict不阻断执行。
**Test scenarios**
- **Happy path**阶段完成后Redis 中存在 `{plan_id}/phase/{phase_id}/output` 键,值为完整 result
- **Edge case**Redis 不可用 → 降级到内存 dict执行不中断`_safe_redis_call` 模式)
- **Edge case**:后续阶段读取依赖输出,内存无(模拟崩溃恢复)→ 从 Redis 读取成功
- **Integration**5 阶段流水线,验证上下文中只保留摘要,完整输出在 Redis
**Verification**:运行 `python3 -m pytest tests/unit/test_orchestrator.py::TestSharedWorkspaceRedis -x -q` 全部通过;手动验证 Redis 中存在阶段输出键。
---
### U5. 技能加载审计与渐进式加载
**Goal**:审计 `disclosure_level` 运行时使用情况;若未按需加载,实现 Level 0 概要注入 + `skill_search` 工具。
**Requirements**R7
**Dependencies**:无(审计单元,可能无代码产出)
**Files**
- `src/agentkit/skills/registry.py`(审计:`get` 方法 + 运行时调用路径)
- `src/agentkit/chat/skill_routing.py`(审计 + 可能修改:`build_skill_system_prompt` 行 101-126
- `src/agentkit/core/react.py`(可能修改:新增 `skill_search` 工具,参考 `_maybe_add_tool_search` 行 1600-1623
- `tests/unit/test_skill_routing.py`(可能新增:`TestProgressiveSkillLoading` 类)
**Approach**
- **审计阶段**
1. 确认 `SkillLoader.load_from_skill_md``loader.py` 行 89`disclosure_level` 默认值
2. 确认 `build_skill_system_prompt``skill_routing.py` 行 101-126是否根据 `disclosure_level` 动态选择 sections
3. 确认 ReAct 循环内是否有 `skill_search` 工具(类似 `tool_search`
4. 输出审计报告:`disclosure_level` 是否真正按需加载
- **实现阶段(若审计确认未按需加载)**
1. `build_skill_system_prompt` 根据 `disclosure_level=0` 只注入 name + description
2. ReAct 循环内新增 `skill_search(query: str)` 工具,从 SkillRegistry 检索匹配 skill 的完整内容
3. LLM 调用 `skill_search` 后,完整 skill 内容注入到 conversation
- **若审计确认已按需加载**:该单元仅输出审计报告,无代码改动
**Patterns to follow**`_maybe_add_tool_search``react.py` 行 1600-1623`tool_search` 工具实现模式——动态注入工具 schemaLLM 按需调用。
**Test scenarios**(若实现阶段触发):
- **Happy path**`disclosure_level=0` 时system_prompt 只含 skill name + description
- **Happy path**LLM 调用 `skill_search("research")` → 返回 research skill 的完整 instructions
- **Edge case**`skill_search` 查询无匹配 → 返回"无匹配 skill"提示
- **Edge case**`disclosure_level=1`(默认)→ 行为与现状一致(全量加载),向后兼容
**Verification**:审计报告完成;若实现,运行 `python3 -m pytest tests/unit/test_skill_routing.py::TestProgressiveSkillLoading -x -q` 全部通过。
---
### U6. 中间件管道架构
**Goal**:定义统一中间件协议(洋葱模型),将横切关注点集中化,并行接入验证后移除旧路径。
**Requirements**R5
**Dependencies**U1循环检测迁移为中间件、U3压缩迁移为中间件
**Files**
- `src/agentkit/core/middleware.py`(新建:`Middleware` 协议 + `MiddlewareChain` + `RequestContext`
- `src/agentkit/core/react.py`(修改:`execute` / `execute_stream` 入口接入中间件链feature flag 控制)
- `src/agentkit/server/routes/chat.py`修改ReActEngine 创建时注入中间件链)
- `tests/unit/test_middleware.py`(新建:`TestMiddlewareChain` 类)
**Approach**
- **新建 `core/middleware.py`**
```
class RequestContext: # 请求上下文,贯穿中间件链
conversation, tools, system_prompt, trajectory, step, token_usage, ...
class Middleware(Protocol):
async def before(self, ctx: RequestContext) -> RequestContext: ...
async def after(self, ctx: RequestContext, result: Any) -> Any: ...
class MiddlewareChain:
def __init__(self, middlewares: list[Middleware]): ...
async def execute(self, ctx: RequestContext, handler: Callable) -> Any:
# 洋葱模型before 由外到内after 由内到外
```
- **初始中间件集**精简版6 层,非 DeerFlow 的 14 层):
1. `ThreadDataMiddleware` — 初始化 RequestContext
2. `SummarizationMiddleware` — 包装现有 ContextCompressorU3 的 headroom 压缩)
3. `TokenUsageMiddleware` — token 计量
4. `LoopDetectionMiddleware` — 包装 U1 的循环检测
5. `SubagentLimitMiddleware` — 包装 U2 的并发限制(作用于 Expert Team
6. `TerminalSecurityMiddleware` — 包装现有 6 层终端安全
- **并行接入**
- `ReActEngine.__init__` 增加 `middleware_chain: MiddlewareChain | None = None`
- `execute` / `execute_stream` 入口:若 `middleware_chain` 存在且 `pipeline.middleware.enabled=True`,走中间件路径;否则走现有路径
- `chat.py` 行 1066-1068 ReActEngine 创建时注入中间件链
- **迁移顺序**(验证后逐步移除旧路径):
1. 先接入 SummarizationMiddleware + TokenUsageMiddleware低风险
2. 再接入 LoopDetectionMiddleware + SubagentLimitMiddleware中风险
3. 最后接入 TerminalSecurityMiddleware高风险需充分回归
- 配置化:`pipeline.middleware.enabled`(默认 False灰度开启
**Patterns to follow**DeerFlow 的洋葱模型——`before` 由外到内、`after` 由内到外,顺序依赖通过 `@Next`/`@Prev` 装饰器声明(首版可用固定顺序列表简化)。
**Test scenarios**
- **Happy path**3 个中间件按序执行,`before` 顺序 A→B→C`after` 顺序 C→B→A
- **Edge case**:某中间件 `before` 抛异常 → 后续中间件不执行,`after` 链不触发
- **Edge case**`middleware_chain=None` → 走现有路径,行为不变(向后兼容)
- **Integration**`SummarizationMiddleware` 触发压缩后,`LoopDetectionMiddleware` 仍能检测(中间件间状态通过 RequestContext 传递)
- **Integration**feature flag 开关切换,新旧路径行为一致
**Verification**:运行 `python3 -m pytest tests/unit/test_middleware.py -x -q` 全部通过;灰度开启中间件路径,运行现有 ReAct 测试套件验证无回归。
---
### U7. Pipeline 检查点与断点续跑
**Goal**:阶段级 checkpoint崩溃后可从最后完成阶段恢复。
**Requirements**R6
**Dependencies**U4SharedWorkspace Redis 化提供存储基础)
**Files**
- `src/agentkit/orchestrator/checkpoint.py`(新建:`PipelineCheckpoint` 类)
- `src/agentkit/experts/orchestrator.py`(修改:`__init__` 注入 checkpoint + `_execute_execution_phase` 行 633 后 save + 新增 `resume` 方法)
- `src/agentkit/server/routes/tasks.py`(修改:新增 `POST /api/v1/tasks/{id}/resume` 端点)
- `tests/unit/test_pipeline_checkpoint.py`(新建:`TestPipelineCheckpoint` 类)
**Approach**
- **新建 `orchestrator/checkpoint.py`**
```
class PipelineCheckpoint:
def __init__(self, redis_client, prefix="agentkit:pipeline:checkpoint"): ...
async def save(self, plan_id: str, phase: PlanPhase, plan_status: str) -> None:
# 键:{prefix}:{plan_id}:{phase_id}
# 值JSON {phase_id, phase_name, phase_result, phase_status, plan_status, saved_at}
# TTL: 7 天(与 PipelineStateRedis._TTL_SECONDS 一致)
async def load(self, plan_id: str) -> CheckpointData | None:
# 返回最后一个 COMPLETED 阶段的 checkpoint
async def list_checkpoints(self, plan_id: str) -> list[CheckpointData]: ...
async def clear(self, plan_id: str) -> None: ...
```
- **复用 `PipelineStateRedis._safe_redis_call` 模式**Redis 失败降级到内存 dict不阻断执行
- **TeamOrchestrator 接入**
- `__init__` 增加 `checkpoint: PipelineCheckpoint | None = None`
- `_execute_execution_phase` 行 633phase COMPLETED 后)调用 `checkpoint.save(plan.id, phase, plan.status)`
- 新增 `async def resume(self, plan_id: str) -> OrchestrationResult`
1. `checkpoint.load(plan_id)` 获取最后完成阶段
2. 重建 `TeamPlan`,标记已完成阶段为 COMPLETED
3. 从下一未完成阶段继续执行
- **API 端点**`POST /api/v1/tasks/{id}/resume` → 调用 `TeamOrchestrator.resume(plan_id)`
- 配置化:`pipeline.checkpoint.enabled`(默认 False、`pipeline.checkpoint.ttl_seconds`(默认 604800
**Patterns to follow**`PipelineStateRedis``pipeline_state.py` 行 178-314的 Redis + Memory fallback + 键命名 + TTL + JSON 序列化模式。`test_pipeline_state.py` 的 `TestPipelineStateRedis` 类的测试模式AsyncMock + 直接注入 mock
**Test scenarios**
- **Happy path**`save(plan_id, phase, "executing")` 后 `load(plan_id)` 返回该 phase 的 checkpoint 数据
- **Happy path**3 个阶段完成,`load` 返回第 3 个阶段的 checkpoint最后一个 COMPLETED
- **Edge case**`load` 不存在的 plan_id → 返回 None
- **Edge case**Redis 不可用 → 降级到内存 dictsave/load 仍工作
- **Error path**`save` 时 Redis 异常 → 记录 warning不阻断阶段执行`_safe_redis_call` 模式)
- **Integration**`resume(plan_id)` 从 checkpoint 恢复,跳过已完成阶段,执行未完成阶段
- **Integration**checkpoint TTL 过期后 `load` 返回 None
**Verification**:运行 `python3 -m pytest tests/unit/test_pipeline_checkpoint.py -x -q` 全部通过;手动模拟崩溃恢复场景验证 resume 生效。
---
## Scope Boundaries
### In Scope
- ReAct 循环内循环检测U1
- Expert Team 并发限制U2
- 主动压缩触发U3
- SharedWorkspace Redis 化 + 状态卸载U4
- 技能加载审计与渐进式加载U5
- 中间件管道架构U6并行接入
- Pipeline 检查点与断点续跑U7阶段级
- 所有优化项的配置化agentkit.yaml `pipeline` 节)
- 每个优化项的最小自检测试
### Out of Scope
- 全盘迁移到 LangGraph明确不建议自研架构保持灵活
- 重写现有编排逻辑拓扑排序、Board 辩论、4 层记忆等保持不变)
- Docker 沙箱默认引入仅文档化命令级安全的边界Docker 沙箱作为可选插件未来考虑)
- IM 原生集成Telegram/Slack/Feishu标记为可选插件非本期
- ACP 集成MCP 已覆盖更通用的协议)
- 节点级 checkpointKTD3 决策,阶段级已满足核心需求)
- DeerFlow 式磁盘文件系统KTD4 决策,复用 Redis
### Deferred to Follow-Up Work
- 中间件管道的 `@Next`/`@Prev` 装饰器声明顺序依赖(首版用固定顺序列表)
- 全局 LLM 并发限制(`LLMGateway` 内 semaphore本期只做 Expert Team 层)
- ReAct conversation 的 checkpoint本期只做 Pipeline 阶段级)
- 中间件路径移除旧路径(需灰度验证 1-2 周后单独执行)
- `skill_search` 工具的语义检索(本期若实现,用关键词匹配,语义检索延后)
---
## High-Level Technical Design
### 中间件管道洋葱模型
```
请求进入 ReActEngine.execute()
MiddlewareChain.execute(ctx, handler)
┌─ ThreadDataMiddleware.before ─────────────────────────┐
│ ┌─ SummarizationMiddleware.before ────────────────┐ │
│ │ ┌─ TokenUsageMiddleware.before ─────────────┐ │ │
│ │ │ ┌─ LoopDetectionMiddleware.before ────┐ │ │ │
│ │ │ │ ┌─ TerminalSecurityMiddleware.before ┐│ │ │ │
│ │ │ │ │ handler(ctx) ← ReAct 循环 ││ │ │ │
│ │ │ │ └─ TerminalSecurityMiddleware.after ─┘│ │ │ │
│ │ │ └─ LoopDetectionMiddleware.after ────────┘ │ │ │
│ │ └─ TokenUsageMiddleware.after ─────────────────┘ │ │
│ └─ SummarizationMiddleware.after ────────────────────┘ │
└─ ThreadDataMiddleware.after ─────────────────────────────┘
返回结果
```
### 检查点恢复流程
```
TeamOrchestrator.execute(plan)
每阶段完成后: checkpoint.save(plan_id, phase, plan.status)
[崩溃发生]
用户调用 POST /api/v1/tasks/{id}/resume
TeamOrchestrator.resume(plan_id)
├─ checkpoint.load(plan_id) → 获取最后 COMPLETED 阶段
├─ 重建 TeamPlan标记已完成阶段
├─ topological_sort() → 找到下一未完成层
└─ 从下一层继续执行(与 execute 相同的并行+串行逻辑)
```
---
## Risks & Dependencies
### Risks
| 风险 | 概率 | 影响 | 缓解 |
|---|---|---|---|
| 中间件管道重构引入回归 | 中 | 高ReAct 循环是核心路径) | 并行接入 + feature flag 灰度KTD1 |
| 循环检测误判(合法重复调用被中断) | 低 | 中(用户体验) | 先警告后中断KTD2阈值可配置 |
| SharedWorkspace Redis 化后性能下降 | 低 | 中Redis 延迟 vs 内存 dict | `_safe_redis_call` fallback 到内存,异步写入 |
| 检查点序列化大对象导致 Redis 膨胀 | 中 | 低TTL 7 天自动清理) | phase.result 只存摘要 + 引用,完整内容已在 SharedWorkspace |
| U5 审计确认无需改动,单元"无产出" | 中 | 低(审计结论有价值) | KTD5 已明确接受此可能性 |
### Dependencies
- **Redis 基础设施**U4、U7 依赖 Redis 可用(已有 `app.state.working_redis_client`U4 接入)
- **现有测试套件**U6 中间件管道接入后需运行全量 ReAct/Team 测试验证无回归
- **agentkit.yaml 配置**U1-U7 均需新增 `pipeline` 配置节U8 统一处理)
---
## System-Wide Impact
### 影响方
- **终端用户**:长程任务(@team 模式)更稳定,不再因循环/限流崩溃;崩溃后可恢复
- **开发者**:新增中间件扩展点,横切逻辑可插拔;配置项增加 `pipeline`
- **运维**Redis 使用量增加checkpoint + SharedWorkspace需监控内存
- **API 消费者**:新增 `POST /api/v1/tasks/{id}/resume` 端点
### 兼容性
- 所有优化项默认关闭feature flag不影响现有行为
- `agentkit.yaml` 新增 `pipeline` 节,旧配置无此节时取默认值
- `SharedWorkspace` Redis 化后内存模式仍可用fallback
- 中间件管道 `middleware_chain=None` 时走现有路径
---
## Acceptance Examples
- **AE1**:用户发起 @team 任务5 阶段),阶段 3 因 LLM 限流失败 → 系统自动重试U2 并发限制避免洪峰),任务完成
- **AE2**:用户发起 ReAct 任务LLM 连续 3 次调用相同工具相同参数 → 第 2 次注入纠正消息LLM 改变策略任务完成U1 循环检测)
- **AE3**:用户发起 @team 任务10 阶段),阶段 7 时服务崩溃 → 用户调用 `/resume`,从阶段 7 继续任务完成U7 检查点)
- **AE4**用户发起长对话50 轮token 用量接近模型上限 → 系统主动压缩历史对话继续不中断U3 headroom 压缩)
- **AE5**:开发者新增"审计日志"横切逻辑 → 实现 `Middleware` 协议,注入 `MiddlewareChain`,无需修改 ReActEngineU6 中间件管道)
---
## Sources & Research
- **DeerFlow 2.0 架构分析**2026-06-24 对话14 层中间件管道、Lead Agent + Sub-Agent 编排、沙箱隔离、长期记忆、技能渐进式加载
- **DeerFlow 2.0 源码拆解**CSDN, 2026-05-1014 层 Middleware 严格有序、Sub-Agent 并发编排、结构化记忆
- **DeerFlow 2.0 Deep Dive**guancyxx.cn, 2026-05-07Lead Agent + Subagent 模式、SubagentLimitMiddleware、LoopDetectionMiddleware
- **AgentKit 现有架构**`react.py` ReAct 循环、`orchestrator.py` Team 编排、`pipeline_state.py` 状态持久化、`shared_workspace.py` 共享工作区
- **外部研究 load-bearing**是——DeerFlow 的中间件管道设计直接影响了 KTD1并行接入策略、KTD2循环检测不中断、U6中间件协议设计