fischer-agentkit/docs/plans/2026-06-24-004-feat-long-ho...

32 KiB
Raw Blame History

title status date type origin
feat: 长程任务可靠性优化 — 中间件管道、循环检测、并发限制、检查点、状态卸载 active 2026-06-24 feat DeerFlow 2.0 SuperAgent Harness 架构对比分析2026-06-24 对话)

Summary

基于 DeerFlow 2.0(字节跳动开源 SuperAgent Harness74K+ Stars的架构对比分析为 AgentKit 补齐长程任务(持续数分钟到数小时)可靠性的 5 个关键缺口:

  1. 循环检测LoopDetection — ReAct 循环内滑动窗口 hash 检测重复工具调用,消除 ~30% 的 token 浪费。
  2. 子代理并发限制SubagentLimit — Expert Team 同层并行阶段加 Semaphore避免 LLM 限流洪峰。
  3. 主动压缩触发Headroom 压缩) — 基于 token 用量预测主动触发压缩,避免单次请求超限。
  4. SharedWorkspace Redis 化 + 状态卸载 — 阶段输出主动卸载到 Redis/磁盘,上下文只保留摘要,长程任务 token 降 50%+。
  5. 中间件管道架构 — 统一中间件协议(洋葱模型),将散落的横切关注点(压缩/计量/安全/循环检测)集中化,并行接入验证后移除旧路径。
  6. Pipeline 检查点与断点续跑 — 阶段级 checkpoint崩溃后可从最后完成阶段恢复。
  7. 技能加载审计 — 确认 disclosure_level 运行时是否真正按需加载,补齐渐进式加载缺口(若存在)。

明确做:全盘迁移 LangGraph自研架构保持灵活、重写现有编排逻辑、Docker 沙箱默认引入仅文档化边界、IM 原生集成标记为可选插件、ACP 集成MCP 已覆盖)。

Problem Frame

DeerFlow 2.0 的核心能力是让 Agent 自主执行持续数小时的复杂工作流而不"迷失"。其工程基础是 14 层有序中间件管道(洋葱模型)+ 文件系统优先的状态管理 + 沙箱隔离 + 循环检测 + 子代理并发限制。

对照 AgentKit 现状:

DeerFlow 能力 AgentKit 现状 差距
14 层有序中间件管道 横切关注点硬编码在 ReActEngine._execute_loopexecute_stream 内,无统一管道 缺失
LoopDetectionMiddleware滑动窗口 hash ReAct 循环无循环检测,只有 max_steps 兜底 缺失
SubagentLimitMiddlewaremax 3/轮) asyncio.gather 同层全部并行,无 Semaphore 缺失
SummarizationMiddleware主动触发 压缩在 ReAct 内被动调用,阈值 8000 tokens 硬编码 部分(被动 vs 主动)
文件系统优先状态管理 SharedWorkspace 默认内存 dict不持久化 缺失
Checkpoint节点级断点续跑 PipelineStateManager 只写不读,无 resume 逻辑 缺失
技能渐进式加载(先元数据,触发时加载完整) disclosure_level 字段存在但运行时不使用,启动时一次性全量加载 部分(字段有,逻辑无)

关键洞察AgentKit 的自研编排(拓扑排序 + Board 辩论 + 4 层记忆 + 自演化)比 DeerFlow 更丰富,但长程任务护栏(循环检测、并发限制、统一中间件、检查点)是明显短板。补齐这些护栏是让 AgentKit 也能支撑"数小时自主执行"的关键。

Requirements

  • R1ReAct 循环(非流式 _execute_loop + 流式 execute_stream)内加入循环检测,滑动窗口检测最近 N 步的 (tool_name, arguments_hash) 重复,触发时注入"你正在重复调用 X请改变策略"系统消息而非直接中断。
  • R2TeamOrchestrator 同层并行阶段(asyncio.gather)加 asyncio.Semaphore,默认 max 3 并发,可配置;辩论阶段并行同样加限制。
  • R3ContextCompressor 包装为基于 headroom 预测的主动压缩——当 token 用量 / 模型上限 > 80% 时主动触发压缩,而非固定阈值 8000。
  • R4SharedWorkspace 在 ExpertTeam 创建时传入 Redis client复用 app.state.working_redis_client),阶段输出主动写入 Redis上下文中只保留摘要 + 引用路径。
  • R5:新增 core/middleware.py,定义 Middleware 协议(before/after+ MiddlewareChain(洋葱模型,有序执行);将现有 ContextCompressor、终端安全、循环检测包装为中间件并行接入新旧共存验证后移除旧路径。
  • R6:新增 orchestrator/checkpoint.py,实现 PipelineCheckpoint阶段级Redis 存储,键 agentkit:pipeline:checkpoint:{plan_id}:{phase_id}TTL 7 天);TeamOrchestrator 在阶段完成后 save checkpoint新增 resume(plan_id) 方法从 checkpoint 恢复。
  • R7:审计 SkillRegistry 运行时加载路径,确认 disclosure_level 是否真正按需加载;若非,在 build_skill_system_prompt 中实现 Level 0 概要注入 + ReAct 循环内 skill_search 工具按需加载完整内容。
  • R8:所有优化项配置化,新增 agentkit.yamlpipeline 节(checkpoint.enabledmax_concurrent_phasesloop_detection.window_size 等),遵循 ServerConfig.from_dict 模式。
  • R9每个优化项附最小自检测试ponytail 规则),参考 test_pipeline_state.pyTestPipelineStateRedis 类模式AsyncMock + 直接注入 mock

Key Technical Decisions

KTD1中间件管道并行接入不直接替换现有请求流

决策:中间件管道与现有 ReActEngine._execute_loop / execute_stream 内的横切逻辑并行运行,通过 feature flag 控制(pipeline.middleware.enabled),验证稳定后再移除旧路径。

理由:直接替换风险高——_execute_loop(行 281-831execute_stream(行 833-1504各有 500+ 行横切逻辑压缩、trace、memory、telemetry深度交织。并行接入允许灰度验证单点故障不影响现有功能。

代价:短期内代码重复(新旧路径共存),工作量翻倍。可接受——长程任务可靠性是核心能力,不值得为省工作量冒回归风险。

KTD2循环检测不直接中断注入纠正消息

决策:循环检测触发时,不抛异常或 break 循环,而是向 conversation 注入系统消息"你正在重复调用 {tool_name},请改变策略",给 LLM 一次自我纠正机会。连续 2 次检测仍重复则强制中断。

理由DeerFlow 的 LoopDetectionMiddleware 也是先警告后中断。直接中断会丢失已有上下文,且 LLM 可能只是"卡顿"而非真正死循环(如等待外部资源)。纠正消息是最低成本的恢复机制。

代价:多消耗 1-2 轮 LLM 调用。可接受——比无限循环烧 token 好。

KTD3检查点粒度为阶段级Phase-level不做节点级

决策PipelineCheckpoint 只在 PlanPhase 完成后保存(_execute_execution_phase 行 633 后),保存 plan_idphase_idphase.resultplan.topological_sort() 当前进度。不做 ReAct 循环内的节点级 checkpoint。

理由:节点级 checkpoint类似 LangGraph需要状态模式强约束与 AgentKit 的 dict[str, Any] 上下文模型冲突,实现复杂度高。阶段级已满足"崩溃后从最后完成阶段恢复"的核心需求,覆盖 90% 的长程任务恢复场景。

代价:崩溃在阶段中间时,该阶段需重跑。可接受——单阶段通常 1-5 分钟,重跑成本低于全量重跑。

KTD4SharedWorkspace Redis 化,但不引入磁盘文件系统

决策ExpertTeam 创建时传入 Redis client复用 app.state.working_redis_client),阶段输出写入 Redis。不引入 DeerFlow 式的磁盘文件系统(/mnt/skills//workspace/)。

理由AgentKit 已有 Redis 基础设施(PipelineStateRedis_safe_redis_call + Memory fallback 模式成熟),复用成本最低。磁盘文件系统需要容器化部署配合,违反 ponytail"不引入新依赖")。

代价Redis 存储有 TTL 限制7 天),不适合永久归档。可接受——阶段输出是中间产物,永久归档应由 EpisodicMemoryPG+pgvector负责。

KTD5技能渐进式加载——先审计按需实现

决策U5 首先是审计单元——确认 disclosure_level 在运行时是否被使用。若已按需加载,该单元无代码产出(仅审计报告)。若未按需加载,在 build_skill_system_prompt 实现 Level 0 概要注入 + ReAct 循环内新增 skill_search 工具。

理由:研究显示 disclosure_level 字段存在但运行时默认为 1全量加载疑似未真正按需加载。但需审计确认后再决定是否实现避免过度工程。

代价U5 可能无代码产出。可接受——审计结论本身有价值(确认差距是否存在)。


Implementation Units

U1. 循环检测LoopDetection

Goal:在 ReAct 循环内加入滑动窗口 hash 检测,识别重复工具调用模式,触发纠正消息。

RequirementsR1

Dependencies:无(独立优化项)

Files

  • src/agentkit/core/react.py(修改:_execute_loop 行 388-392 + execute_stream 行 948-952 插入检测逻辑)
  • tests/unit/test_react_engine.py(修改:新增 TestLoopDetection 类)

Approach

  • ReActEngine.__init__ 增加 _loop_window: deque = deque(maxlen=5)_loop_threshold: int = 2
  • 检测逻辑:每步工具调用后,计算 (tool_name, json.dumps(arguments, sort_keys=True)) 的 hash存入 _loop_window
  • 若窗口内同一 hash 出现次数 ≥ _loop_threshold,向 conversation 注入系统消息
  • 连续 2 次检测仍重复,抛 LoopDetectedError(新增异常,继承 TaskCancelledError
  • 插入点:react.py 行 388step += 1 后)、行 948流式对应位置
  • 配置化:loop_detection.window_sizeloop_detection.thresholdagentkit.yaml pipeline 节)

Patterns to followcancellation_token.check()(行 391的协作式检查模式——检测但不阻塞给 LLM 纠正机会。

Test scenarios

  • Happy path正常工具调用不触发检测5 步内不同工具调用,无系统消息注入
  • Edge case:连续 3 次相同工具 + 相同参数 → 第 2 次注入纠正消息,第 3 次抛 LoopDetectedError
  • Edge case:相同工具不同参数 → 不触发检测(参数 hash 不同)
  • Error pathLoopDetectedError_execute_loop 的 try/except 捕获,返回 ReActResulterror 字段
  • Integration:流式路径 execute_stream 同样触发检测,通过 SSE 事件 step 广播纠正消息

Verification:运行 python3 -m pytest tests/unit/test_react_engine.py::TestLoopDetection -x -q 全部通过;模拟重复调用场景验证检测生效。


U2. 子代理并发限制SubagentLimit

GoalExpert Team 同层并行阶段加 Semaphore避免 LLM 限流洪峰。

RequirementsR2

Dependencies:无(独立优化项)

Files

  • src/agentkit/experts/orchestrator.py(修改:__init__ 行 75-84 + execute 行 205-208 + _execute_debate_phase 行 949-955
  • tests/unit/experts/test_team_orchestrator.py(修改:新增 TestConcurrencyLimit 类)

Approach

  • TeamOrchestrator.__init__ 增加 max_concurrent_phases: int = 3,创建 self._phase_semaphore = asyncio.Semaphore(max_concurrent_phases)
  • 行 205-208 的 asyncio.gather 改为包裹 _bounded_execute(phase)
    async def _bounded_execute(phase):
        async with self._phase_semaphore:
            return await self._execute_phase(phase, plan)
    
  • 辩论阶段行 949-955 同样包裹 _bounded_debate_arg(expert)
  • 配置化:pipeline.max_concurrent_phasesagentkit.yaml默认 3
  • MAX_PHASES = 10(行 67保持不变作为阶段总数上限

Patterns to followchat.py 行 761 _MAX_CONCURRENT_TASKS = 4 的 per-session 并发限制模式。

Test scenarios

  • Happy path3 个阶段同层全部并行执行semaphore 不阻塞)
  • Edge case5 个阶段同层,验证最多 3 个同时执行(用 asyncio.Event 同步验证并发数)
  • Edge casemax_concurrent_phases=1 时退化为串行执行
  • Error path:某阶段失败不影响 semaphore 释放(async with 保证释放)
  • Integration:辩论阶段 4 个专家并行,验证 semaphore 限制同样生效

Verification:运行 python3 -m pytest tests/unit/experts/test_team_orchestrator.py::TestConcurrencyLimit -x -q 全部通过。


U3. 主动压缩触发Headroom 压缩)

Goal:基于 token 用量预测主动触发压缩,避免单次请求超限。

RequirementsR3

Dependencies:无(独立优化项)

Files

  • src/agentkit/core/compressor.py(修改:新增 HeadroomCompressionMiddleware 类)
  • src/agentkit/core/react.py(修改:_should_compress 行 1652-1663 改为基于 headroom 预测)
  • tests/unit/test_react_engine.py(修改:新增 TestHeadroomCompression 类)

Approach

  • ContextCompressor 新增 model_context_limit: int 参数(默认 128000从 LLMConfig 读取)
  • _should_compress 改为:estimate_tokens(conversation) / model_context_limit > 0.8
  • 保留固定阈值 8000 作为下限(小模型场景)
  • 压缩触发后,将压缩前的 conversation checkpoint 到 SharedWorkspace与 U4 协同)
  • 配置化:pipeline.compression.headroom_threshold(默认 0.8)、pipeline.compression.min_tokens(默认 8000

Patterns to follow:现有 _should_compress(行 1652-1663的阈值判断模式只是阈值来源从硬编码改为 headroom 比例。

Test scenarios

  • Happy pathconversation 100K tokensmodel_limit 128K → 触发压缩100K/128K = 0.78 > 0.8? 否,不触发;调整测试数据为 110K → 0.86 > 0.8 触发)
  • Edge caseconversation 5K tokensmodel_limit 128K → 不触发(低于 min_tokens 8000
  • Edge casemodel_limit 8K小模型conversation 7K → 触发7K/8K = 0.875 > 0.8
  • Error path:压缩器不可用(is_available()=False)→ 跳过压缩,记录 warning

Verification:运行 python3 -m pytest tests/unit/test_react_engine.py::TestHeadroomCompression -x -q 全部通过。


U4. SharedWorkspace Redis 化 + 状态卸载

GoalExpertTeam 创建时传入 Redis client阶段输出主动写入 Redis上下文只保留摘要。

RequirementsR4

Dependencies:无(独立优化项,但与 U6 中间件管道协同)

Files

  • src/agentkit/experts/team.py(修改:__init__ 行 67 接收 redis_client 参数)
  • src/agentkit/server/routes/chat.py(修改:_execute_team_collab 行 407-410 传入 app.state.working_redis_client
  • src/agentkit/experts/orchestrator.py(修改:_execute_execution_phase 行 633 后增加上下文摘要替换)
  • tests/unit/test_orchestrator.py(修改:新增 TestSharedWorkspaceRedis 类)

Approach

  • ExpertTeam.__init__ 增加 redis_client: Any = None 参数,传给 SharedWorkspace(redis_client=redis_client)
  • chat.py 行 407 ExpertTeam(...) 传入 redis_client=app_state.working_redis_client
  • orchestrator.py 行 633 workspace.write 后,将 phase.result 的完整内容写入 Redis上下文中只保留摘要 + 引用路径:
    summary = result.get("content", "")[:500] + "..."
    ref_key = f"{plan.id}/phase/{phase.id}/output"
    # 后续阶段读取依赖输出时,从 Redis 读取完整内容
    
  • orchestrator.py 行 496-501 读取依赖输出的逻辑改为:先从内存 plan.phases 读,若不存在从 Redis workspace.read

Patterns to followPipelineStateRedis._safe_redis_call(行 183-223的 Redis + Memory fallback 模式——Redis 失败时降级到内存 dict不阻断执行。

Test scenarios

  • Happy path阶段完成后Redis 中存在 {plan_id}/phase/{phase_id}/output 键,值为完整 result
  • Edge caseRedis 不可用 → 降级到内存 dict执行不中断_safe_redis_call 模式)
  • Edge case:后续阶段读取依赖输出,内存无(模拟崩溃恢复)→ 从 Redis 读取成功
  • Integration5 阶段流水线,验证上下文中只保留摘要,完整输出在 Redis

Verification:运行 python3 -m pytest tests/unit/test_orchestrator.py::TestSharedWorkspaceRedis -x -q 全部通过;手动验证 Redis 中存在阶段输出键。


U5. 技能加载审计与渐进式加载

Goal:审计 disclosure_level 运行时使用情况;若未按需加载,实现 Level 0 概要注入 + skill_search 工具。

RequirementsR7

Dependencies:无(审计单元,可能无代码产出)

Files

  • src/agentkit/skills/registry.py(审计:get 方法 + 运行时调用路径)
  • src/agentkit/chat/skill_routing.py(审计 + 可能修改:build_skill_system_prompt 行 101-126
  • src/agentkit/core/react.py(可能修改:新增 skill_search 工具,参考 _maybe_add_tool_search 行 1600-1623
  • tests/unit/test_skill_routing.py(可能新增:TestProgressiveSkillLoading 类)

Approach

  • 审计阶段
    1. 确认 SkillLoader.load_from_skill_mdloader.py 行 89disclosure_level 默认值
    2. 确认 build_skill_system_promptskill_routing.py 行 101-126是否根据 disclosure_level 动态选择 sections
    3. 确认 ReAct 循环内是否有 skill_search 工具(类似 tool_search
    4. 输出审计报告:disclosure_level 是否真正按需加载
  • 实现阶段(若审计确认未按需加载)
    1. build_skill_system_prompt 根据 disclosure_level=0 只注入 name + description
    2. ReAct 循环内新增 skill_search(query: str) 工具,从 SkillRegistry 检索匹配 skill 的完整内容
    3. LLM 调用 skill_search 后,完整 skill 内容注入到 conversation
  • 若审计确认已按需加载:该单元仅输出审计报告,无代码改动

Patterns to follow_maybe_add_tool_searchreact.py 行 1600-1623tool_search 工具实现模式——动态注入工具 schemaLLM 按需调用。

Test scenarios(若实现阶段触发):

  • Happy pathdisclosure_level=0system_prompt 只含 skill name + description
  • Happy pathLLM 调用 skill_search("research") → 返回 research skill 的完整 instructions
  • Edge caseskill_search 查询无匹配 → 返回"无匹配 skill"提示
  • Edge casedisclosure_level=1(默认)→ 行为与现状一致(全量加载),向后兼容

Verification:审计报告完成;若实现,运行 python3 -m pytest tests/unit/test_skill_routing.py::TestProgressiveSkillLoading -x -q 全部通过。


U6. 中间件管道架构

Goal:定义统一中间件协议(洋葱模型),将横切关注点集中化,并行接入验证后移除旧路径。

RequirementsR5

DependenciesU1循环检测迁移为中间件、U3压缩迁移为中间件

Files

  • src/agentkit/core/middleware.py(新建:Middleware 协议 + MiddlewareChain + RequestContext
  • src/agentkit/core/react.py(修改:execute / execute_stream 入口接入中间件链feature flag 控制)
  • src/agentkit/server/routes/chat.py修改ReActEngine 创建时注入中间件链)
  • tests/unit/test_middleware.py(新建:TestMiddlewareChain 类)

Approach

  • 新建 core/middleware.py
    class RequestContext:  # 请求上下文,贯穿中间件链
        conversation, tools, system_prompt, trajectory, step, token_usage, ...
    
    class Middleware(Protocol):
        async def before(self, ctx: RequestContext) -> RequestContext: ...
        async def after(self, ctx: RequestContext, result: Any) -> Any: ...
    
    class MiddlewareChain:
        def __init__(self, middlewares: list[Middleware]): ...
        async def execute(self, ctx: RequestContext, handler: Callable) -> Any:
            # 洋葱模型before 由外到内after 由内到外
    
  • 初始中间件集精简版6 层,非 DeerFlow 的 14 层):
    1. ThreadDataMiddleware — 初始化 RequestContext
    2. SummarizationMiddleware — 包装现有 ContextCompressorU3 的 headroom 压缩)
    3. TokenUsageMiddleware — token 计量
    4. LoopDetectionMiddleware — 包装 U1 的循环检测
    5. SubagentLimitMiddleware — 包装 U2 的并发限制(作用于 Expert Team
    6. TerminalSecurityMiddleware — 包装现有 6 层终端安全
  • 并行接入
    • ReActEngine.__init__ 增加 middleware_chain: MiddlewareChain | None = None
    • execute / execute_stream 入口:若 middleware_chain 存在且 pipeline.middleware.enabled=True,走中间件路径;否则走现有路径
    • chat.py 行 1066-1068 ReActEngine 创建时注入中间件链
  • 迁移顺序(验证后逐步移除旧路径):
    1. 先接入 SummarizationMiddleware + TokenUsageMiddleware低风险
    2. 再接入 LoopDetectionMiddleware + SubagentLimitMiddleware中风险
    3. 最后接入 TerminalSecurityMiddleware高风险需充分回归
  • 配置化:pipeline.middleware.enabled(默认 False灰度开启

Patterns to followDeerFlow 的洋葱模型——before 由外到内、after 由内到外,顺序依赖通过 @Next/@Prev 装饰器声明(首版可用固定顺序列表简化)。

Test scenarios

  • Happy path3 个中间件按序执行,before 顺序 A→B→Cafter 顺序 C→B→A
  • Edge case:某中间件 before 抛异常 → 后续中间件不执行,after 链不触发
  • Edge casemiddleware_chain=None → 走现有路径,行为不变(向后兼容)
  • IntegrationSummarizationMiddleware 触发压缩后,LoopDetectionMiddleware 仍能检测(中间件间状态通过 RequestContext 传递)
  • Integrationfeature flag 开关切换,新旧路径行为一致

Verification:运行 python3 -m pytest tests/unit/test_middleware.py -x -q 全部通过;灰度开启中间件路径,运行现有 ReAct 测试套件验证无回归。


U7. Pipeline 检查点与断点续跑

Goal:阶段级 checkpoint崩溃后可从最后完成阶段恢复。

RequirementsR6

DependenciesU4SharedWorkspace Redis 化提供存储基础)

Files

  • src/agentkit/orchestrator/checkpoint.py(新建:PipelineCheckpoint 类)
  • src/agentkit/experts/orchestrator.py(修改:__init__ 注入 checkpoint + _execute_execution_phase 行 633 后 save + 新增 resume 方法)
  • src/agentkit/server/routes/tasks.py(修改:新增 POST /api/v1/tasks/{id}/resume 端点)
  • tests/unit/test_pipeline_checkpoint.py(新建:TestPipelineCheckpoint 类)

Approach

  • 新建 orchestrator/checkpoint.py
    class PipelineCheckpoint:
        def __init__(self, redis_client, prefix="agentkit:pipeline:checkpoint"): ...
        async def save(self, plan_id: str, phase: PlanPhase, plan_status: str) -> None:
            # 键:{prefix}:{plan_id}:{phase_id}
            # 值JSON {phase_id, phase_name, phase_result, phase_status, plan_status, saved_at}
            # TTL: 7 天(与 PipelineStateRedis._TTL_SECONDS 一致)
        async def load(self, plan_id: str) -> CheckpointData | None:
            # 返回最后一个 COMPLETED 阶段的 checkpoint
        async def list_checkpoints(self, plan_id: str) -> list[CheckpointData]: ...
        async def clear(self, plan_id: str) -> None: ...
    
  • 复用 PipelineStateRedis._safe_redis_call 模式Redis 失败降级到内存 dict不阻断执行
  • TeamOrchestrator 接入
    • __init__ 增加 checkpoint: PipelineCheckpoint | None = None
    • _execute_execution_phase 行 633phase COMPLETED 后)调用 checkpoint.save(plan.id, phase, plan.status)
    • 新增 async def resume(self, plan_id: str) -> OrchestrationResult
      1. checkpoint.load(plan_id) 获取最后完成阶段
      2. 重建 TeamPlan,标记已完成阶段为 COMPLETED
      3. 从下一未完成阶段继续执行
  • API 端点POST /api/v1/tasks/{id}/resume → 调用 TeamOrchestrator.resume(plan_id)
  • 配置化:pipeline.checkpoint.enabled(默认 Falsepipeline.checkpoint.ttl_seconds(默认 604800

Patterns to followPipelineStateRedispipeline_state.py 行 178-314的 Redis + Memory fallback + 键命名 + TTL + JSON 序列化模式。test_pipeline_state.pyTestPipelineStateRedis 类的测试模式AsyncMock + 直接注入 mock

Test scenarios

  • Happy pathsave(plan_id, phase, "executing")load(plan_id) 返回该 phase 的 checkpoint 数据
  • Happy path3 个阶段完成,load 返回第 3 个阶段的 checkpoint最后一个 COMPLETED
  • Edge caseload 不存在的 plan_id → 返回 None
  • Edge caseRedis 不可用 → 降级到内存 dictsave/load 仍工作
  • Error pathsave 时 Redis 异常 → 记录 warning不阻断阶段执行_safe_redis_call 模式)
  • Integrationresume(plan_id) 从 checkpoint 恢复,跳过已完成阶段,执行未完成阶段
  • Integrationcheckpoint TTL 过期后 load 返回 None

Verification:运行 python3 -m pytest tests/unit/test_pipeline_checkpoint.py -x -q 全部通过;手动模拟崩溃恢复场景验证 resume 生效。


Scope Boundaries

In Scope

  • ReAct 循环内循环检测U1
  • Expert Team 并发限制U2
  • 主动压缩触发U3
  • SharedWorkspace Redis 化 + 状态卸载U4
  • 技能加载审计与渐进式加载U5
  • 中间件管道架构U6并行接入
  • Pipeline 检查点与断点续跑U7阶段级
  • 所有优化项的配置化agentkit.yaml pipeline 节)
  • 每个优化项的最小自检测试

Out of Scope

  • 全盘迁移到 LangGraph明确不建议自研架构保持灵活
  • 重写现有编排逻辑拓扑排序、Board 辩论、4 层记忆等保持不变)
  • Docker 沙箱默认引入仅文档化命令级安全的边界Docker 沙箱作为可选插件未来考虑)
  • IM 原生集成Telegram/Slack/Feishu标记为可选插件非本期
  • ACP 集成MCP 已覆盖更通用的协议)
  • 节点级 checkpointKTD3 决策,阶段级已满足核心需求)
  • DeerFlow 式磁盘文件系统KTD4 决策,复用 Redis

Deferred to Follow-Up Work

  • 中间件管道的 @Next/@Prev 装饰器声明顺序依赖(首版用固定顺序列表)
  • 全局 LLM 并发限制(LLMGateway 内 semaphore本期只做 Expert Team 层)
  • ReAct conversation 的 checkpoint本期只做 Pipeline 阶段级)
  • 中间件路径移除旧路径(需灰度验证 1-2 周后单独执行)
  • skill_search 工具的语义检索(本期若实现,用关键词匹配,语义检索延后)

High-Level Technical Design

中间件管道洋葱模型

请求进入 ReActEngine.execute()
  ↓
MiddlewareChain.execute(ctx, handler)
  ↓
┌─ ThreadDataMiddleware.before ─────────────────────────┐
│  ┌─ SummarizationMiddleware.before ────────────────┐  │
│  │  ┌─ TokenUsageMiddleware.before ─────────────┐  │  │
│  │  │  ┌─ LoopDetectionMiddleware.before ────┐  │  │  │
│  │  │  │  ┌─ TerminalSecurityMiddleware.before ┐│  │  │  │
│  │  │  │  │     handler(ctx)  ← ReAct 循环     ││  │  │  │
│  │  │  │  └─ TerminalSecurityMiddleware.after ─┘│  │  │  │
│  │  │  └─ LoopDetectionMiddleware.after ────────┘  │  │  │
│  │  └─ TokenUsageMiddleware.after ─────────────────┘  │  │
│  └─ SummarizationMiddleware.after ────────────────────┘  │
└─ ThreadDataMiddleware.after ─────────────────────────────┘
  ↓
返回结果

检查点恢复流程

TeamOrchestrator.execute(plan)
  ↓
每阶段完成后: checkpoint.save(plan_id, phase, plan.status)
  ↓
[崩溃发生]
  ↓
用户调用 POST /api/v1/tasks/{id}/resume
  ↓
TeamOrchestrator.resume(plan_id)
  ├─ checkpoint.load(plan_id) → 获取最后 COMPLETED 阶段
  ├─ 重建 TeamPlan标记已完成阶段
  ├─ topological_sort() → 找到下一未完成层
  └─ 从下一层继续执行(与 execute 相同的并行+串行逻辑)

Risks & Dependencies

Risks

风险 概率 影响 缓解
中间件管道重构引入回归 ReAct 循环是核心路径) 并行接入 + feature flag 灰度KTD1
循环检测误判(合法重复调用被中断) 中(用户体验) 先警告后中断KTD2阈值可配置
SharedWorkspace Redis 化后性能下降 Redis 延迟 vs 内存 dict _safe_redis_call fallback 到内存,异步写入
检查点序列化大对象导致 Redis 膨胀 TTL 7 天自动清理) phase.result 只存摘要 + 引用,完整内容已在 SharedWorkspace
U5 审计确认无需改动,单元"无产出" 低(审计结论有价值) KTD5 已明确接受此可能性

Dependencies

  • Redis 基础设施U4、U7 依赖 Redis 可用(已有 app.state.working_redis_clientU4 接入)
  • 现有测试套件U6 中间件管道接入后需运行全量 ReAct/Team 测试验证无回归
  • agentkit.yaml 配置U1-U7 均需新增 pipeline 配置节U8 统一处理)

System-Wide Impact

影响方

  • 终端用户:长程任务(@team 模式)更稳定,不再因循环/限流崩溃;崩溃后可恢复
  • 开发者:新增中间件扩展点,横切逻辑可插拔;配置项增加 pipeline
  • 运维Redis 使用量增加checkpoint + SharedWorkspace需监控内存
  • API 消费者:新增 POST /api/v1/tasks/{id}/resume 端点

兼容性

  • 所有优化项默认关闭feature flag不影响现有行为
  • agentkit.yaml 新增 pipeline 节,旧配置无此节时取默认值
  • SharedWorkspace Redis 化后内存模式仍可用fallback
  • 中间件管道 middleware_chain=None 时走现有路径

Acceptance Examples

  • AE1:用户发起 @team 任务5 阶段),阶段 3 因 LLM 限流失败 → 系统自动重试U2 并发限制避免洪峰),任务完成
  • AE2:用户发起 ReAct 任务LLM 连续 3 次调用相同工具相同参数 → 第 2 次注入纠正消息LLM 改变策略任务完成U1 循环检测)
  • AE3:用户发起 @team 任务10 阶段),阶段 7 时服务崩溃 → 用户调用 /resume,从阶段 7 继续任务完成U7 检查点)
  • AE4用户发起长对话50 轮token 用量接近模型上限 → 系统主动压缩历史对话继续不中断U3 headroom 压缩)
  • AE5:开发者新增"审计日志"横切逻辑 → 实现 Middleware 协议,注入 MiddlewareChain,无需修改 ReActEngineU6 中间件管道)

Sources & Research

  • DeerFlow 2.0 架构分析2026-06-24 对话14 层中间件管道、Lead Agent + Sub-Agent 编排、沙箱隔离、长期记忆、技能渐进式加载
  • DeerFlow 2.0 源码拆解CSDN, 2026-05-1014 层 Middleware 严格有序、Sub-Agent 并发编排、结构化记忆
  • DeerFlow 2.0 Deep Diveguancyxx.cn, 2026-05-07Lead Agent + Subagent 模式、SubagentLimitMiddleware、LoopDetectionMiddleware
  • AgentKit 现有架构react.py ReAct 循环、orchestrator.py Team 编排、pipeline_state.py 状态持久化、shared_workspace.py 共享工作区
  • 外部研究 load-bearing是——DeerFlow 的中间件管道设计直接影响了 KTD1并行接入策略、KTD2循环检测不中断、U6中间件协议设计