fischer-agentkit/docs/solutions/logic-errors/long-horizon-reliability-co...

17 KiB
Raw Permalink Blame History

title date category module problem_type component symptoms root_cause resolution_type severity tags
U1-U7 长程可靠性优化代码审查修复 2026-06-25 docs/solutions/logic-errors experts/orchestrator logic_error service_object
All skills silently degraded to summary mode because SkillConfig.disclosure_level defaulted to 0 instead of 1
Task resume endpoint mismatched task_id and plan_id parameters, causing resume failures on long-horizon tasks
LoopDetectionMiddleware never triggered loop detection due to dead code (isinstance dict check on a dataclass)
TokenUsageMiddleware raised AttributeError by accessing token_usage instead of total_tokens on the usage object
Team synthesis used a truncated 500-char summary instead of the offloaded full content, losing critical context
logic_error code_fix high
code-review
logic-error
long-horizon-reliability
expert-orchestrator
middleware
checkpoint
react-engine
skill-config

U1-U7 长程可靠性优化代码审查修复

Problem

U1-U7 长程可靠性优化特性分支的代码审查发现 14 个问题2 P0、5 P1、7 P2分布在 6 个源文件中,涵盖默认值回归、断点续跑标识符错配、中间件与数据类格式不兼容、状态卸载后内容未回读、检查点去重缺失、动态阶段未持久化、状态泄漏与类型守卫缺失等多类逻辑缺陷。其中 13 个已修复1 个为误报。

Symptoms

各缺陷在运行时表现为以下可观测症状:

  • 技能降级P0 #1:所有既有技能在加载时只注入名称与描述,完整 prompt 不再进入上下文,技能行为退化为"概要模式",输出质量明显下降但无报错。
  • 断点续跑失败P0 #2:调用 POST /tasks/{task_id}/resume 时返回 404「No checkpoint found」尽管 checkpoint 实际已写入 Redis并发 resume 还可能导致同一计划被重复恢复。
  • 中间件静默失效P1 #3、P1 #4LoopDetectionMiddleware 的循环检测从不触发警告(isinstance(step, dict) 恒为 FalseTokenUsageMiddlewaretoken_usage_total metadata 始终缺失(属性名错配)。
  • 综合结果残缺P1 #5Lead Expert 综合阶段产出时只能看到每个阶段前 500 字符的截断摘要,长产出被腰斩,最终综合结果丢失关键细节。
  • 检查点重复累积P1 #6:内存降级模式下,同一阶段被多次 checkpoint 时在 list 中重复出现,load() 返回的"最后完成阶段"可能错位。
  • resume 看不到动态辩论P1 #7:分歧检测插入的 DEBATE 阶段在崩溃后无法被 resume 恢复,因为 save_plan 只在流水线启动前调用过一次。
  • debate 限额失守P2 #8resume 后 _debate_count 归零,MAX_DEBATES 上限可被绕过,导致无限辩论。
  • 循环检测状态泄漏P2 #9:同一 ReActEngine 实例跨会话复用时,上一会话的 _loop_window 残留,新会话首步即可能误判循环。
  • FAILED 阶段被重跑P2 #11resume 把 FAILED 阶段当作 PENDING导致已失败阶段被重新执行可能再次失败或产生副作用。
  • 检查点永不清理P2 #13:成功完成后 checkpoint 数据仍留在 Redis/内存7 天 TTL 内持续占用空间。
  • _offload_result 崩溃P2 #14:当阶段产出为 dict 或 None 时,len(content) 与切片操作抛 TypeError,整条流水线中断。

What Didn't Work

  • 误报一项审查中标记的一条问题经核实为误报false positive未做改动。这提示在批量审查中需对每条 finding 单独验证,避免引入回归。
  • 属性名猜测TokenUsageMiddleware 最初按"通用命名习惯"假设 token_usage 属性存在,但 ReActResult 数据类实际使用 total_tokens。仅靠阅读中间件自身代码无法发现,必须交叉核对结果数据类的字段定义。
  • 格式假设LoopDetectionMiddleware 假设 trajectory 步骤是 dictReActEngine 实际存入的是 ReActStep dataclass。中间件与引擎分属不同模块格式契约未显式文档化导致 isinstance 检查静默失效而非报错。

Solution

P0 #1SkillConfig.disclosure_level 默认值

src/agentkit/skills/base.py — 将默认值从 0概要模式改为 1全量加载保持向后兼容。

# Before
disclosure_level: int = 0,  # 概要模式 — 所有既有技能被降级

# After
disclosure_level: int = 1,  # 默认全量加载向后兼容0=概要模式需显式指定

P0 #2resume 端点 task_id/plan_id 错配 + 并发防护

src/agentkit/server/routes/tasks.py — 新增 plan_id 查询参数,并加 asyncio.Lock 防并发 resume同时关闭 P2 #10

# Before
@router.post("/tasks/{task_id}/resume")
async def resume_task(task_id: str, req: Request):
    ...
    plan_dict = await checkpoint.load_plan(task_id)  # task_id 当 plan_id 用,永远 404

# After
@router.post("/tasks/{task_id}/resume")
async def resume_task(task_id: str, req: Request, plan_id: str | None = None):
    ...
    resolved_plan_id = plan_id or task_id
    # P2 #10: 并发 resume 防护
    lock_attr = f"_resume_lock_{resolved_plan_id}"
    lock = getattr(app_state, lock_attr, None)
    if lock is None:
        lock = asyncio.Lock()
        setattr(app_state, lock_attr, lock)
    if lock.locked():
        raise HTTPException(status_code=409, detail=f"Resume already in progress for plan '{resolved_plan_id}'")
    async with lock:
        plan_dict = await checkpoint.load_plan(resolved_plan_id)

P1 #3LoopDetectionMiddleware 死代码

src/agentkit/core/middleware.py — 同时兼容 dataclass 与 dict。

# Before
for step in recent:
    if isinstance(step, dict):           # ReActStep 是 dataclass恒为 False
        name = step.get("tool_name", "")
        args = step.get("arguments", {})

# After
for step in recent:
    if isinstance(step, dict):
        name = step.get("tool_name", "")
        args = step.get("arguments", {})
    else:
        name = getattr(step, "tool_name", "") or ""
        args = getattr(step, "arguments", {}) or {}

P1 #4TokenUsageMiddleware 属性名

src/agentkit/core/middleware.py — 改用 ReActResult 实际字段名。

# Before
usage = getattr(result, "token_usage", None)   # 不存在,恒为 None

# After
usage = getattr(result, "total_tokens", None)  # ReActResult.total_tokens

P1 #5综合阶段使用截断摘要

src/agentkit/experts/orchestrator.py _synthesize_results — 对 offloaded 结果调用 _read_dependency_output() 从 SharedWorkspace 读回完整内容。

# Before
for i, ph in enumerate(completed_phases):
    r = ph.result or {}
    content = r.get("content", str(r))  # content 已被 _offload_result 截断到 500 字符

# After
for i, ph in enumerate(completed_phases):
    r = ph.result or {}
    if isinstance(r, dict) and r.get("_offloaded"):
        content = await self._read_dependency_output(ph)  # 从 workspace 读完整内容
    else:
        content = r.get("content", str(r)) if isinstance(r, dict) else str(r)

P1 #6检查点内存去重

src/agentkit/orchestrator/checkpoint.py — 内存降级存储由 list 改为 dict keyed by phase_id。

# Before
self._memory: dict[str, list[CheckpointData]] = {}
...
self._memory.setdefault(plan_id, []).append(data)  # 同一 phase_id 重复累积

# After
self._memory: dict[str, dict[str, CheckpointData]] = {}
...
self._memory.setdefault(plan_id, {})[phase_id] = data  # 同一 phase_id 覆盖

list_checkpoints 的内存降级分支相应改为 .values() 迭代。

P1 #7动态 DEBATE 阶段未持久化

src/agentkit/experts/orchestrator.py _check_divergence_and_insert_debates — 插入辩论阶段后立即 save_plan

# Before
debate = self._insert_debate_phase(plan, ph, topic, participants)
if debate:
    await self._broadcast_event("plan_update", {...})
    # 未调用 save_plan — resume 看不到该 DEBATE 阶段

# After
debate = self._insert_debate_phase(plan, ph, topic, participants)
if debate:
    await self._broadcast_event("plan_update", {...})
    if self._checkpoint is not None:
        try:
            await self._checkpoint.save_plan(plan)  # resume 可见
        except Exception as e:
            logger.warning(f"Checkpoint save_plan (debate insert) failed: {e}")

P2 #8resume 不恢复 _debate_count

src/agentkit/experts/orchestrator.py resume() — 统计恢复计划中的 DEBATE 阶段数。

# Before
# resume() 不触碰 _debate_count恢复后归零MAX_DEBATES 可被绕过

# After
self._debate_count = sum(
    1 for ph in plan.phases if ph.phase_type == PhaseType.DEBATE
)

P2 #9循环检测状态不自动重置

src/agentkit/core/react.py — 在 execute()execute_stream() 入口调用 self.reset()

# Before
async def execute(self, ...):
    effective_compressor = ...  # 直接进入逻辑_loop_window 残留上一会话

# After
async def execute(self, ...):
    self.reset()  # 清空 _loop_window、_loop_corrected
    effective_compressor = ...

P2 #11resume 不处理 FAILED 阶段

src/agentkit/experts/orchestrator.py resume() — 追踪 failed_phase_ids 并恢复 FAILED 状态。

# Before
for cp in checkpoints:
    if cp.phase_status == "completed":
        completed_phase_ids.add(cp.phase_id)
        # FAILED 阶段被忽略,恢复后变 PENDING会被重跑

# After
failed_phase_ids: set[str] = set()
for cp in checkpoints:
    if cp.phase_status == "completed":
        completed_phase_ids.add(cp.phase_id)
        if cp.phase_result:
            phase_results[cp.phase_id] = cp.phase_result
    elif cp.phase_status == "failed":
        failed_phase_ids.add(cp.phase_id)

for ph in plan.phases:
    if ph.id in completed_phase_ids:
        ph.status = PhaseStatus.COMPLETED
        ...
    elif ph.id in failed_phase_ids:
        ph.status = PhaseStatus.FAILED  # 不再重跑

P2 #13checkpoint.clear() 从不调用

src/agentkit/experts/orchestrator.py — 成功路径在 team_dissolved 事件后清理 checkpoint。

# Before
await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id})
return {"status": "completed", ...}  # checkpoint 永不清理

# After
await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id})
if self._checkpoint is not None:
    try:
        await self._checkpoint.clear(plan.id)
    except Exception as e:
        logger.warning(f"Checkpoint clear failed: {e}")
return {"status": "completed", ...}

P2 #14_offload_result 对非字符串内容崩溃

src/agentkit/experts/orchestrator.py _offload_result — 入口加 str() 守卫。

# Before
def _offload_result(self, content: str, ref_key: str) -> dict[str, Any]:
    summary = content[: self._OFFLOAD_SUMMARY_LIMIT]  # content 若为 dict/None → TypeError

# After
def _offload_result(self, content: str, ref_key: str) -> dict[str, Any]:
    if not isinstance(content, str):
        content = str(content) if content is not None else ""
    summary = content[: self._OFFLOAD_SUMMARY_LIMIT]

Why This Works

各修复都针对具体根因,而非表面症状:

  • 默认值回归P0 #1U5 渐进式技能加载引入 disclosure_level 时,默认值设为 0概要模式是新功能的"安全默认",但忽略了既有技能契约——它们从未声明该字段,期望全量加载。改为 1 后,未显式声明的技能保持原有全量行为,只有显式选择概要模式的技能才降级。这是"新字段默认值应保持既有契约"的典型教训。

  • 标识符错配P0 #2TeamPlan.id 是自动生成的 UUID与外部 task_id 是不同命名空间。resume 端点假设二者相等,导致 checkpoint 永远查不到。引入显式 plan_id 参数并保留 task_id 回退,既支持新调用方也兼容旧路径。并发 Lock 则防止同一计划被两个请求同时恢复造成状态错乱。

  • 格式契约不匹配P1 #3、#4:中间件与核心数据类分属不同模块,中间件按"猜测"的格式/属性名访问结果。修复方式是显式兼容两种格式dict + dataclass并核对真实属性名。根因是跨模块契约未文档化——ReActStep/ReActResult 的字段定义应作为中间件的契约源。

  • 状态卸载未回读P1 #5U4 把大产出卸载到 SharedWorkspace内存只留 500 字符摘要。但综合阶段仍直接读 result["content"](即摘要),未解析 _offloaded 标记。修复复用了已有的 _read_dependency_output(),该函数本就为解析 offloaded 内容而存在——综合阶段只是漏调。

  • 去重数据结构选型P1 #6list 语义是"追加",而 checkpoint 的语义是"同一阶段最新状态覆盖旧状态"。改用 dict keyed by phase_id 后,重复 save 自然覆盖,list_checkpoints 返回唯一阶段集合。

  • 动态修改需重新持久化P1 #7save_plan 只在流水线启动前调用一次,但 _check_divergence_and_insert_debates 会在执行中修改 plan.phases。resume 依赖持久化的 plan 重建,看不到动态插入的阶段就会丢失辩论环节。在每次动态修改后立即 save_plan 保证 resume 看到的是最新计划。

  • 状态恢复完整性P2 #8、#11resume 是"状态重建",必须重建所有运行时状态——包括 _debate_count(限额计数器)和 FAILED 阶段状态。只恢复 COMPLETED 而忽略 FAILED等于把已失败阶段当作待执行违背了"resume 应精确还原崩溃前状态"的契约。

  • 跨会话状态隔离P2 #9ReActEngine 实例可能被复用(如 AgentPool_loop_window 是实例级状态。若不在每次 execute 入口重置,上一会话的工具调用哈希会污染新会话的循环检测。reset() 方法本就存在,只是未被调用——这是"有清理方法但未接入生命周期"的常见漏洞。

  • 资源生命周期P2 #13checkpoint 是临时断点数据成功完成后即失去价值。7 天 TTL 是兜底,但主动清理避免在 TTL 窗口内累积无意义数据。失败路径不清理,以便 resume 重试。

  • 类型守卫P2 #14_offload_result 的类型注解写的是 str,但实际调用方可能传入 dict 或 None。Python 运行时不强制注解,len() 与切片对非字符串直接报错。入口 str() 守卫是最低成本的防御,符合"信任边界处做校验"的原则。

Prevention

  1. 新字段默认值须保持既有契约为既有数据类新增字段时默认值必须让未声明该字段的旧代码保持原有行为。U5 的 disclosure_level 应默认为"全量加载",而非新功能的"安全模式"。审查时对任何"新增字段改变既有行为"的默认值保持警惕。

  2. 跨模块契约显式化:中间件访问核心数据类(ReActStepReActResult)的字段时,应以数据类定义为契约源,而非凭命名习惯猜测。建议在数据类定义处注释"被中间件依赖的字段",或在中间件处注释"契约来源core/react.py: ReActResult"。

  3. resume 是完整状态重建:任何运行时计数器(_debate_count)、阶段状态(含 FAILED、动态修改插入的 DEBATE都必须在 resume 中还原。审查 resume 逻辑时,逐项核对"execute 期间会修改哪些状态",确保每项都有对应的恢复路径。

  4. 动态修改后立即持久化:凡是在执行过程中修改了需被 resume 重建的对象(如 plan.phases),修改后必须立即 save_plan。不要依赖"结束时统一保存"——崩溃可能发生在保存之前。

  5. 清理方法接入生命周期reset()clear() 这类方法若存在必须在对应生命周期入口execute、完成、销毁被调用。审查时搜索"定义了但未被调用的清理方法"。

  6. 类型注解不是运行时保证Python 类型注解不强制,调用方可能传入任意类型。在 len()、切片、属性访问等对类型敏感的操作前对来自外部其他模块、LLM 产出、反序列化)的输入加 isinstance 守卫。

  7. 审查 finding 逐条验证:本次 14 条 finding 中 1 条为误报。批量审查时每条都需独立验证,避免按"看起来合理"就修改,引入回归。

  8. 回归测试:本次修复涉及的关键路径应有最小可运行检查——disclosure_level 默认值、resume 状态恢复、offloaded 内容回读、非字符串内容守卫。按项目规则(python3 -m pytest tests/unit/ -x -q)运行单元测试,确保上述路径不回归。