522 lines
32 KiB
Markdown
522 lines
32 KiB
Markdown
---
|
||
title: "feat: 长程任务可靠性优化 — 中间件管道、循环检测、并发限制、检查点、状态卸载"
|
||
status: active
|
||
date: 2026-06-24
|
||
type: feat
|
||
origin: "DeerFlow 2.0 SuperAgent Harness 架构对比分析(2026-06-24 对话)"
|
||
---
|
||
|
||
## Summary
|
||
|
||
基于 DeerFlow 2.0(字节跳动开源 SuperAgent Harness,74K+ Stars)的架构对比分析,为 AgentKit 补齐长程任务(持续数分钟到数小时)可靠性的 5 个关键缺口:
|
||
|
||
1. **循环检测(LoopDetection)** — ReAct 循环内滑动窗口 hash 检测重复工具调用,消除 ~30% 的 token 浪费。
|
||
2. **子代理并发限制(SubagentLimit)** — Expert Team 同层并行阶段加 Semaphore,避免 LLM 限流洪峰。
|
||
3. **主动压缩触发(Headroom 压缩)** — 基于 token 用量预测主动触发压缩,避免单次请求超限。
|
||
4. **SharedWorkspace Redis 化 + 状态卸载** — 阶段输出主动卸载到 Redis/磁盘,上下文只保留摘要,长程任务 token 降 50%+。
|
||
5. **中间件管道架构** — 统一中间件协议(洋葱模型),将散落的横切关注点(压缩/计量/安全/循环检测)集中化,并行接入验证后移除旧路径。
|
||
6. **Pipeline 检查点与断点续跑** — 阶段级 checkpoint,崩溃后可从最后完成阶段恢复。
|
||
7. **技能加载审计** — 确认 disclosure_level 运行时是否真正按需加载,补齐渐进式加载缺口(若存在)。
|
||
|
||
明确**不**做:全盘迁移 LangGraph(自研架构保持灵活)、重写现有编排逻辑、Docker 沙箱默认引入(仅文档化边界)、IM 原生集成(标记为可选插件)、ACP 集成(MCP 已覆盖)。
|
||
|
||
## Problem Frame
|
||
|
||
DeerFlow 2.0 的核心能力是让 Agent 自主执行持续数小时的复杂工作流而不"迷失"。其工程基础是 14 层有序中间件管道(洋葱模型)+ 文件系统优先的状态管理 + 沙箱隔离 + 循环检测 + 子代理并发限制。
|
||
|
||
对照 AgentKit 现状:
|
||
|
||
| DeerFlow 能力 | AgentKit 现状 | 差距 |
|
||
|---|---|---|
|
||
| 14 层有序中间件管道 | 横切关注点硬编码在 `ReActEngine._execute_loop` 和 `execute_stream` 内,无统一管道 | **缺失** |
|
||
| LoopDetectionMiddleware(滑动窗口 hash) | ReAct 循环无循环检测,只有 `max_steps` 兜底 | **缺失** |
|
||
| SubagentLimitMiddleware(max 3/轮) | `asyncio.gather` 同层全部并行,无 Semaphore | **缺失** |
|
||
| SummarizationMiddleware(主动触发) | 压缩在 ReAct 内被动调用,阈值 8000 tokens 硬编码 | **部分**(被动 vs 主动) |
|
||
| 文件系统优先状态管理 | SharedWorkspace 默认内存 dict,不持久化 | **缺失** |
|
||
| Checkpoint(节点级,断点续跑) | PipelineStateManager 只写不读,无 resume 逻辑 | **缺失** |
|
||
| 技能渐进式加载(先元数据,触发时加载完整) | disclosure_level 字段存在但运行时不使用,启动时一次性全量加载 | **部分**(字段有,逻辑无) |
|
||
|
||
关键洞察:AgentKit 的自研编排(拓扑排序 + Board 辩论 + 4 层记忆 + 自演化)比 DeerFlow 更丰富,但**长程任务护栏**(循环检测、并发限制、统一中间件、检查点)是明显短板。补齐这些护栏是让 AgentKit 也能支撑"数小时自主执行"的关键。
|
||
|
||
## Requirements
|
||
|
||
- **R1**:ReAct 循环(非流式 `_execute_loop` + 流式 `execute_stream`)内加入循环检测,滑动窗口检测最近 N 步的 `(tool_name, arguments_hash)` 重复,触发时注入"你正在重复调用 X,请改变策略"系统消息而非直接中断。
|
||
- **R2**:`TeamOrchestrator` 同层并行阶段(`asyncio.gather`)加 `asyncio.Semaphore`,默认 max 3 并发,可配置;辩论阶段并行同样加限制。
|
||
- **R3**:`ContextCompressor` 包装为基于 headroom 预测的主动压缩——当 token 用量 / 模型上限 > 80% 时主动触发压缩,而非固定阈值 8000。
|
||
- **R4**:`SharedWorkspace` 在 ExpertTeam 创建时传入 Redis client(复用 `app.state.working_redis_client`),阶段输出主动写入 Redis;上下文中只保留摘要 + 引用路径。
|
||
- **R5**:新增 `core/middleware.py`,定义 `Middleware` 协议(`before`/`after`)+ `MiddlewareChain`(洋葱模型,有序执行);将现有 ContextCompressor、终端安全、循环检测包装为中间件;并行接入(新旧共存),验证后移除旧路径。
|
||
- **R6**:新增 `orchestrator/checkpoint.py`,实现 `PipelineCheckpoint`(阶段级,Redis 存储,键 `agentkit:pipeline:checkpoint:{plan_id}:{phase_id}`,TTL 7 天);`TeamOrchestrator` 在阶段完成后 save checkpoint;新增 `resume(plan_id)` 方法从 checkpoint 恢复。
|
||
- **R7**:审计 `SkillRegistry` 运行时加载路径,确认 `disclosure_level` 是否真正按需加载;若非,在 `build_skill_system_prompt` 中实现 Level 0 概要注入 + ReAct 循环内 `skill_search` 工具按需加载完整内容。
|
||
- **R8**:所有优化项配置化,新增 `agentkit.yaml` 的 `pipeline` 节(`checkpoint.enabled`、`max_concurrent_phases`、`loop_detection.window_size` 等),遵循 `ServerConfig.from_dict` 模式。
|
||
- **R9**:每个优化项附最小自检测试(ponytail 规则),参考 `test_pipeline_state.py` 的 `TestPipelineStateRedis` 类模式(AsyncMock + 直接注入 mock)。
|
||
|
||
## Key Technical Decisions
|
||
|
||
### KTD1:中间件管道并行接入,不直接替换现有请求流
|
||
|
||
**决策**:中间件管道与现有 `ReActEngine._execute_loop` / `execute_stream` 内的横切逻辑并行运行,通过 feature flag 控制(`pipeline.middleware.enabled`),验证稳定后再移除旧路径。
|
||
|
||
**理由**:直接替换风险高——`_execute_loop`(行 281-831)和 `execute_stream`(行 833-1504)各有 500+ 行,横切逻辑(压缩、trace、memory、telemetry)深度交织。并行接入允许灰度验证,单点故障不影响现有功能。
|
||
|
||
**代价**:短期内代码重复(新旧路径共存),工作量翻倍。可接受——长程任务可靠性是核心能力,不值得为省工作量冒回归风险。
|
||
|
||
### KTD2:循环检测不直接中断,注入纠正消息
|
||
|
||
**决策**:循环检测触发时,不抛异常或 break 循环,而是向 conversation 注入系统消息"你正在重复调用 {tool_name},请改变策略",给 LLM 一次自我纠正机会。连续 2 次检测仍重复则强制中断。
|
||
|
||
**理由**:DeerFlow 的 LoopDetectionMiddleware 也是先警告后中断。直接中断会丢失已有上下文,且 LLM 可能只是"卡顿"而非真正死循环(如等待外部资源)。纠正消息是最低成本的恢复机制。
|
||
|
||
**代价**:多消耗 1-2 轮 LLM 调用。可接受——比无限循环烧 token 好。
|
||
|
||
### KTD3:检查点粒度为阶段级(Phase-level),不做节点级
|
||
|
||
**决策**:`PipelineCheckpoint` 只在 `PlanPhase` 完成后保存(`_execute_execution_phase` 行 633 后),保存 `plan_id`、`phase_id`、`phase.result`、`plan.topological_sort()` 当前进度。不做 ReAct 循环内的节点级 checkpoint。
|
||
|
||
**理由**:节点级 checkpoint(类似 LangGraph)需要状态模式强约束,与 AgentKit 的 `dict[str, Any]` 上下文模型冲突,实现复杂度高。阶段级已满足"崩溃后从最后完成阶段恢复"的核心需求,覆盖 90% 的长程任务恢复场景。
|
||
|
||
**代价**:崩溃在阶段中间时,该阶段需重跑。可接受——单阶段通常 1-5 分钟,重跑成本低于全量重跑。
|
||
|
||
### KTD4:SharedWorkspace Redis 化,但不引入磁盘文件系统
|
||
|
||
**决策**:`ExpertTeam` 创建时传入 Redis client(复用 `app.state.working_redis_client`),阶段输出写入 Redis。不引入 DeerFlow 式的磁盘文件系统(`/mnt/skills/`、`/workspace/`)。
|
||
|
||
**理由**:AgentKit 已有 Redis 基础设施(`PipelineStateRedis` 的 `_safe_redis_call` + Memory fallback 模式成熟),复用成本最低。磁盘文件系统需要容器化部署配合,违反 ponytail("不引入新依赖")。
|
||
|
||
**代价**:Redis 存储有 TTL 限制(7 天),不适合永久归档。可接受——阶段输出是中间产物,永久归档应由 EpisodicMemory(PG+pgvector)负责。
|
||
|
||
### KTD5:技能渐进式加载——先审计,按需实现
|
||
|
||
**决策**:U5 首先是审计单元——确认 `disclosure_level` 在运行时是否被使用。若已按需加载,该单元无代码产出(仅审计报告)。若未按需加载,在 `build_skill_system_prompt` 实现 Level 0 概要注入 + ReAct 循环内新增 `skill_search` 工具。
|
||
|
||
**理由**:研究显示 `disclosure_level` 字段存在但运行时默认为 1(全量加载),疑似未真正按需加载。但需审计确认后再决定是否实现,避免过度工程。
|
||
|
||
**代价**:U5 可能无代码产出。可接受——审计结论本身有价值(确认差距是否存在)。
|
||
|
||
---
|
||
|
||
## Implementation Units
|
||
|
||
### U1. 循环检测(LoopDetection)
|
||
|
||
**Goal**:在 ReAct 循环内加入滑动窗口 hash 检测,识别重复工具调用模式,触发纠正消息。
|
||
|
||
**Requirements**:R1
|
||
|
||
**Dependencies**:无(独立优化项)
|
||
|
||
**Files**:
|
||
- `src/agentkit/core/react.py`(修改:`_execute_loop` 行 388-392 + `execute_stream` 行 948-952 插入检测逻辑)
|
||
- `tests/unit/test_react_engine.py`(修改:新增 `TestLoopDetection` 类)
|
||
|
||
**Approach**:
|
||
- 在 `ReActEngine.__init__` 增加 `_loop_window: deque = deque(maxlen=5)` 和 `_loop_threshold: int = 2`
|
||
- 检测逻辑:每步工具调用后,计算 `(tool_name, json.dumps(arguments, sort_keys=True))` 的 hash,存入 `_loop_window`
|
||
- 若窗口内同一 hash 出现次数 ≥ `_loop_threshold`,向 conversation 注入系统消息
|
||
- 连续 2 次检测仍重复,抛 `LoopDetectedError`(新增异常,继承 `TaskCancelledError`)
|
||
- 插入点:`react.py` 行 388(`step += 1` 后)、行 948(流式对应位置)
|
||
- 配置化:`loop_detection.window_size`、`loop_detection.threshold`(agentkit.yaml `pipeline` 节)
|
||
|
||
**Patterns to follow**:`cancellation_token.check()`(行 391)的协作式检查模式——检测但不阻塞,给 LLM 纠正机会。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:正常工具调用不触发检测,5 步内不同工具调用,无系统消息注入
|
||
- **Edge case**:连续 3 次相同工具 + 相同参数 → 第 2 次注入纠正消息,第 3 次抛 `LoopDetectedError`
|
||
- **Edge case**:相同工具不同参数 → 不触发检测(参数 hash 不同)
|
||
- **Error path**:`LoopDetectedError` 被 `_execute_loop` 的 try/except 捕获,返回 `ReActResult` 带 `error` 字段
|
||
- **Integration**:流式路径 `execute_stream` 同样触发检测,通过 SSE 事件 `step` 广播纠正消息
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/test_react_engine.py::TestLoopDetection -x -q` 全部通过;模拟重复调用场景验证检测生效。
|
||
|
||
---
|
||
|
||
### U2. 子代理并发限制(SubagentLimit)
|
||
|
||
**Goal**:Expert Team 同层并行阶段加 Semaphore,避免 LLM 限流洪峰。
|
||
|
||
**Requirements**:R2
|
||
|
||
**Dependencies**:无(独立优化项)
|
||
|
||
**Files**:
|
||
- `src/agentkit/experts/orchestrator.py`(修改:`__init__` 行 75-84 + `execute` 行 205-208 + `_execute_debate_phase` 行 949-955)
|
||
- `tests/unit/experts/test_team_orchestrator.py`(修改:新增 `TestConcurrencyLimit` 类)
|
||
|
||
**Approach**:
|
||
- `TeamOrchestrator.__init__` 增加 `max_concurrent_phases: int = 3`,创建 `self._phase_semaphore = asyncio.Semaphore(max_concurrent_phases)`
|
||
- 行 205-208 的 `asyncio.gather` 改为包裹 `_bounded_execute(phase)`:
|
||
```
|
||
async def _bounded_execute(phase):
|
||
async with self._phase_semaphore:
|
||
return await self._execute_phase(phase, plan)
|
||
```
|
||
- 辩论阶段行 949-955 同样包裹 `_bounded_debate_arg(expert)`
|
||
- 配置化:`pipeline.max_concurrent_phases`(agentkit.yaml),默认 3
|
||
- `MAX_PHASES = 10`(行 67)保持不变,作为阶段总数上限
|
||
|
||
**Patterns to follow**:`chat.py` 行 761 `_MAX_CONCURRENT_TASKS = 4` 的 per-session 并发限制模式。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:3 个阶段同层,全部并行执行(semaphore 不阻塞)
|
||
- **Edge case**:5 个阶段同层,验证最多 3 个同时执行(用 `asyncio.Event` 同步验证并发数)
|
||
- **Edge case**:`max_concurrent_phases=1` 时退化为串行执行
|
||
- **Error path**:某阶段失败不影响 semaphore 释放(`async with` 保证释放)
|
||
- **Integration**:辩论阶段 4 个专家并行,验证 semaphore 限制同样生效
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/experts/test_team_orchestrator.py::TestConcurrencyLimit -x -q` 全部通过。
|
||
|
||
---
|
||
|
||
### U3. 主动压缩触发(Headroom 压缩)
|
||
|
||
**Goal**:基于 token 用量预测主动触发压缩,避免单次请求超限。
|
||
|
||
**Requirements**:R3
|
||
|
||
**Dependencies**:无(独立优化项)
|
||
|
||
**Files**:
|
||
- `src/agentkit/core/compressor.py`(修改:新增 `HeadroomCompressionMiddleware` 类)
|
||
- `src/agentkit/core/react.py`(修改:`_should_compress` 行 1652-1663 改为基于 headroom 预测)
|
||
- `tests/unit/test_react_engine.py`(修改:新增 `TestHeadroomCompression` 类)
|
||
|
||
**Approach**:
|
||
- `ContextCompressor` 新增 `model_context_limit: int` 参数(默认 128000,从 LLMConfig 读取)
|
||
- `_should_compress` 改为:`estimate_tokens(conversation) / model_context_limit > 0.8`
|
||
- 保留固定阈值 8000 作为下限(小模型场景)
|
||
- 压缩触发后,将压缩前的 conversation checkpoint 到 SharedWorkspace(与 U4 协同)
|
||
- 配置化:`pipeline.compression.headroom_threshold`(默认 0.8)、`pipeline.compression.min_tokens`(默认 8000)
|
||
|
||
**Patterns to follow**:现有 `_should_compress`(行 1652-1663)的阈值判断模式,只是阈值来源从硬编码改为 headroom 比例。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:conversation 100K tokens,model_limit 128K → 触发压缩(100K/128K = 0.78 > 0.8? 否,不触发;调整测试数据为 110K → 0.86 > 0.8 触发)
|
||
- **Edge case**:conversation 5K tokens,model_limit 128K → 不触发(低于 min_tokens 8000)
|
||
- **Edge case**:model_limit 8K(小模型),conversation 7K → 触发(7K/8K = 0.875 > 0.8)
|
||
- **Error path**:压缩器不可用(`is_available()=False`)→ 跳过压缩,记录 warning
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/test_react_engine.py::TestHeadroomCompression -x -q` 全部通过。
|
||
|
||
---
|
||
|
||
### U4. SharedWorkspace Redis 化 + 状态卸载
|
||
|
||
**Goal**:ExpertTeam 创建时传入 Redis client,阶段输出主动写入 Redis,上下文只保留摘要。
|
||
|
||
**Requirements**:R4
|
||
|
||
**Dependencies**:无(独立优化项,但与 U6 中间件管道协同)
|
||
|
||
**Files**:
|
||
- `src/agentkit/experts/team.py`(修改:`__init__` 行 67 接收 `redis_client` 参数)
|
||
- `src/agentkit/server/routes/chat.py`(修改:`_execute_team_collab` 行 407-410 传入 `app.state.working_redis_client`)
|
||
- `src/agentkit/experts/orchestrator.py`(修改:`_execute_execution_phase` 行 633 后增加上下文摘要替换)
|
||
- `tests/unit/test_orchestrator.py`(修改:新增 `TestSharedWorkspaceRedis` 类)
|
||
|
||
**Approach**:
|
||
- `ExpertTeam.__init__` 增加 `redis_client: Any = None` 参数,传给 `SharedWorkspace(redis_client=redis_client)`
|
||
- `chat.py` 行 407 `ExpertTeam(...)` 传入 `redis_client=app_state.working_redis_client`
|
||
- `orchestrator.py` 行 633 `workspace.write` 后,将 `phase.result` 的完整内容写入 Redis,上下文中只保留摘要 + 引用路径:
|
||
```
|
||
summary = result.get("content", "")[:500] + "..."
|
||
ref_key = f"{plan.id}/phase/{phase.id}/output"
|
||
# 后续阶段读取依赖输出时,从 Redis 读取完整内容
|
||
```
|
||
- `orchestrator.py` 行 496-501 读取依赖输出的逻辑改为:先从内存 `plan.phases` 读,若不存在从 Redis `workspace.read` 读
|
||
|
||
**Patterns to follow**:`PipelineStateRedis._safe_redis_call`(行 183-223)的 Redis + Memory fallback 模式——Redis 失败时降级到内存 dict,不阻断执行。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:阶段完成后,Redis 中存在 `{plan_id}/phase/{phase_id}/output` 键,值为完整 result
|
||
- **Edge case**:Redis 不可用 → 降级到内存 dict,执行不中断(`_safe_redis_call` 模式)
|
||
- **Edge case**:后续阶段读取依赖输出,内存无(模拟崩溃恢复)→ 从 Redis 读取成功
|
||
- **Integration**:5 阶段流水线,验证上下文中只保留摘要,完整输出在 Redis
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/test_orchestrator.py::TestSharedWorkspaceRedis -x -q` 全部通过;手动验证 Redis 中存在阶段输出键。
|
||
|
||
---
|
||
|
||
### U5. 技能加载审计与渐进式加载
|
||
|
||
**Goal**:审计 `disclosure_level` 运行时使用情况;若未按需加载,实现 Level 0 概要注入 + `skill_search` 工具。
|
||
|
||
**Requirements**:R7
|
||
|
||
**Dependencies**:无(审计单元,可能无代码产出)
|
||
|
||
**Files**:
|
||
- `src/agentkit/skills/registry.py`(审计:`get` 方法 + 运行时调用路径)
|
||
- `src/agentkit/chat/skill_routing.py`(审计 + 可能修改:`build_skill_system_prompt` 行 101-126)
|
||
- `src/agentkit/core/react.py`(可能修改:新增 `skill_search` 工具,参考 `_maybe_add_tool_search` 行 1600-1623)
|
||
- `tests/unit/test_skill_routing.py`(可能新增:`TestProgressiveSkillLoading` 类)
|
||
|
||
**Approach**:
|
||
- **审计阶段**:
|
||
1. 确认 `SkillLoader.load_from_skill_md`(`loader.py` 行 89)的 `disclosure_level` 默认值
|
||
2. 确认 `build_skill_system_prompt`(`skill_routing.py` 行 101-126)是否根据 `disclosure_level` 动态选择 sections
|
||
3. 确认 ReAct 循环内是否有 `skill_search` 工具(类似 `tool_search`)
|
||
4. 输出审计报告:`disclosure_level` 是否真正按需加载
|
||
- **实现阶段(若审计确认未按需加载)**:
|
||
1. `build_skill_system_prompt` 根据 `disclosure_level=0` 只注入 name + description
|
||
2. ReAct 循环内新增 `skill_search(query: str)` 工具,从 SkillRegistry 检索匹配 skill 的完整内容
|
||
3. LLM 调用 `skill_search` 后,完整 skill 内容注入到 conversation
|
||
- **若审计确认已按需加载**:该单元仅输出审计报告,无代码改动
|
||
|
||
**Patterns to follow**:`_maybe_add_tool_search`(`react.py` 行 1600-1623)的 `tool_search` 工具实现模式——动态注入工具 schema,LLM 按需调用。
|
||
|
||
**Test scenarios**(若实现阶段触发):
|
||
- **Happy path**:`disclosure_level=0` 时,system_prompt 只含 skill name + description
|
||
- **Happy path**:LLM 调用 `skill_search("research")` → 返回 research skill 的完整 instructions
|
||
- **Edge case**:`skill_search` 查询无匹配 → 返回"无匹配 skill"提示
|
||
- **Edge case**:`disclosure_level=1`(默认)→ 行为与现状一致(全量加载),向后兼容
|
||
|
||
**Verification**:审计报告完成;若实现,运行 `python3 -m pytest tests/unit/test_skill_routing.py::TestProgressiveSkillLoading -x -q` 全部通过。
|
||
|
||
---
|
||
|
||
### U6. 中间件管道架构
|
||
|
||
**Goal**:定义统一中间件协议(洋葱模型),将横切关注点集中化,并行接入验证后移除旧路径。
|
||
|
||
**Requirements**:R5
|
||
|
||
**Dependencies**:U1(循环检测迁移为中间件)、U3(压缩迁移为中间件)
|
||
|
||
**Files**:
|
||
- `src/agentkit/core/middleware.py`(新建:`Middleware` 协议 + `MiddlewareChain` + `RequestContext`)
|
||
- `src/agentkit/core/react.py`(修改:`execute` / `execute_stream` 入口接入中间件链,feature flag 控制)
|
||
- `src/agentkit/server/routes/chat.py`(修改:ReActEngine 创建时注入中间件链)
|
||
- `tests/unit/test_middleware.py`(新建:`TestMiddlewareChain` 类)
|
||
|
||
**Approach**:
|
||
- **新建 `core/middleware.py`**:
|
||
```
|
||
class RequestContext: # 请求上下文,贯穿中间件链
|
||
conversation, tools, system_prompt, trajectory, step, token_usage, ...
|
||
|
||
class Middleware(Protocol):
|
||
async def before(self, ctx: RequestContext) -> RequestContext: ...
|
||
async def after(self, ctx: RequestContext, result: Any) -> Any: ...
|
||
|
||
class MiddlewareChain:
|
||
def __init__(self, middlewares: list[Middleware]): ...
|
||
async def execute(self, ctx: RequestContext, handler: Callable) -> Any:
|
||
# 洋葱模型:before 由外到内,after 由内到外
|
||
```
|
||
- **初始中间件集**(精简版,6 层,非 DeerFlow 的 14 层):
|
||
1. `ThreadDataMiddleware` — 初始化 RequestContext
|
||
2. `SummarizationMiddleware` — 包装现有 ContextCompressor(U3 的 headroom 压缩)
|
||
3. `TokenUsageMiddleware` — token 计量
|
||
4. `LoopDetectionMiddleware` — 包装 U1 的循环检测
|
||
5. `SubagentLimitMiddleware` — 包装 U2 的并发限制(作用于 Expert Team)
|
||
6. `TerminalSecurityMiddleware` — 包装现有 6 层终端安全
|
||
- **并行接入**:
|
||
- `ReActEngine.__init__` 增加 `middleware_chain: MiddlewareChain | None = None`
|
||
- `execute` / `execute_stream` 入口:若 `middleware_chain` 存在且 `pipeline.middleware.enabled=True`,走中间件路径;否则走现有路径
|
||
- `chat.py` 行 1066-1068 ReActEngine 创建时注入中间件链
|
||
- **迁移顺序**(验证后逐步移除旧路径):
|
||
1. 先接入 SummarizationMiddleware + TokenUsageMiddleware(低风险)
|
||
2. 再接入 LoopDetectionMiddleware + SubagentLimitMiddleware(中风险)
|
||
3. 最后接入 TerminalSecurityMiddleware(高风险,需充分回归)
|
||
- 配置化:`pipeline.middleware.enabled`(默认 False,灰度开启)
|
||
|
||
**Patterns to follow**:DeerFlow 的洋葱模型——`before` 由外到内、`after` 由内到外,顺序依赖通过 `@Next`/`@Prev` 装饰器声明(首版可用固定顺序列表简化)。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:3 个中间件按序执行,`before` 顺序 A→B→C,`after` 顺序 C→B→A
|
||
- **Edge case**:某中间件 `before` 抛异常 → 后续中间件不执行,`after` 链不触发
|
||
- **Edge case**:`middleware_chain=None` → 走现有路径,行为不变(向后兼容)
|
||
- **Integration**:`SummarizationMiddleware` 触发压缩后,`LoopDetectionMiddleware` 仍能检测(中间件间状态通过 RequestContext 传递)
|
||
- **Integration**:feature flag 开关切换,新旧路径行为一致
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/test_middleware.py -x -q` 全部通过;灰度开启中间件路径,运行现有 ReAct 测试套件验证无回归。
|
||
|
||
---
|
||
|
||
### U7. Pipeline 检查点与断点续跑
|
||
|
||
**Goal**:阶段级 checkpoint,崩溃后可从最后完成阶段恢复。
|
||
|
||
**Requirements**:R6
|
||
|
||
**Dependencies**:U4(SharedWorkspace Redis 化提供存储基础)
|
||
|
||
**Files**:
|
||
- `src/agentkit/orchestrator/checkpoint.py`(新建:`PipelineCheckpoint` 类)
|
||
- `src/agentkit/experts/orchestrator.py`(修改:`__init__` 注入 checkpoint + `_execute_execution_phase` 行 633 后 save + 新增 `resume` 方法)
|
||
- `src/agentkit/server/routes/tasks.py`(修改:新增 `POST /api/v1/tasks/{id}/resume` 端点)
|
||
- `tests/unit/test_pipeline_checkpoint.py`(新建:`TestPipelineCheckpoint` 类)
|
||
|
||
**Approach**:
|
||
- **新建 `orchestrator/checkpoint.py`**:
|
||
```
|
||
class PipelineCheckpoint:
|
||
def __init__(self, redis_client, prefix="agentkit:pipeline:checkpoint"): ...
|
||
async def save(self, plan_id: str, phase: PlanPhase, plan_status: str) -> None:
|
||
# 键:{prefix}:{plan_id}:{phase_id}
|
||
# 值:JSON {phase_id, phase_name, phase_result, phase_status, plan_status, saved_at}
|
||
# TTL: 7 天(与 PipelineStateRedis._TTL_SECONDS 一致)
|
||
async def load(self, plan_id: str) -> CheckpointData | None:
|
||
# 返回最后一个 COMPLETED 阶段的 checkpoint
|
||
async def list_checkpoints(self, plan_id: str) -> list[CheckpointData]: ...
|
||
async def clear(self, plan_id: str) -> None: ...
|
||
```
|
||
- **复用 `PipelineStateRedis._safe_redis_call` 模式**:Redis 失败降级到内存 dict,不阻断执行
|
||
- **TeamOrchestrator 接入**:
|
||
- `__init__` 增加 `checkpoint: PipelineCheckpoint | None = None`
|
||
- `_execute_execution_phase` 行 633(phase COMPLETED 后)调用 `checkpoint.save(plan.id, phase, plan.status)`
|
||
- 新增 `async def resume(self, plan_id: str) -> OrchestrationResult`:
|
||
1. `checkpoint.load(plan_id)` 获取最后完成阶段
|
||
2. 重建 `TeamPlan`,标记已完成阶段为 COMPLETED
|
||
3. 从下一未完成阶段继续执行
|
||
- **API 端点**:`POST /api/v1/tasks/{id}/resume` → 调用 `TeamOrchestrator.resume(plan_id)`
|
||
- 配置化:`pipeline.checkpoint.enabled`(默认 False)、`pipeline.checkpoint.ttl_seconds`(默认 604800)
|
||
|
||
**Patterns to follow**:`PipelineStateRedis`(`pipeline_state.py` 行 178-314)的 Redis + Memory fallback + 键命名 + TTL + JSON 序列化模式。`test_pipeline_state.py` 的 `TestPipelineStateRedis` 类的测试模式(AsyncMock + 直接注入 mock)。
|
||
|
||
**Test scenarios**:
|
||
- **Happy path**:`save(plan_id, phase, "executing")` 后 `load(plan_id)` 返回该 phase 的 checkpoint 数据
|
||
- **Happy path**:3 个阶段完成,`load` 返回第 3 个阶段的 checkpoint(最后一个 COMPLETED)
|
||
- **Edge case**:`load` 不存在的 plan_id → 返回 None
|
||
- **Edge case**:Redis 不可用 → 降级到内存 dict,save/load 仍工作
|
||
- **Error path**:`save` 时 Redis 异常 → 记录 warning,不阻断阶段执行(`_safe_redis_call` 模式)
|
||
- **Integration**:`resume(plan_id)` 从 checkpoint 恢复,跳过已完成阶段,执行未完成阶段
|
||
- **Integration**:checkpoint TTL 过期后 `load` 返回 None
|
||
|
||
**Verification**:运行 `python3 -m pytest tests/unit/test_pipeline_checkpoint.py -x -q` 全部通过;手动模拟崩溃恢复场景验证 resume 生效。
|
||
|
||
---
|
||
|
||
## Scope Boundaries
|
||
|
||
### In Scope
|
||
|
||
- ReAct 循环内循环检测(U1)
|
||
- Expert Team 并发限制(U2)
|
||
- 主动压缩触发(U3)
|
||
- SharedWorkspace Redis 化 + 状态卸载(U4)
|
||
- 技能加载审计与渐进式加载(U5)
|
||
- 中间件管道架构(U6,并行接入)
|
||
- Pipeline 检查点与断点续跑(U7,阶段级)
|
||
- 所有优化项的配置化(agentkit.yaml `pipeline` 节)
|
||
- 每个优化项的最小自检测试
|
||
|
||
### Out of Scope
|
||
|
||
- 全盘迁移到 LangGraph(明确不建议,自研架构保持灵活)
|
||
- 重写现有编排逻辑(拓扑排序、Board 辩论、4 层记忆等保持不变)
|
||
- Docker 沙箱默认引入(仅文档化命令级安全的边界,Docker 沙箱作为可选插件未来考虑)
|
||
- IM 原生集成(Telegram/Slack/Feishu,标记为可选插件,非本期)
|
||
- ACP 集成(MCP 已覆盖更通用的协议)
|
||
- 节点级 checkpoint(KTD3 决策,阶段级已满足核心需求)
|
||
- DeerFlow 式磁盘文件系统(KTD4 决策,复用 Redis)
|
||
|
||
### Deferred to Follow-Up Work
|
||
|
||
- 中间件管道的 `@Next`/`@Prev` 装饰器声明顺序依赖(首版用固定顺序列表)
|
||
- 全局 LLM 并发限制(`LLMGateway` 内 semaphore,本期只做 Expert Team 层)
|
||
- ReAct conversation 的 checkpoint(本期只做 Pipeline 阶段级)
|
||
- 中间件路径移除旧路径(需灰度验证 1-2 周后单独执行)
|
||
- `skill_search` 工具的语义检索(本期若实现,用关键词匹配,语义检索延后)
|
||
|
||
---
|
||
|
||
## High-Level Technical Design
|
||
|
||
### 中间件管道洋葱模型
|
||
|
||
```
|
||
请求进入 ReActEngine.execute()
|
||
↓
|
||
MiddlewareChain.execute(ctx, handler)
|
||
↓
|
||
┌─ ThreadDataMiddleware.before ─────────────────────────┐
|
||
│ ┌─ SummarizationMiddleware.before ────────────────┐ │
|
||
│ │ ┌─ TokenUsageMiddleware.before ─────────────┐ │ │
|
||
│ │ │ ┌─ LoopDetectionMiddleware.before ────┐ │ │ │
|
||
│ │ │ │ ┌─ TerminalSecurityMiddleware.before ┐│ │ │ │
|
||
│ │ │ │ │ handler(ctx) ← ReAct 循环 ││ │ │ │
|
||
│ │ │ │ └─ TerminalSecurityMiddleware.after ─┘│ │ │ │
|
||
│ │ │ └─ LoopDetectionMiddleware.after ────────┘ │ │ │
|
||
│ │ └─ TokenUsageMiddleware.after ─────────────────┘ │ │
|
||
│ └─ SummarizationMiddleware.after ────────────────────┘ │
|
||
└─ ThreadDataMiddleware.after ─────────────────────────────┘
|
||
↓
|
||
返回结果
|
||
```
|
||
|
||
### 检查点恢复流程
|
||
|
||
```
|
||
TeamOrchestrator.execute(plan)
|
||
↓
|
||
每阶段完成后: checkpoint.save(plan_id, phase, plan.status)
|
||
↓
|
||
[崩溃发生]
|
||
↓
|
||
用户调用 POST /api/v1/tasks/{id}/resume
|
||
↓
|
||
TeamOrchestrator.resume(plan_id)
|
||
├─ checkpoint.load(plan_id) → 获取最后 COMPLETED 阶段
|
||
├─ 重建 TeamPlan,标记已完成阶段
|
||
├─ topological_sort() → 找到下一未完成层
|
||
└─ 从下一层继续执行(与 execute 相同的并行+串行逻辑)
|
||
```
|
||
|
||
---
|
||
|
||
## Risks & Dependencies
|
||
|
||
### Risks
|
||
|
||
| 风险 | 概率 | 影响 | 缓解 |
|
||
|---|---|---|---|
|
||
| 中间件管道重构引入回归 | 中 | 高(ReAct 循环是核心路径) | 并行接入 + feature flag 灰度(KTD1) |
|
||
| 循环检测误判(合法重复调用被中断) | 低 | 中(用户体验) | 先警告后中断(KTD2),阈值可配置 |
|
||
| SharedWorkspace Redis 化后性能下降 | 低 | 中(Redis 延迟 vs 内存 dict) | `_safe_redis_call` fallback 到内存,异步写入 |
|
||
| 检查点序列化大对象导致 Redis 膨胀 | 中 | 低(TTL 7 天自动清理) | phase.result 只存摘要 + 引用,完整内容已在 SharedWorkspace |
|
||
| U5 审计确认无需改动,单元"无产出" | 中 | 低(审计结论有价值) | KTD5 已明确接受此可能性 |
|
||
|
||
### Dependencies
|
||
|
||
- **Redis 基础设施**:U4、U7 依赖 Redis 可用(已有 `app.state.working_redis_client`,U4 接入)
|
||
- **现有测试套件**:U6 中间件管道接入后需运行全量 ReAct/Team 测试验证无回归
|
||
- **agentkit.yaml 配置**:U1-U7 均需新增 `pipeline` 配置节(U8 统一处理)
|
||
|
||
---
|
||
|
||
## System-Wide Impact
|
||
|
||
### 影响方
|
||
|
||
- **终端用户**:长程任务(@team 模式)更稳定,不再因循环/限流崩溃;崩溃后可恢复
|
||
- **开发者**:新增中间件扩展点,横切逻辑可插拔;配置项增加 `pipeline` 节
|
||
- **运维**:Redis 使用量增加(checkpoint + SharedWorkspace),需监控内存
|
||
- **API 消费者**:新增 `POST /api/v1/tasks/{id}/resume` 端点
|
||
|
||
### 兼容性
|
||
|
||
- 所有优化项默认关闭(feature flag),不影响现有行为
|
||
- `agentkit.yaml` 新增 `pipeline` 节,旧配置无此节时取默认值
|
||
- `SharedWorkspace` Redis 化后,内存模式仍可用(fallback)
|
||
- 中间件管道 `middleware_chain=None` 时走现有路径
|
||
|
||
---
|
||
|
||
## Acceptance Examples
|
||
|
||
- **AE1**:用户发起 @team 任务(5 阶段),阶段 3 因 LLM 限流失败 → 系统自动重试(U2 并发限制避免洪峰),任务完成
|
||
- **AE2**:用户发起 ReAct 任务,LLM 连续 3 次调用相同工具相同参数 → 第 2 次注入纠正消息,LLM 改变策略,任务完成(U1 循环检测)
|
||
- **AE3**:用户发起 @team 任务(10 阶段),阶段 7 时服务崩溃 → 用户调用 `/resume`,从阶段 7 继续,任务完成(U7 检查点)
|
||
- **AE4**:用户发起长对话(50 轮),token 用量接近模型上限 → 系统主动压缩历史,对话继续不中断(U3 headroom 压缩)
|
||
- **AE5**:开发者新增"审计日志"横切逻辑 → 实现 `Middleware` 协议,注入 `MiddlewareChain`,无需修改 ReActEngine(U6 中间件管道)
|
||
|
||
---
|
||
|
||
## Sources & Research
|
||
|
||
- **DeerFlow 2.0 架构分析**(2026-06-24 对话):14 层中间件管道、Lead Agent + Sub-Agent 编排、沙箱隔离、长期记忆、技能渐进式加载
|
||
- **DeerFlow 2.0 源码拆解**(CSDN, 2026-05-10):14 层 Middleware 严格有序、Sub-Agent 并发编排、结构化记忆
|
||
- **DeerFlow 2.0 Deep Dive**(guancyxx.cn, 2026-05-07):Lead Agent + Subagent 模式、SubagentLimitMiddleware、LoopDetectionMiddleware
|
||
- **AgentKit 现有架构**:`react.py` ReAct 循环、`orchestrator.py` Team 编排、`pipeline_state.py` 状态持久化、`shared_workspace.py` 共享工作区
|
||
- **外部研究 load-bearing**:是——DeerFlow 的中间件管道设计直接影响了 KTD1(并行接入策略)、KTD2(循环检测不中断)、U6(中间件协议设计)
|