--- title: "U1-U7 长程可靠性优化代码审查修复" date: 2026-06-25 category: docs/solutions/logic-errors module: experts/orchestrator problem_type: logic_error component: service_object symptoms: - "All skills silently degraded to summary mode because SkillConfig.disclosure_level defaulted to 0 instead of 1" - "Task resume endpoint mismatched task_id and plan_id parameters, causing resume failures on long-horizon tasks" - "LoopDetectionMiddleware never triggered loop detection due to dead code (isinstance dict check on a dataclass)" - "TokenUsageMiddleware raised AttributeError by accessing token_usage instead of total_tokens on the usage object" - "Team synthesis used a truncated 500-char summary instead of the offloaded full content, losing critical context" root_cause: logic_error resolution_type: code_fix severity: high tags: [code-review, logic-error, long-horizon-reliability, expert-orchestrator, middleware, checkpoint, react-engine, skill-config] --- # U1-U7 长程可靠性优化代码审查修复 ## Problem U1-U7 长程可靠性优化特性分支的代码审查发现 14 个问题(2 P0、5 P1、7 P2),分布在 6 个源文件中,涵盖默认值回归、断点续跑标识符错配、中间件与数据类格式不兼容、状态卸载后内容未回读、检查点去重缺失、动态阶段未持久化、状态泄漏与类型守卫缺失等多类逻辑缺陷。其中 13 个已修复,1 个为误报。 ## Symptoms 各缺陷在运行时表现为以下可观测症状: - **技能降级(P0 #1)**:所有既有技能在加载时只注入名称与描述,完整 prompt 不再进入上下文,技能行为退化为"概要模式",输出质量明显下降但无报错。 - **断点续跑失败(P0 #2)**:调用 `POST /tasks/{task_id}/resume` 时返回 404「No checkpoint found」,尽管 checkpoint 实际已写入 Redis;并发 resume 还可能导致同一计划被重复恢复。 - **中间件静默失效(P1 #3、P1 #4)**:`LoopDetectionMiddleware` 的循环检测从不触发警告(`isinstance(step, dict)` 恒为 False),`TokenUsageMiddleware` 的 `token_usage_total` metadata 始终缺失(属性名错配)。 - **综合结果残缺(P1 #5)**:Lead Expert 综合阶段产出时只能看到每个阶段前 500 字符的截断摘要,长产出被腰斩,最终综合结果丢失关键细节。 - **检查点重复累积(P1 #6)**:内存降级模式下,同一阶段被多次 checkpoint 时在 list 中重复出现,`load()` 返回的"最后完成阶段"可能错位。 - **resume 看不到动态辩论(P1 #7)**:分歧检测插入的 DEBATE 阶段在崩溃后无法被 resume 恢复,因为 `save_plan` 只在流水线启动前调用过一次。 - **debate 限额失守(P2 #8)**:resume 后 `_debate_count` 归零,`MAX_DEBATES` 上限可被绕过,导致无限辩论。 - **循环检测状态泄漏(P2 #9)**:同一 `ReActEngine` 实例跨会话复用时,上一会话的 `_loop_window` 残留,新会话首步即可能误判循环。 - **FAILED 阶段被重跑(P2 #11)**:resume 把 FAILED 阶段当作 PENDING,导致已失败阶段被重新执行,可能再次失败或产生副作用。 - **检查点永不清理(P2 #13)**:成功完成后 checkpoint 数据仍留在 Redis/内存,7 天 TTL 内持续占用空间。 - **`_offload_result` 崩溃(P2 #14)**:当阶段产出为 dict 或 None 时,`len(content)` 与切片操作抛 `TypeError`,整条流水线中断。 ## What Didn't Work - **误报一项**:审查中标记的一条问题经核实为误报(false positive),未做改动。这提示在批量审查中需对每条 finding 单独验证,避免引入回归。 - **属性名猜测**:`TokenUsageMiddleware` 最初按"通用命名习惯"假设 `token_usage` 属性存在,但 `ReActResult` 数据类实际使用 `total_tokens`。仅靠阅读中间件自身代码无法发现,必须交叉核对结果数据类的字段定义。 - **格式假设**:`LoopDetectionMiddleware` 假设 trajectory 步骤是 dict,但 `ReActEngine` 实际存入的是 `ReActStep` dataclass。中间件与引擎分属不同模块,格式契约未显式文档化,导致 `isinstance` 检查静默失效而非报错。 ## Solution ### P0 #1:SkillConfig.disclosure_level 默认值 `src/agentkit/skills/base.py` — 将默认值从 0(概要模式)改为 1(全量加载),保持向后兼容。 ```python # Before disclosure_level: int = 0, # 概要模式 — 所有既有技能被降级 # After disclosure_level: int = 1, # 默认全量加载,向后兼容;0=概要模式需显式指定 ``` ### P0 #2:resume 端点 task_id/plan_id 错配 + 并发防护 `src/agentkit/server/routes/tasks.py` — 新增 `plan_id` 查询参数,并加 `asyncio.Lock` 防并发 resume(同时关闭 P2 #10)。 ```python # Before @router.post("/tasks/{task_id}/resume") async def resume_task(task_id: str, req: Request): ... plan_dict = await checkpoint.load_plan(task_id) # task_id 当 plan_id 用,永远 404 # After @router.post("/tasks/{task_id}/resume") async def resume_task(task_id: str, req: Request, plan_id: str | None = None): ... resolved_plan_id = plan_id or task_id # P2 #10: 并发 resume 防护 lock_attr = f"_resume_lock_{resolved_plan_id}" lock = getattr(app_state, lock_attr, None) if lock is None: lock = asyncio.Lock() setattr(app_state, lock_attr, lock) if lock.locked(): raise HTTPException(status_code=409, detail=f"Resume already in progress for plan '{resolved_plan_id}'") async with lock: plan_dict = await checkpoint.load_plan(resolved_plan_id) ``` ### P1 #3:LoopDetectionMiddleware 死代码 `src/agentkit/core/middleware.py` — 同时兼容 dataclass 与 dict。 ```python # Before for step in recent: if isinstance(step, dict): # ReActStep 是 dataclass,恒为 False name = step.get("tool_name", "") args = step.get("arguments", {}) # After for step in recent: if isinstance(step, dict): name = step.get("tool_name", "") args = step.get("arguments", {}) else: name = getattr(step, "tool_name", "") or "" args = getattr(step, "arguments", {}) or {} ``` ### P1 #4:TokenUsageMiddleware 属性名 `src/agentkit/core/middleware.py` — 改用 `ReActResult` 实际字段名。 ```python # Before usage = getattr(result, "token_usage", None) # 不存在,恒为 None # After usage = getattr(result, "total_tokens", None) # ReActResult.total_tokens ``` ### P1 #5:综合阶段使用截断摘要 `src/agentkit/experts/orchestrator.py` `_synthesize_results` — 对 offloaded 结果调用 `_read_dependency_output()` 从 SharedWorkspace 读回完整内容。 ```python # Before for i, ph in enumerate(completed_phases): r = ph.result or {} content = r.get("content", str(r)) # content 已被 _offload_result 截断到 500 字符 # After for i, ph in enumerate(completed_phases): r = ph.result or {} if isinstance(r, dict) and r.get("_offloaded"): content = await self._read_dependency_output(ph) # 从 workspace 读完整内容 else: content = r.get("content", str(r)) if isinstance(r, dict) else str(r) ``` ### P1 #6:检查点内存去重 `src/agentkit/orchestrator/checkpoint.py` — 内存降级存储由 list 改为 dict keyed by phase_id。 ```python # Before self._memory: dict[str, list[CheckpointData]] = {} ... self._memory.setdefault(plan_id, []).append(data) # 同一 phase_id 重复累积 # After self._memory: dict[str, dict[str, CheckpointData]] = {} ... self._memory.setdefault(plan_id, {})[phase_id] = data # 同一 phase_id 覆盖 ``` `list_checkpoints` 的内存降级分支相应改为 `.values()` 迭代。 ### P1 #7:动态 DEBATE 阶段未持久化 `src/agentkit/experts/orchestrator.py` `_check_divergence_and_insert_debates` — 插入辩论阶段后立即 `save_plan`。 ```python # Before debate = self._insert_debate_phase(plan, ph, topic, participants) if debate: await self._broadcast_event("plan_update", {...}) # 未调用 save_plan — resume 看不到该 DEBATE 阶段 # After debate = self._insert_debate_phase(plan, ph, topic, participants) if debate: await self._broadcast_event("plan_update", {...}) if self._checkpoint is not None: try: await self._checkpoint.save_plan(plan) # resume 可见 except Exception as e: logger.warning(f"Checkpoint save_plan (debate insert) failed: {e}") ``` ### P2 #8:resume 不恢复 _debate_count `src/agentkit/experts/orchestrator.py` `resume()` — 统计恢复计划中的 DEBATE 阶段数。 ```python # Before # resume() 不触碰 _debate_count,恢复后归零,MAX_DEBATES 可被绕过 # After self._debate_count = sum( 1 for ph in plan.phases if ph.phase_type == PhaseType.DEBATE ) ``` ### P2 #9:循环检测状态不自动重置 `src/agentkit/core/react.py` — 在 `execute()` 与 `execute_stream()` 入口调用 `self.reset()`。 ```python # Before async def execute(self, ...): effective_compressor = ... # 直接进入逻辑,_loop_window 残留上一会话 # After async def execute(self, ...): self.reset() # 清空 _loop_window、_loop_corrected effective_compressor = ... ``` ### P2 #11:resume 不处理 FAILED 阶段 `src/agentkit/experts/orchestrator.py` `resume()` — 追踪 `failed_phase_ids` 并恢复 FAILED 状态。 ```python # Before for cp in checkpoints: if cp.phase_status == "completed": completed_phase_ids.add(cp.phase_id) # FAILED 阶段被忽略,恢复后变 PENDING,会被重跑 # After failed_phase_ids: set[str] = set() for cp in checkpoints: if cp.phase_status == "completed": completed_phase_ids.add(cp.phase_id) if cp.phase_result: phase_results[cp.phase_id] = cp.phase_result elif cp.phase_status == "failed": failed_phase_ids.add(cp.phase_id) for ph in plan.phases: if ph.id in completed_phase_ids: ph.status = PhaseStatus.COMPLETED ... elif ph.id in failed_phase_ids: ph.status = PhaseStatus.FAILED # 不再重跑 ``` ### P2 #13:checkpoint.clear() 从不调用 `src/agentkit/experts/orchestrator.py` — 成功路径在 `team_dissolved` 事件后清理 checkpoint。 ```python # Before await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) return {"status": "completed", ...} # checkpoint 永不清理 # After await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) if self._checkpoint is not None: try: await self._checkpoint.clear(plan.id) except Exception as e: logger.warning(f"Checkpoint clear failed: {e}") return {"status": "completed", ...} ``` ### P2 #14:_offload_result 对非字符串内容崩溃 `src/agentkit/experts/orchestrator.py` `_offload_result` — 入口加 `str()` 守卫。 ```python # Before def _offload_result(self, content: str, ref_key: str) -> dict[str, Any]: summary = content[: self._OFFLOAD_SUMMARY_LIMIT] # content 若为 dict/None → TypeError # After def _offload_result(self, content: str, ref_key: str) -> dict[str, Any]: if not isinstance(content, str): content = str(content) if content is not None else "" summary = content[: self._OFFLOAD_SUMMARY_LIMIT] ``` ## Why This Works 各修复都针对具体根因,而非表面症状: - **默认值回归(P0 #1)**:U5 渐进式技能加载引入 `disclosure_level` 时,默认值设为 0(概要模式)是新功能的"安全默认",但忽略了既有技能契约——它们从未声明该字段,期望全量加载。改为 1 后,未显式声明的技能保持原有全量行为,只有显式选择概要模式的技能才降级。这是"新字段默认值应保持既有契约"的典型教训。 - **标识符错配(P0 #2)**:`TeamPlan.id` 是自动生成的 UUID,与外部 `task_id` 是不同命名空间。resume 端点假设二者相等,导致 checkpoint 永远查不到。引入显式 `plan_id` 参数并保留 `task_id` 回退,既支持新调用方也兼容旧路径。并发 Lock 则防止同一计划被两个请求同时恢复造成状态错乱。 - **格式契约不匹配(P1 #3、#4)**:中间件与核心数据类分属不同模块,中间件按"猜测"的格式/属性名访问结果。修复方式是显式兼容两种格式(dict + dataclass)并核对真实属性名。根因是跨模块契约未文档化——`ReActStep`/`ReActResult` 的字段定义应作为中间件的契约源。 - **状态卸载未回读(P1 #5)**:U4 把大产出卸载到 SharedWorkspace,内存只留 500 字符摘要。但综合阶段仍直接读 `result["content"]`(即摘要),未解析 `_offloaded` 标记。修复复用了已有的 `_read_dependency_output()`,该函数本就为解析 offloaded 内容而存在——综合阶段只是漏调。 - **去重数据结构选型(P1 #6)**:list 语义是"追加",而 checkpoint 的语义是"同一阶段最新状态覆盖旧状态"。改用 dict keyed by phase_id 后,重复 save 自然覆盖,`list_checkpoints` 返回唯一阶段集合。 - **动态修改需重新持久化(P1 #7)**:`save_plan` 只在流水线启动前调用一次,但 `_check_divergence_and_insert_debates` 会在执行中修改 `plan.phases`。resume 依赖持久化的 plan 重建,看不到动态插入的阶段就会丢失辩论环节。在每次动态修改后立即 `save_plan` 保证 resume 看到的是最新计划。 - **状态恢复完整性(P2 #8、#11)**:resume 是"状态重建",必须重建所有运行时状态——包括 `_debate_count`(限额计数器)和 FAILED 阶段状态。只恢复 COMPLETED 而忽略 FAILED,等于把已失败阶段当作待执行,违背了"resume 应精确还原崩溃前状态"的契约。 - **跨会话状态隔离(P2 #9)**:`ReActEngine` 实例可能被复用(如 AgentPool),`_loop_window` 是实例级状态。若不在每次 execute 入口重置,上一会话的工具调用哈希会污染新会话的循环检测。`reset()` 方法本就存在,只是未被调用——这是"有清理方法但未接入生命周期"的常见漏洞。 - **资源生命周期(P2 #13)**:checkpoint 是临时断点数据,成功完成后即失去价值。7 天 TTL 是兜底,但主动清理避免在 TTL 窗口内累积无意义数据。失败路径不清理,以便 resume 重试。 - **类型守卫(P2 #14)**:`_offload_result` 的类型注解写的是 `str`,但实际调用方可能传入 dict 或 None。Python 运行时不强制注解,`len()` 与切片对非字符串直接报错。入口 `str()` 守卫是最低成本的防御,符合"信任边界处做校验"的原则。 ## Prevention 1. **新字段默认值须保持既有契约**:为既有数据类新增字段时,默认值必须让未声明该字段的旧代码保持原有行为。U5 的 `disclosure_level` 应默认为"全量加载",而非新功能的"安全模式"。审查时对任何"新增字段改变既有行为"的默认值保持警惕。 2. **跨模块契约显式化**:中间件访问核心数据类(`ReActStep`、`ReActResult`)的字段时,应以数据类定义为契约源,而非凭命名习惯猜测。建议在数据类定义处注释"被中间件依赖的字段",或在中间件处注释"契约来源:core/react.py: ReActResult"。 3. **resume 是完整状态重建**:任何运行时计数器(`_debate_count`)、阶段状态(含 FAILED)、动态修改(插入的 DEBATE)都必须在 resume 中还原。审查 resume 逻辑时,逐项核对"execute 期间会修改哪些状态",确保每项都有对应的恢复路径。 4. **动态修改后立即持久化**:凡是在执行过程中修改了需被 resume 重建的对象(如 `plan.phases`),修改后必须立即 `save_plan`。不要依赖"结束时统一保存"——崩溃可能发生在保存之前。 5. **清理方法接入生命周期**:`reset()`、`clear()` 这类方法若存在,必须在对应生命周期入口(execute、完成、销毁)被调用。审查时搜索"定义了但未被调用的清理方法"。 6. **类型注解不是运行时保证**:Python 类型注解不强制,调用方可能传入任意类型。在 `len()`、切片、属性访问等对类型敏感的操作前,对来自外部(其他模块、LLM 产出、反序列化)的输入加 `isinstance` 守卫。 7. **审查 finding 逐条验证**:本次 14 条 finding 中 1 条为误报。批量审查时每条都需独立验证,避免按"看起来合理"就修改,引入回归。 8. **回归测试**:本次修复涉及的关键路径应有最小可运行检查——`disclosure_level` 默认值、resume 状态恢复、offloaded 内容回读、非字符串内容守卫。按项目规则(`python3 -m pytest tests/unit/ -x -q`)运行单元测试,确保上述路径不回归。 ## Related Issues - **上游计划**:[docs/plans/2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md](file:///Users/Chiguyong/Code/Fischer/fischer-agentkit/docs/plans/2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md) — U1-U7 实施计划,14 个 finding 均来自对该计划实现单元的代码审查 - **相邻领域**:[docs/solutions/architecture-patterns/bitable-companion-service-security-reliability-patterns.md](file:///Users/Chiguyong/Code/Fischer/fischer-agentkit/docs/solutions/architecture-patterns/bitable-companion-service-security-reliability-patterns.md) — 不同模块(bitable 安全 vs 编排器可靠性),无内容重叠