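From be45fe42c59f9fdd2161b55657a67cff271e3daa Mon Sep 17 00:00:00 2001
From: chiguyong
Date: Fri, 3 Jul 2026 07:32:57 +0800
Subject: [PATCH 1/3] feat(compressor): CJK-aware token estimation + linear compress flow

U1: Add estimate_text_tokens() module-level function with CJK 1:1 / ASCII 4:1 heuristic. Update estimate_tokens(), _summarize() pre-truncation, and react.py _should_compress() fallback to use it. Fixes 4x token underestimation for Chinese/Japanese/Korean conversations.

U3: Rewrite compress() from recursive _compression_depth to linear flow (summarize -> aggressive -> truncate). Add _log_compression() structured logging with tokens_before/after/ratio/strategy. Remove _compression_depth parameter from compress() and _compress_aggressive().

Per plan: docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md
---
 ...-compressor-cjk-prefix-enhancement-plan.md | 351 ++++++++++++++++++
 src/agentkit/core/compressor.py               | 108 ++++--
 src/agentkit/core/react.py                    |  49 ++-
 tests/unit/test_context_compressor.py         | 270 ++++++++++++--
 4 files changed, 718 insertions(+), 60 deletions(-)
 create mode 100644 docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md

diff --git a/docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md b/docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md
new file mode 100644
index 0000000..5cc2889
--- /dev/null
+++ b/docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md
@@ -0,0 +1,351 @@
+---
+date: 2026-07-02
+plan_id: "2026-07-02-003"
+type: feat
+status: draft
+title: "Context Compressor CJK Token Estimation + Prefix Alignment + Compression Simplification"
+origin: stash@{0} (feat/context-compressor-enhancement, 2026-06-23)
+---
+
+# Context Compressor CJK Token Estimation + Prefix Alignment + Compression Simplification
+
+## Summary
+
+In Chinese-language sessions, the current `ContextCompressor` has three practical problems:
+(1) Token estimation uses `len(content) // 4` (4 characters = 1 token), but a CJK character is roughly 1 token. Chinese sessions are therefore underestimated by 4x, compression triggers too late, and the conversation may exceed the model's context window;
+(2) the system prompt mixes in dynamic content (timestamps, UUIDs, session_id), which breaks the KV cache prefix stability of LLM providers (Anthropic/OpenAI). The resulting cache misses raise both latency and cost;
+(3) `compress()` relies on a recursive `_compression_depth` parameter. The logic is convoluted and has no structured logging, so compression behavior is hard to monitor.
+
+This plan rebuilds the WIP implementation in stash@{0} into an executable implementation plan.
+
+## Problem Frame
+
+**Current state** (`src/agentkit/core/compressor.py` on main `6826ceb`):
+
+- `estimate_tokens()`: `len(str(content)) // 4`, which badly underestimates Chinese text
+- `should_compress()`: based on the headroom ratio, so it is only as accurate as `estimate_tokens`
+- `compress()`: recursive `_compression_depth` parameter; forces truncation after at most 2 levels of recursion
+- `_compress_aggressive()`: also accepts `_compression_depth`
+- No prefix alignment: dynamic lines in the system prompt shift position on every request
+- No structured logging: nothing records tokens_before/after/ratio when compression happens
+
+**Call sites**:
+- `src/agentkit/core/react.py` `_should_compress()` (line 1705): already delegates to `compressor.should_compress()` (lines 1716-1718). The `// 4` fallback (line 1721) is live code, because HeadroomCompressor does not implement `should_compress()` and relies on the fallback
+- `src/agentkit/core/config_driven.py` `_handle_direct()` (line 1066): the stash adds compression before the LLM call (triggered by `len(rendered_messages) > 6`, although `rendered_messages` actually holds ≤ 2 entries)
+
+**Impact**: in long sessions with Chinese-speaking users, compression triggers too late and can hit the model's context-limit error. A low KV cache hit rate also raises the latency and cost of every request.
+
+## Requirements
+
+- **R1**: CJK characters (CJK Unified Ideographs + kana + Hangul syllables) are estimated at 1:1 tokens; ASCII stays at 4:1
+- **R2**: `estimate_text_tokens()` is exported as a module-level function for external callers such as `react.py`
+- **R3**: ~~`align_prefix()` moves dynamic lines in the system prompt (timestamps, UUIDs, session_id, `当前时间` ("current time")) into a trailing `[Dynamic Context]` section so that the prefix stays stable~~ **(Cancelled: OQ17/OQ18. `_build_system_message()` already covers this, and the dynamic content does not exist)**
+- **R4**: ~~`compress()` calls `align_prefix()` first to raise the KV cache hit rate~~ **(Cancelled along with U2)**
+- **R5**: Remove the recursive `_compression_depth` parameter in favor of a linear flow: summarize → aggressive → truncate (revised: the align step is dropped because U2 is cancelled)
+- **R6**: Add `_log_compression()` to emit structured logs (tokens_before/after/ratio/msg_count)
+- **R7**: Switch the `// 4` fallback in `react.py._should_compress()` (line 1721) to `estimate_text_tokens()`. (Round-3 revision: the fallback is **not dead code**. HeadroomCompressor does not implement `should_compress()`, so the fallback is live code. Replace `// 4` with `estimate_text_tokens()` instead of removing the fallback, so that HeadroomCompressor users also benefit from CJK-aware estimation)
+- **R8**: ~~`config_driven.py` compresses long system prompts before the LLM call~~ **(Cancelled: round-3 F-009. The `compress()` guard makes single-message compression a no-op, and the system prompt is a design-time artifact)**
+- **R9**: Tests cover CJK estimation, ~~prefix alignment~~, the compression flow, and ~~tool-result compression~~ (revised: prefix alignment and tool-result compression are dropped along with the U2/U4 cuts)
+
+> **R9 mapping**: R9 is cross-cutting and does not map to a single Unit. It is satisfied jointly by the Test scenarios and Verification of U1 (CJK estimation tests), ~~U2 (TestAlignPrefix)~~ (cancelled), U3 (compression-flow tests), and ~~U4 (compression tests in test_config_driven.py)~~ (cancelled).
+
+## Key Technical Decisions
+
+### KTD1: Use a heuristic for CJK estimation instead of tiktoken
+
+**Decision**: Use the `_is_cjk(char)` + `estimate_text_tokens(text)` heuristic (CJK 1:1, ASCII 4:1). Do not add a tiktoken dependency and do not use `litellm.token_counter`.
+
+**Rationale** (revises OQ3: the original premise "the project has no tiktoken dependency" was false. `litellm>=1.50` is already a direct dependency at `pyproject.toml:30` and provides the `litellm.token_counter(model, messages)` API):
+- ~~The project has no tiktoken dependency (it is not listed in `pyproject.toml`), and adding one would increase the install burden~~ **Revised**: `litellm>=1.50` is already a direct dependency, but `litellm.token_counter` requires a model argument and has to load a tokenizer, which costs more than a plain character scan (round-3 revision: `litellm.token_counter` is a purely local function and makes no network API call)
+- The heuristic is accurate enough for deciding when to trigger compression, because the compression threshold includes a headroom buffer
+- It follows the ponytail principle: use the smallest workable solution and avoid adding litellm call overhead on a hot path
+- ponytail ceiling: the heuristic may still underestimate pure CJK text by ~10-20%, but the headroom_threshold=0.8 buffer absorbs that. The upgrade path is `litellm.token_counter` or a provider-specific tokenizer
+
+### KTD2: ~~Prefix alignment strategy: move dynamic lines to the end~~ **(Cancelled: OQ17/OQ18)**
+
+~~**Decision**: `align_prefix()` detects dynamic lines in the system prompt (matching timestamp/UUID/session_id/`当前时间` patterns) and moves them into a `[Dynamic Context]` paragraph at the end of the system prompt.~~
+
+~~**Rationale**:~~
+- ~~LLM providers' KV caches match on prefixes, so a more stable prefix gives a higher cache hit rate~~
+- ~~Dynamic content (timestamps, etc.) changes on every request and breaks the cache when it sits in the prefix~~
+- ~~Once that content moves to the end, the static parts (agent identity, skill descriptions, etc.) form a stable prefix~~
+
+> **Reason for cancellation**: `_build_system_message()` already implements the stable/volatile split, and the system prompt contains no dynamic content. See the U2 cancellation note.
+
+### KTD3: Make the compression logic linear and remove the recursion
+
+**Decision**: Change `compress()` to a linear flow (~~align →~~ summarize → aggressive → truncate) and remove the recursive `_compression_depth` parameter.
+
+**Rationale** (revises OQ20: the original "recursion is hard to trace" rationale was weak, because the recursion is effectively pseudo-linear and forces truncation after at most 2 levels):
+- ~~Recursive logic is hard to trace and debug~~ **Revised**: the recursion is pseudo-linear (max 2 levels), but a linear flow still makes the logs easier to read
+- A linear flow makes structured logging easier to add (`_log_compression()` can report which step produced the final result)
+- The stash has already verified that the linear flow covers every path of the original recursion
+
+### KTD4: ~~config_driven.py compression trigger uses message count~~ **(Cancelled along with U4)**
+
+~~**Decision**: `config_driven.py` uses `len(rendered_messages) > 6` as the compression trigger (message count, not a token estimate).~~
+
+~~**Revised decision**: `config_driven.py` uses `estimate_text_tokens(system_prompt) > max_tokens * 0.8` as the compression trigger (token estimate).~~
+
+> **Reason for cancellation**: U4 was cancelled in round 3 (F-009: the `compress()` guard makes single-message compression a no-op). KTD4 no longer applies as a result.
+
+## Implementation Units
+
+### U1. CJK-aware token estimation
+
+**Goal**: Add a module-level `estimate_text_tokens()` function with CJK 1:1 / ASCII 4:1 estimation, and switch `react.py` to use it.
+
+**Requirements**: R1, R2, R7
+
+**Dependencies**: none
+
+**Files**:
+- `src/agentkit/core/compressor.py`: add the module-level functions `_is_cjk()` + `estimate_text_tokens()`, and make the `estimate_tokens()` method use `estimate_text_tokens` internally. **The inline `// 4` in `_summarize()` at line 152 also switches to `estimate_text_tokens()` (round-3 NEW-F10: otherwise the CJK summarization path would still underestimate by 4x and overflow the context)**
+- `src/agentkit/core/react.py`: the `// 4` fallback in `_should_compress()` (line 1721) switches to `estimate_text_tokens()`. It is **not removed**, because HeadroomCompressor does not implement `should_compress()` and the fallback is live code
+- `tests/unit/test_context_compressor.py`: add CJK estimation tests
+
+**Approach**:
+- `_is_cjk(char)`: check whether the character falls in CJK Unified Ideographs (U+4E00-U+9FFF), kana (U+3040-U+30FF), or Hangul syllables (U+AC00-U+D7AF)
+- `estimate_text_tokens(text)`: iterate over the characters, counting each CJK character as 1 token and the remaining characters at 4:1
+- `estimate_tokens(messages)`: sum `estimate_text_tokens(str(content))` over the messages
+- `_summarize()` line 152: change `estimated_tokens = len(conversation_text) // 4` to `estimate_text_tokens(conversation_text)` (round-3 NEW-F10)
+- `react.py._should_compress()`: already delegates to `compressor.should_compress()` (lines 1716-1718). The `// 4` fallback (line 1721) is **not dead code**: HeadroomCompressor does not implement `should_compress()`, so the fallback is live code. Replace `// 4` with `estimate_text_tokens()` so that HeadroomCompressor users also benefit from CJK-aware estimation
+
+**Patterns to follow**: the signature and return-value conventions of the existing `estimate_tokens()` method
+
+**Test scenarios**:
+- Pure CJK text: `estimate_text_tokens("你好世界")` == 4 (1:1)
+- Pure ASCII: `estimate_text_tokens("hello world")` == 2 (11 chars / 4 = 2.75, floored)
+- Mixed text: CJK portion at 1:1 + ASCII portion at 4:1
+- Japanese kana: `estimate_text_tokens("こんにちは")` == 5
+- Korean Hangul: `estimate_text_tokens("안녕하세요")` == 5
+- `estimate_tokens()` gives a reasonable estimate for message lists that contain CJK (no longer 4x too low)
+- `react.py._should_compress()` triggers correctly for long Chinese sessions (through delegation to `compressor.should_compress()`)
+- `_summarize()` pre-truncation triggers correctly for CJK text (round-3 NEW-F10: once line 152 uses `estimate_text_tokens`, the CJK summarization path no longer underestimates by 4x)
+
+**Verification**: `pytest tests/unit/test_context_compressor.py -k "cjk or mixed or kana or hangul" -v` passes
+
+---
+
+### U2. ~~Prefix alignment~~ **(Cancelled)**
+
+~~**Goal**: Add an `align_prefix()` method that moves dynamic lines in the system prompt to the end, keeping the prefix stable and raising the KV cache hit rate.~~
+
+> **Reason for cancellation (ce-doc-review round-2 OQ17/OQ18)**:
+> 1. `_build_system_message(stable, volatile)` at `react.py:1511-1561` already implements a stable/volatile two-block split (Anthropic uses `cache_control: {"type": "ephemeral"}`; non-Anthropic providers use string concatenation that keeps the stable prefix), and it runs before the compress call (lines 670-674). The proposed `align_prefix()` overlaps heavily with it.
+> 2. Grepping for `当前时间`/`session_id`/timestamp patterns at the system prompt construction points (`react.py`, `config_driven.py`, `prompts/template.py`) finds zero matches. `PromptTemplate.render()` only performs `${var}` substitution and does not inject timestamps/UUIDs/session_id. The dynamic content that U2 would align does not exist in the current codebase.
+
+**Requirements**: ~~R3, R4~~ (cancelled along with U2)
+
+**Dependencies**: ~~U1 (same file, compressor.py, but independent logic)~~
+
+**Files**:
+- `src/agentkit/core/compressor.py`: add an `align_prefix(messages)` method
+- `tests/unit/test_context_compressor.py`: add a `TestAlignPrefix` test class
+
+**Approach**:
+- `align_prefix(messages)`: iterate over the messages and, for each `role == "system"` message:
+  - detect dynamic lines by matching patterns such as `当前时间`, `2026-07-02`-style timestamps, UUIDs (`[0-9a-f]{8}-...`), and `session_id`
+  - remove the dynamic lines from their original position and append them to a `[Dynamic Context]` paragraph at the end of the content
+  - keep static lines in their original order so that they form a stable prefix
+- `compress()` calls `align_prefix()` first, before compressing
+
+**Patterns to follow**: the existing system message handling in `compress()`
+
+**Test scenarios**:
+- Timestamp line moves to the end: for a system prompt containing `当前时间: 2026-07-02`, the line sits in the `[Dynamic Context]` section after alignment
+- UUID line moves to the end: for a system prompt containing `session_id: abc12345-...`, the line sits in the `[Dynamic Context]` section after alignment
+- Static system prompt unchanged: a system prompt without dynamic lines is unchanged after `align_prefix()`
+- Multiple system messages: each one is aligned correctly
+- Non-system messages unaffected: user/assistant messages stay unchanged
+
+**Verification**: `pytest tests/unit/test_context_compressor.py::TestAlignPrefix -v` passes
+
+---
+
+### U3. Compression simplification + structured logging
+
+**Goal**: Rewrite `compress()` as a linear flow, remove the recursive `_compression_depth`, and add `_log_compression()` structured logging.
+
+**Requirements**: R5, R6
+
+**Dependencies**: ~~U2 (`compress()` calls `align_prefix()`)~~ **(U2 is cancelled; U3 is independent)**
+
+**Files**:
+- `src/agentkit/core/compressor.py`: rewrite `compress()`, add `_log_compression()`, and rewrite `_compress_aggressive()` without the recursion parameter
+- `tests/unit/test_context_compressor.py`: add compression-flow tests
+
+**Approach**:
+- `compress(messages)` linear flow:
+  1. ~~`align_prefix(messages)`: align the prefix~~ **(U2 is cancelled; this step is removed)**
+  2. Check the token count and return the messages unchanged if they are under the threshold
+  3. Separate system/old/recent messages; `_summarize(old)` produces a summary
+  4. If still over the threshold, call `_compress_aggressive(**original messages**)`. **Round-3 revision F-010**: it must receive the original `messages` list (not the already-compressed `compressed`) to avoid a summary-of-a-summary behavior change
+  5. If still over the threshold, call `_truncate()`
+  6. `_log_compression()` records the compression result
+- `_log_compression(tokens_before, tokens_after, msg_count_before, msg_count_after, strategy)`: emits an `INFO`-level log line that includes the compression ratio
+- `_compress_aggressive(messages)`: remove the `_compression_depth` parameter and keep only the last message + a summary
+
+**Patterns to follow**: the existing system/old/recent separation logic in `compress()`
+
+**Test scenarios**:
+- Short messages are not compressed: return the original messages when `estimate_tokens <= max_tokens`
+- Long messages trigger compression: above the threshold, produce a summary + keep the recent messages
+- Still over the threshold after compression → aggressive compression: keep only the last message + summary
+- Still over the threshold after aggressive → truncate: forced truncation
+- `_log_compression` emits a structured log line (verifiable with `caplog`)
+- `compress()` no longer accepts the `_compression_depth` parameter (signature change)
+- `_compress_aggressive` receives the original `messages` (not the already-compressed `compressed`), avoiding summary-of-summary (round-3 F-010)
+
+**Verification**: `pytest tests/unit/test_context_compressor.py -k "compress or aggressive or truncate" -v` passes
+
+---
+
+### U4. ~~config_driven.py compression before the LLM call~~ ~~+ tool-result compression helpers~~ **(Cancelled: round-3 F-009)**
+
+~~**Goal**: `config_driven.py` compresses long system prompts before the LLM call.~~
+
+> **Reason for cancellation (ce-doc-review round-3 F-009/NEW-F9)**:
+> 1. `ContextCompressor.compress()` has the guard `if len(non_system) <= keep_recent: return messages` (compressor.py:98-99). For a single system message, `non_system` is an empty list (len 0 ≤ the default keep_recent=3), so compress() returns immediately without compressing. U4's core mechanism never fires.
+> 2. `rendered_messages` comes from `PromptTemplate.render()` and always holds ≤ 2 entries (1 system + 1 user), so even compressing the full list cannot trigger compress().
+> 3. The system prompt is a design-time artifact carefully built by the agent designer, and runtime LLM summarization would degrade instruction quality. An oversized system prompt should be fixed at design time rather than compressed at runtime.
+>
+> **History**: round 2 had already cancelled the tool-result compression helpers (OQ19: HeadroomCompressor already covers them). Round 3 cancels all of U4 (F-009: the compress() guard makes U4 ineffective).
+
+**Approach**:
+- `config_driven.py`: in `_handle_direct()`, before the LLM call, check whether `estimate_text_tokens(system_prompt)` exceeds a threshold (e.g. `max_tokens * 0.8`). If so, call `compressor.compress([{"role": "system", "content": system_prompt}])`; on exception, `logger.warning` and continue
+- ~~`compressor.py` tool-result compression~~ (cancelled: delegated to `HeadroomCompressor`)
+
+> **Trigger redesign (OQ2/OQ5/OQ16)**: the original `len(rendered_messages) > 6` can never be true (`PromptTemplate.render()` returns ≤ 2 messages). It is replaced with a token-estimate check on the single system prompt.
+
+**Patterns to follow**: the existing use of `self._compressor` in `config_driven.py`
+
+**Test scenarios**:
+- In `config_driven.py` direct mode, a long system prompt (tokens over the threshold) triggers compression
+- In `config_driven.py` direct mode, a short system prompt is not compressed
+- On compression failure, a warning is logged and execution is not blocked
+
+**Verification**: `pytest tests/unit/test_config_driven.py -k "compress" -v` passes
+
+## Scope Boundaries
+
+### In Scope
+
+- CJK estimation + ~~prefix alignment~~ (cancelled) + compression simplification + logging + ~~tool-result compression helpers~~ (cancelled) in `src/agentkit/core/compressor.py`
+- Switching the `_should_compress` fallback `// 4` in `src/agentkit/core/react.py` to `estimate_text_tokens()` (the fallback is live code that serves HeadroomCompressor)
+- ~~Pre-LLM-call compression in `src/agentkit/core/config_driven.py` (based on a token estimate, not message count)~~ **(Cancelled along with U4)**
+- Test coverage in `tests/unit/test_context_compressor.py`
+- ~~Compression tests in `tests/unit/test_config_driven.py`~~ **(Cancelled along with U4)**
+
+### Out of Scope
+
+- Unrelated changes mixed into the stash: `portal.py`, `chat.ts`, `index.html`, `tauri.conf.json`, `skills/base.py`, `skills-lock.json`, `test_portal_routes.py`, `test_execution_modes.py`
+- Minor adjustments to `test_compression_strategy.py` (e.g. fixture updates caused by the signature change, handled naturally with U3)
+- Introducing tiktoken or a provider-specific tokenizer (see KTD1)
+- Persistent compression state across sessions
+
+### Deferred to Follow-Up Work
+
+- Runtime monitoring/alerting on the compression ratio (requires integration with a metrics system)
+- Precise provider-based tokenizer estimation (the KTD1 upgrade path)
+
+## Risks & Dependencies
+
+- **Risk 1**: ~~the dynamic-line patterns in `align_prefix()` may miss some dynamic content formats~~ **(Cancelled along with U2)**
+- **Risk 2**: removing `_compression_depth` from the `compress()` signature could break external callers. Mitigation: `_compression_depth` is an internal parameter (underscore prefix) with no external callers
+- **Risk 3**: ~~a compression exception in `config_driven.py` must not block the main flow~~ **(Cancelled along with U4)**
+- **Dependencies**: ~~U2 depends on U1 (same file), U3 depends on U2 (`compress` calls `align_prefix`), U4 depends on U1+U3~~ **Revised**: U2 is cancelled; U3 is independent (no dependencies); ~~U4 depends on U1+U3~~ **U4 is cancelled**
+
+## Acceptance Examples
+
+- **AE1**: for a long Chinese session (100 CJK messages), `estimate_tokens` returns ≥ 4x the value of the old implementation (correcting the underestimate)
+- **AE2**: ~~after `align_prefix()`, a timestamp line in the system prompt sits in the `[Dynamic Context]` section and the static lines keep their positions~~ **(Cancelled along with U2)**
+- **AE3**: `compress()` no longer accepts the `_compression_depth` parameter and runs summarize → aggressive → truncate linearly on oversized messages (revised: the align step is removed because U2 is cancelled)
+- **AE4**: `react.py._should_compress()` triggers at a reasonable point for Chinese sessions (round-3 revision: the `// 4` fallback is live code that serves HeadroomCompressor, so `// 4` is replaced with `estimate_text_tokens()` rather than removed)
+- **AE5**: ~~in `config_driven.py` direct mode, a long system prompt (tokens over the threshold) triggers compression automatically, and a compression failure does not block execution~~ **(Cancelled along with U4)**
+
+## Open Questions
+
+> The findings below come from ce-doc-review (coherence / feasibility / adversarial persona review) and are handled during implementation. 4 safe_auto fixes have been applied (`_build_sampled_output` added to the U4 Goal, the R9 cross-cutting note, the R8 variable-name fix, and the OQ14 ASCII estimate fix).
+>
+> **Round-2 review decisions (resolved automatically by best judgment)**:
+> - **Resolved (18 items)**: OQ1/OQ4/OQ15 (function name corrected to `_should_compress`), OQ2/OQ5/OQ16 (trigger changed to a token estimate), OQ3/OQ8 (KTD1 rationale revised), OQ6/OQ7/OQ9/OQ12/OQ17/OQ18 (U2 cancelled, premise disproven), OQ11/OQ19 (U4 tool-result compression helpers cancelled), OQ14 (safe_auto), OQ20 (KTD3 rationale revised, U3 kept)
+> - **Still open (2 items)**: OQ10 (aggressive compression quality degradation), OQ13 (CJK 1:1 variance across models, FYI)
+>
+> **Round-3 review decisions (resolved automatically by best judgment)**:
+> - **Applied (5 items)**:
+>   - NEW-F13 (safe_auto): KTD1 revised. `litellm.token_counter` is a purely local function and makes no network API call
+>   - F-008 (P0 reversal): the `// 4` fallback is live code (HeadroomCompressor does not implement `should_compress()`), so R7 becomes "replace `// 4` with `estimate_text_tokens()`" instead of "remove dead code"
+>   - F-009 (P0): U4 cancelled entirely, because the `compress()` guard makes single-system-message compression a no-op. R8/KTD4/AE5/Scope Boundaries are cancelled accordingly
+>   - F-010 (gated_auto): U3 Approach step 4 now states explicitly that `_compress_aggressive` receives the original `messages` (not the already-compressed `compressed`)
+>   - NEW-F10 (gated_auto): U1 scope extended. The inline `// 4` in `_summarize()` at line 152 also switches to `estimate_text_tokens()`
+> - **Still open (3 items)**: OQ10 (aggressive compression quality degradation), OQ13 (CJK 1:1 variance across models, FYI), OQ21 (`_truncate()` `* 4` consistency, P2 manual)
+
+### P0: Blocking (must be resolved before implementation)
+
+**OQ1** (feasibility, P0): the `react.py._needs_incremental_compression()` referenced by U1/R7 **does not exist**. The actual function is `_should_compress()` (line 1705), which already delegates to `compressor.should_compress()` (lines 1716-1718); the `// 4` fallback (line 1721) is dead code.
+- **Decision needed**: change R7/U1 to target `_should_compress()` and remove the dead-code fallback? Or drop R7 from scope (react.py already delegates to the compressor, so changing the compressor in U1 covers it)?
+
+**OQ2** (feasibility, P0): the `config_driven.py._execute_direct()` referenced by U4/R8 **does not exist**. The actual method is `_handle_direct()` (line 1066). In addition, `rendered_messages` comes from `PromptTemplate.render()` and always holds ≤ 2 entries (1 system + 1 user), so `len(rendered_messages) > 6` **can never be true**.
+- **Decision needed**: U4's compression trigger needs a redesign. Should `_handle_direct()` apply token-based compression to the single system prompt, or should the compression logic live elsewhere (e.g. before `PromptTemplate.render()`)?
+
+### P1: Important (handle during implementation)
+
+**OQ3** (adversarial, P1): KTD1's premise is false. It claims "the project has no tiktoken dependency", but `litellm>=1.50` is already a direct dependency (`pyproject.toml` line 30) and provides the `litellm.token_counter(model, messages)` API for precise token estimation.
+- **Decision needed**: switch to `litellm.token_counter` instead of the heuristic? Or keep the heuristic but revise KTD1's rationale (to "avoid litellm call overhead/compatibility issues" instead of "no dependency")?
+
+**OQ4** (adversarial, P1): R7 references a function that does not exist (same as OQ1).
+
+**OQ5** (adversarial, P1): KTD4 uses the wrong metric; `len(rendered_messages) > 6` can never be true (same as OQ2).
+
+**OQ6** (feasibility, P1): the KV cache benefit only materializes when `compress()` fires. Most requests never trigger compress (they stay under the threshold), so `align_prefix()` is never called for them, and the real benefit of prefix alignment is overstated.
+- **Decision needed**: run `align_prefix()` on every request (independently of compress)? Or accept the limited benefit of aligning only when compressing?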
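For OQ3, one way to keep the heuristic as the default while leaving room for the KTD1 upgrade path is to route every estimate through a single callable. This is a sketch under stated assumptions, not agentkit code. `make_estimator` and its `precise_counter` hook are hypothetical names, and `heuristic_tokens` simply restates the U1 rule (CJK 1:1, everything else 4:1, floored).

```python
from typing import Callable, Optional


def _is_cjk(char: str) -> bool:
    # Same ranges as U1: CJK Unified Ideographs, Hiragana + Katakana, Hangul syllables.
    cp = ord(char)
    return 0x4E00 <= cp <= 0x9FFF or 0x3040 <= cp <= 0x30FF or 0xAC00 <= cp <= 0xD7AF


def heuristic_tokens(text: str) -> int:
    # CJK characters count 1:1; all other characters count 4:1 (floored).
    cjk = sum(1 for ch in text if _is_cjk(ch))
    return cjk + (len(text) - cjk) // 4


def make_estimator(
    precise_counter: Optional[Callable[[str], int]] = None,
) -> Callable[[str], int]:
    """Hypothetical factory: prefer a precise counter, fall back to the heuristic."""

    def estimate(text: str) -> int:
        if precise_counter is None:
            return heuristic_tokens(text)
        try:
            return precise_counter(text)
        except Exception:
            # A tokenizer that fails to load or run must not disable the compression trigger.
            return heuristic_tokens(text)

    return estimate


default_estimate = make_estimator()
print(default_estimate("你好 world"))  # 2 CJK tokens + 6 // 4 = 3
```

A precise counter, such as a thin wrapper around `litellm.token_counter`, could be passed as `precise_counter`. The wrapper is omitted here because its exact arguments depend on the model in use.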
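+
+**OQ7** (feasibility, P1): the dynamic-content premise was never checked against the codebase. The plan assumes that the system prompt contains timestamps/UUIDs/session_id, but it still has to be confirmed that these dynamic lines actually exist in the current system prompt construction logic.
+- **Decision needed**: during implementation, grep the system prompt construction points first, and implement U2 only after confirming that the dynamic content exists.
+
+### P2: Worth fixing (at discretion during implementation)
+
+**OQ8** (feasibility, P2): `align_prefix()` idempotence. Calling `align_prefix()` repeatedly must not change the result (the `[Dynamic Context]` section must not be appended twice). Add an idempotence test to the test scenarios.
+
+**OQ9** (adversarial, P2): `align_prefix()` regex false positives. User messages may contain timestamp-formatted text (e.g. "会议定于 2026-07-02", "the meeting is set for 2026-07-02") that would be wrongly moved to the end. Matching should be restricted to specific lines of system messages (e.g. lines prefixed with "当前时间:", "current time:").
+
+**OQ10** (adversarial, P2): aggressive compression quality degradation. `_compress_aggressive()` keeps only the last message + summary and may lose key context. Evaluate whether to keep more recent messages (e.g. the last 2-3).
+
+**OQ11** (adversarial, P2): U4 scope creep. U4 covers both config_driven compression and 4 tool-result compression helpers, which is a large scope. Consider splitting it into U4a (config_driven compression) + U4b (tool-result compression helpers).
+
+**OQ12** (adversarial, P2): the dynamic-content premise is unverified (same as OQ7, from the adversarial perspective).
+
+### P3: FYI
+
+**OQ13** (adversarial, P3): CJK 1:1 estimation varies by model. Different models (GPT-4/Claude/Gemini) have slightly different actual token ratios for CJK (roughly 0.8-1.2 tokens per character), and 1:1 is an average. The headroom_threshold=0.8 buffer absorbs this variance.
+
+### Coherence: structural (gated_auto/manual)
+
+**OQ14** (coherence, ~~gated_auto~~ safe_auto, **resolved**): the U1 test scenario was ambiguous about whether `estimate_text_tokens("hello world")` is 2 or 3. It should state that 11 ASCII characters / 4 = 2.75, floored to 2. **Fixed to == 2 (11 chars / 4 = 2.75, floored)**.
+
+**OQ15** (coherence, manual): AE4 still references `_needs_incremental_compression()` (same as OQ1) and must be corrected together with the OQ1 decision.
+
+**OQ16** (coherence, manual): AE5 still says "triggers at >6 messages", but `rendered_messages` holds ≤ 2 entries (same as OQ2); it must be corrected together with the OQ2 decision.
+
+### Round-2 additions: P0 (blocking)
+
+**OQ17** (feasibility+adversarial, P0, **new**): U2 duplicates the existing `_build_system_message()`. `react.py:1511-1561` already implements a stable/volatile two-block split (Anthropic uses `cache_control: {"type": "ephemeral"}`; non-Anthropic providers use string concatenation that keeps the stable prefix), and it runs before the compress call (lines 670-674). The proposed `align_prefix()` overlaps heavily with it.
+- **Decision needed**: cancel U2? Or rewrite U2 as a spike (investigate whether `_build_system_message()` already covers every dynamic-content case, and delete U2 if it does)?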
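OQ10 asks whether aggressive compression should keep more than the last message. A hedged, synchronous sketch of a `keep_last` knob follows. `compress_aggressive_sketch` and its `summarize` callback are hypothetical stand-ins for `_compress_aggressive()` and the LLM summary call (both async in agentkit), and the `[Conversation Summary]` header is an assumed format.

```python
from typing import Callable

Message = dict[str, str]


def compress_aggressive_sketch(
    messages: list[Message],
    summarize: Callable[[list[Message]], str],
    keep_last: int = 1,
) -> list[Message]:
    """Keep system messages, a summary of older turns, and the last `keep_last` turns."""
    if keep_last < 1:
        raise ValueError("keep_last must be >= 1")
    system = [m for m in messages if m.get("role") == "system"]
    rest = [m for m in messages if m.get("role") != "system"]
    # Slicing handles short histories: if len(rest) <= keep_last, nothing is summarized.
    older, recent = rest[:-keep_last], rest[-keep_last:]
    result = list(system)
    if older:
        summary = summarize(older)
        # Assumed summary format; agentkit's real summary message may differ.
        result.append({"role": "system", "content": f"[Conversation Summary]\n{summary}"})
    result.extend(recent)
    return result
```

With `keep_last=1`, this matches the behavior that U3 specifies. Raising the value trades token budget for context, and OQ10 leaves the right default open.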
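+
+**OQ18** (feasibility+adversarial, P0, **new**): U2's core premise is disproven. Grepping for `当前时间`/`session_id`/timestamp patterns at the system prompt construction points (`react.py`, `config_driven.py`, `prompts/template.py`) finds zero matches. `PromptTemplate.render()` only performs `${var}` substitution and does not inject timestamps/UUIDs/session_id, so the dynamic content that U2 would align does not exist in the current codebase.
+- **Decision needed**: cancel U2? Or keep U2 as a defensive design (in case dynamic content is added in the future)?
+
+**OQ19** (feasibility+adversarial, P0, **new**): the U4 tool-result compression helpers overlap with `HeadroomCompressor.compress_tool_result()`. `headroom_compressor.py:126-157` already has a mature implementation (content-type detection, SmartCrusher/CodeCompressor routing, CCR hash storage, a `min_length=500` threshold, exception fallback). The proposed `_compress_json_array`/`_compress_json_object`/`_compress_text`/`_build_sampled_output` would either compete with it or reinvent it. In addition, `compress_tool_result` at `compressor.py:237-239` is already a no-op.
+- **Decision needed**: cancel the tool-result compression part of U4? Or delegate to `HeadroomCompressor` instead?
+
+### Round-2 additions: P1 (important)
+
+**OQ20** (adversarial, P1, **new**): KTD3's premise is weak. The recursive `_compression_depth` in `compress()` is effectively pseudo-linear (forced truncation after at most 2 levels of recursion, lines 118-129) rather than genuinely complex recursion. The "recursion is hard to trace" rationale is not strong enough, and the benefit of linearization is overstated.
+- **Decision needed**: keep KTD3 (linearization still makes logs easier to read)? Or drop the linearization part of U3 and keep only `_log_compression()`?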
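To make the OQ20 trade-off concrete, here is a minimal synchronous sketch of the linear flow that U3 specifies: summarize, then aggressive, then truncate, with each step running only while the result is still over budget, followed by one structured log line. `linear_compress` and the injected step callables are hypothetical. The real `compress()` is async, and its steps call the LLM.

```python
import logging
from typing import Callable

logger = logging.getLogger("compress_sketch")

Messages = list[dict]
Step = Callable[[Messages], Messages]


def linear_compress(
    messages: Messages,
    estimate: Callable[[Messages], int],
    max_tokens: int,
    summarize: Step,
    aggressive: Step,
    truncate: Step,
) -> tuple[Messages, str]:
    """Run each step only while the result is still over budget; return (result, strategy)."""
    tokens_before = estimate(messages)
    if tokens_before <= max_tokens:
        return messages, "none"
    result, strategy = summarize(messages), "summary"
    if estimate(result) > max_tokens:
        # Per F-010, the aggressive step works from the original messages, not the summary.
        result, strategy = aggressive(messages), "aggressive"
    if estimate(result) > max_tokens:
        result, strategy = truncate(result), "truncate"
    tokens_after = estimate(result)
    ratio = tokens_after / tokens_before if tokens_before else 0.0
    logger.info(
        "compressed %d -> %d tokens (%.1f%%), strategy=%s",
        tokens_before,
        tokens_after,
        ratio * 100,
        strategy,
    )
    return result, strategy
```

Each returned `strategy` value corresponds to exactly one branch. That is the log-readability benefit KTD3 keeps after OQ20, and it needs no recursion-depth bookkeeping.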
+
+### Round-3 additions: P2 (worth fixing)
+
+**OQ21** (adversarial, P2, **new**): the `* 4` character assumption in `_truncate()` is inconsistent with U1's CJK estimation. `_truncate()` at `compressor.py:232-233` converts a token budget into a character count with `target_tokens * 4` (assuming 4 chars = 1 token). For CJK text, where 1 char ≈ 1 token, that many characters amount to about `4 * target_tokens` tokens, so CJK text is truncated too little, not too much. The `_summarize()` pre-truncation has the same issue: after NEW-F10 its trigger check uses `estimate_text_tokens()`, but it still cuts to `max_input_tokens * 4` characters.
+- **Current state**: U1 scope already covers the `estimate_tokens()` method + `_summarize()` line 152, but the `* 4` in `_truncate()` is not included.
+- **Decision needed**: during implementation, evaluate whether `_truncate()` should also use CJK-aware estimation (deriving the character count from `estimate_text_tokens`) or keep `* 4`. Round-3 suggestion: **track `* 4` as a manual follow-up**, so that U1 does not expand into the truncate path and drift from its core goal of compression trigger timing. Note that the original argument ("over-truncating is safer than overflowing the context window") only holds for ASCII-heavy text. For CJK, `* 4` keeps roughly 4x the target tokens, so the truncate fallback can still overflow.
diff --git a/src/agentkit/core/compressor.py b/src/agentkit/core/compressor.py
index e0d2a90..3d9ff2b 100644
--- a/src/agentkit/core/compressor.py
+++ b/src/agentkit/core/compressor.py
@@ -12,6 +12,40 @@ from typing import Any, Protocol, runtime_checkable
 logger = logging.getLogger(__name__)
 
 
+def _is_cjk(char: str) -> bool:
+    """Check if a character is CJK (1 token ≈ 1 char).
+
+    Covers CJK Unified Ideographs, Hiragana, Katakana, and Hangul Syllables.
+    """
+    cp = ord(char)
+    return (
+        0x4E00 <= cp <= 0x9FFF  # CJK Unified Ideographs
+        or 0x3040 <= cp <= 0x30FF  # Hiragana + Katakana
+        or 0xAC00 <= cp <= 0xD7AF  # Hangul Syllables
+    )
+
+
+def estimate_text_tokens(text: str) -> int:
+    """Estimate token count: CJK 1:1, other characters 4:1.
+
+    CJK characters typically tokenize to ~1 token per character, while
+    ASCII/Latin text averages ~4 chars per token. Avoids the 4x
+    underestimation that ``len(text) // 4`` produces for CJK conversations.
+
+    ponytail ceiling: pure CJK may still underestimate ~10-20%, but
+    headroom_threshold=0.8 absorbs this. Upgrade path: litellm.token_counter
+    or provider-specific tokenizer.
+    """
+    cjk_count = 0
+    non_cjk_count = 0
+    for char in text:
+        if _is_cjk(char):
+            cjk_count += 1
+        else:
+            non_cjk_count += 1
+    return cjk_count + non_cjk_count // 4
+
+
 @runtime_checkable
 class CompressionStrategy(Protocol):
     """压缩策略协议 — 所有压缩器必须实现此接口"""
@@ -73,22 +107,21 @@ class ContextCompressor:
         return False
 
     def estimate_tokens(self, messages: list[dict]) -> int:
-        """Estimate total tokens in message list (rough: 4 chars = 1 token)"""
+        """Estimate total tokens in message list (CJK 1:1, ASCII 4:1)"""
         total = 0
         for msg in messages:
             content = msg.get("content", "")
-            total += len(str(content)) // 4
+            total += estimate_text_tokens(str(content))
         return total
 
-    async def compress(self, messages: list[dict], _compression_depth: int = 0) -> list[dict]:
-        """Compress messages if they exceed token budget
+    async def compress(self, messages: list[dict]) -> list[dict]:
+        """Compress messages if they exceed token budget.
 
-        Strategy:
-        1. Keep system messages unchanged
-        2. Keep the most recent N messages unchanged
-        3. Compress older messages into a summary using LLM
+        Linear flow: summarize -> aggressive -> truncate.
+        Each step only fires if the previous didn't bring tokens under budget.
+        """
-        if self.estimate_tokens(messages) <= self._max_tokens:
+        tokens_before = self.estimate_tokens(messages)
+        if tokens_before <= self._max_tokens:
             return messages
 
         # Separate system messages, old messages, and recent messages
@@ -101,10 +134,8 @@ class ContextCompressor:
         old_msgs = non_system[: -self._keep_recent]
         recent_msgs = non_system[-self._keep_recent :]
 
-        # Compress old messages
+        # Step 1: Summarize old messages
         summary = await self._summarize(old_msgs)
-
-        # Build compressed message list
         compressed = list(system_msgs)
         if summary:
             compressed.append(
@@ -115,21 +146,44 @@ class ContextCompressor:
             )
         compressed.extend(recent_msgs)
 
-        # Recursive check: if still over budget, compress again
+        # Step 2: If still over budget, aggressive compress
+        # F-010: pass original `messages` (not `compressed`) to avoid summary-of-summary
+        strategy = "summary"
         if self.estimate_tokens(compressed) > self._max_tokens:
-            if _compression_depth >= 1:
-                # Depth guard: force truncation instead of infinite recursion
-                return self._truncate(compressed)
-            if len(recent_msgs) > 1:
-                # Try keeping fewer recent messages
-                return await self._compress_aggressive(
-                    messages, _compression_depth=_compression_depth + 1
-                )
-            # Last resort: truncate
-            return self._truncate(compressed)
+            compressed = await self._compress_aggressive(messages)
+            strategy = "aggressive"
+
+        # Step 3: If still over budget, truncate as last resort
+        if self.estimate_tokens(compressed) > self._max_tokens:
+            compressed = self._truncate(compressed)
+            strategy = "truncate"
+
+        # Step 4: Log compression result
+        tokens_after = self.estimate_tokens(compressed)
+        self._log_compression(tokens_before, tokens_after, len(messages), len(compressed), strategy)
         return compressed
 
+    def _log_compression(
+        self,
+        tokens_before: int,
+        tokens_after: int,
+        msg_count_before: int,
+        msg_count_after: int,
+        strategy: str,
+    ) -> None:
+        """Log structured compression info (tokens_before/after/ratio/msg_count)."""
+        ratio = tokens_after / tokens_before if tokens_before > 0 else 0.0
+        logger.info(
+            "context compressed: %d -> %d tokens (%.1f%%), messages: %d -> %d, strategy: %s",
+            tokens_before,
+            tokens_after,
+            ratio * 100,
+            msg_count_before,
+            msg_count_after,
+            strategy,
+        )
+
     async def _summarize(self, messages: list[dict], max_input_tokens: int = 3200) -> str:
         """Summarize a list of messages using LLM.
 
@@ -149,7 +203,7 @@ class ContextCompressor:
         )
 
         # Pre-truncate if conversation_text exceeds safe token threshold
-        estimated_tokens = len(conversation_text) // 4
+        estimated_tokens = estimate_text_tokens(conversation_text)
         if estimated_tokens > max_input_tokens:
             max_chars = max_input_tokens * 4
             conversation_text = conversation_text[:max_chars] + "\n...[truncated]"
@@ -201,10 +255,8 @@ class ContextCompressor:
             parts.append(f"[{role}]: {content}...")
         return "\n".join(parts)
 
-    async def _compress_aggressive(
-        self, messages: list[dict], _compression_depth: int = 0
-    ) -> list[dict]:
-        """More aggressive compression when standard compression isn't enough"""
+    async def _compress_aggressive(self, messages: list[dict]) -> list[dict]:
+        """Aggressive compression: keep only last message + summary of the rest."""
         system_msgs = [m for m in messages if m.get("role") == "system"]
         non_system = [m for m in messages if m.get("role") != "system"]
 
diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py
index 2f71f7e..836c5ee 100644
--- a/src/agentkit/core/react.py
+++ b/src/agentkit/core/react.py
@@ -15,8 +15,14 @@ from dataclasses import dataclass, field
 from datetime import datetime, timezone
 from typing import TYPE_CHECKING, Awaitable, Callable
 
-from agentkit.core.exceptions import LLMProviderError, LoopDetectedError, TaskCancelledError, TaskTimeoutError
+from agentkit.core.exceptions import (
+    LLMProviderError,
+    LoopDetectedError,
+    TaskCancelledError,
+    TaskTimeoutError,
+)
 from agentkit.core.protocol import CancellationToken
+from agentkit.core.compressor import estimate_text_tokens
 from agentkit.llm.gateway import LLMGateway
 from agentkit.llm.protocol import LLMResponse
 from agentkit.tools.base import Tool, ToolValidationError
@@ -1072,7 +1078,12 @@ class ReActEngine:
                     clean_args["_skip_dangerous_check"] = True
                 try:
                     tool_result = await tool.safe_execute(**clean_args)
-                except (ToolValidationError, ValueError, TypeError, RuntimeError) as e:
+                except (
+                    ToolValidationError,
+                    ValueError,
+                    TypeError,
+                    RuntimeError,
+                ) as e:
                     tool_result = {
                         "error": f"Tool '{tc.name}' execution failed: {e}",
                         "error_code": "tool_execution_failed",
@@ -1090,7 +1101,12 @@ class ReActEngine:
                         if tool
                         else {"error": f"Tool '{tc.name}' not found"}
                     )
-                except (ToolValidationError, ValueError, TypeError, RuntimeError) as e:
+                except (
+                    ToolValidationError,
+                    ValueError,
+                    TypeError,
+                    RuntimeError,
+                ) as e:
                     tool_result = {
                         "error": f"Tool '{tc.name}' execution failed: {e}",
                         "error_code": "tool_execution_failed",
@@ -1154,7 +1170,12 @@ class ReActEngine:
                 if self._should_compress(conversation, compressor):
                     try:
                         conversation = await compressor.compress(conversation)
-                    except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e:
+                    except (
+                        asyncio.TimeoutError,
+                        ConnectionError,
+                        LLMProviderError,
+                        RuntimeError,
+                    ) as e:
                         logger.warning(f"Incremental compression failed: {e}")
 
             else:
@@ -1225,7 +1246,12 @@ class ReActEngine:
                 if self._should_compress(conversation, compressor):
                     try:
                         conversation = await compressor.compress(conversation)
-                    except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e:
+                    except (
+                        asyncio.TimeoutError,
+                        ConnectionError,
+                        LLMProviderError,
+                        RuntimeError,
+                    ) as e:
                         logger.warning(f"Incremental compression failed: {e}")
             else:
                 # ponytail: 检查是否为畸形工具调用(含 但解析失败)
@@ -1340,7 +1366,12 @@ class ReActEngine:
                             reinjections,
                         )
                         break
-                except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e:
+                except (
+                    asyncio.TimeoutError,
+                    ConnectionError,
+                    LLMProviderError,
+                    RuntimeError,
+                ) as e:
                     logger.warning(f"Verification loop failed: {e}")
 
         # Yield final_answer event (legacy format for execute_stream consumers)
@@ -1717,8 +1748,10 @@ class ReActEngine:
         if should_compress_fn is not None:
             return should_compress_fn(conversation)
         # Fallback: fixed threshold for compressors without headroom support
-        total_chars = sum(len(str(m.get("content", ""))) for m in conversation)
-        estimated_tokens = total_chars // 4
+        # (e.g. HeadroomCompressor which doesn't implement should_compress)
+        estimated_tokens = sum(
+            estimate_text_tokens(str(m.get("content", ""))) for m in conversation
+        )
         return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD
 
     async def _build_tool_result_message(
diff --git a/tests/unit/test_context_compressor.py b/tests/unit/test_context_compressor.py
index 5973b7c..d696950 100644
--- a/tests/unit/test_context_compressor.py
+++ b/tests/unit/test_context_compressor.py
@@ -1,10 +1,10 @@
 """Tests for ContextCompressor and PromptTemplate cache"""
 
+import inspect
+import logging
 from unittest.mock import AsyncMock, MagicMock
 
-import pytest
-
-from agentkit.core.compressor import ContextCompressor
+from agentkit.core.compressor import ContextCompressor, estimate_text_tokens
 from agentkit.llm.protocol import LLMResponse, TokenUsage
 from agentkit.prompts.section import PromptSection
 from agentkit.prompts.template import PromptTemplate
@@ -31,14 +31,18 @@ def make_long_messages(count: int = 10, content_length: int = 2000) -> list[dict
     """生成长消息列表用于测试压缩"""
     messages = [{"role": "system", "content": "You are a helpful assistant."}]
     for i in range(count):
-        messages.append({
-            "role": "user",
-            "content": "x" * content_length + f" message {i}",
-        })
-        messages.append({
-            "role": "assistant",
-            "content": "y" * content_length + f" reply {i}",
-        })
+        messages.append(
+            {
+                "role": "user",
+                "content": "x" * content_length + f" message {i}",
+            }
+        )
+        messages.append(
+            {
+                "role": "assistant",
+                "content": "y" * content_length + f" reply {i}",
+            }
+        )
     return messages
 
 
@@ -73,6 +77,56 @@ class TestEstimateTokens:
         assert compressor.estimate_tokens(messages) == 0
 
 
+class TestEstimateTextTokensCJK:
+    """estimate_text_tokens CJK estimation tests (U1)"""
+
+    def test_pure_cjk_chinese(self):
+        # 4 CJK chars = 4 tokens (1:1)
+        assert estimate_text_tokens("你好世界") == 4
+
+    def test_pure_ascii(self):
+        # 11 chars / 4 = 2.75, floor = 2
+        assert estimate_text_tokens("hello world") == 2
+
+    def test_pure_cjk_japanese_kana(self):
+        # 5 Hiragana chars = 5 tokens (1:1)
+        assert estimate_text_tokens("こんにちは") == 5
+
+    def test_pure_cjk_korean_hangul(self):
+        # 5 Hangul chars = 5 tokens (1:1)
+        assert estimate_text_tokens("안녕하세요") == 5
+
+    def test_mixed_cjk_and_ascii(self):
+        # "你好" (2 CJK = 2 tokens) + " world" (6 ASCII = 1 token) = 3
+        assert estimate_text_tokens("你好 world") == 3
+
+    def test_empty_string(self):
+        assert estimate_text_tokens("") == 0
+
+    def test_estimate_tokens_with_cjk_messages(self):
+        """estimate_tokens() no longer underestimates CJK messages by 4x"""
+        compressor = ContextCompressor()
+        messages = [{"role": "user", "content": "你好世界"}]  # 4 CJK = 4 tokens
+        assert compressor.estimate_tokens(messages) == 4
+
+    def test_estimate_tokens_mixed_messages(self):
+        """estimate_tokens() gives a reasonable estimate for mixed messages"""
+        compressor = ContextCompressor()
+        messages = [
+            {"role": "user", "content": "你好"},  # 2 CJK = 2
+            {"role": "assistant", "content": "hello"},  # 5 ASCII = 1
+        ]
+        assert compressor.estimate_tokens(messages) == 3
+
+    def test_cjk_not_underestimated_4x(self):
+        """AE1: estimate_tokens for 100 CJK characters is >= 4x the old estimate"""
+        compressor = ContextCompressor()
+        cjk_msg = [{"role": "user", "content": "你好" * 50}]  # 100 CJK chars
+        new_estimate = compressor.estimate_tokens(cjk_msg)
+        old_estimate = len("你好" * 50) // 4  # old: len // 4
+        assert new_estimate >= old_estimate * 4
+
+
 class TestNoCompressionWhenUnderBudget:
     """Token 预算内不压缩"""
 
@@ -181,7 +235,8 @@ class TestSummaryGenerationWithLLM:
         gateway.chat.assert_called_once()
         # 摘要应出现在结果中
         summary_msgs = [
-            m for m in result
+            m
+ for m in result if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") ] assert len(summary_msgs) == 1 @@ -207,7 +262,8 @@ class TestFallbackToSimpleSummary: # There should be a summary message (simple truncation mode) summary_msgs = [ - m for m in result + m + for m in result if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") ] assert len(summary_msgs) == 1 @@ -232,7 +288,8 @@ class TestFallbackToSimpleSummary: # There should be a summary message (fallback to simple summary) summary_msgs = [ - m for m in result + m + for m in result if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") ] assert len(summary_msgs) == 1 @@ -292,6 +349,167 @@ class TestTruncation: assert result[0]["content"] == "Short message" +class TestCompressLinearFlow: + """U3: compress() linear flow + signature change tests""" + + def test_compress_signature_no_compression_depth(self): + """compress() no longer accepts the _compression_depth parameter""" + sig = inspect.signature(ContextCompressor.compress) + assert "_compression_depth" not in sig.parameters + + def test_compress_aggressive_signature_no_compression_depth(self): + """_compress_aggressive() no longer accepts the _compression_depth parameter""" + sig = inspect.signature(ContextCompressor._compress_aggressive) + assert "_compression_depth" not in sig.parameters + + async def test_short_messages_not_compressed_linear(self): + """Short messages are not compressed (linear flow check)""" + compressor = ContextCompressor(max_tokens=10000) + messages = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + result = await compressor.compress(messages) + assert result == messages + + async def test_aggressive_receives_original_messages(self): + """F-010: _compress_aggressive receives the original messages, not the compressed ones""" + # First summary is very long (triggers aggressive), second is short + long_summary = LLMResponse( + content="x" * 5000, + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + short_summary = LLMResponse( + content="short
summary", + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + gateway = MagicMock() + gateway.chat = AsyncMock(side_effect=[long_summary, short_summary]) + compressor = ContextCompressor( + llm_gateway=gateway, + max_tokens=10, + keep_recent=2, + ) + messages = [ + {"role": "user", "content": "ORIGINAL_MARKER_a" * 2000}, + {"role": "assistant", "content": "ORIGINAL_MARKER_b" * 2000}, + {"role": "user", "content": "Recent"}, + {"role": "assistant", "content": "Reply"}, + ] + await compressor.compress(messages) + + # Second call (aggressive) should receive original message content, + # not the first summary ("x" * 5000) + assert gateway.chat.call_count == 2 + second_call_content = gateway.chat.call_args_list[1].kwargs["messages"][0]["content"] + assert "ORIGINAL_MARKER" in second_call_content + # First summary content should NOT appear in the aggressive call + assert "xxxx" not in second_call_content + + async def test_truncate_triggered_when_aggressive_insufficient(self): + """Still over the threshold after aggressive → truncate cuts forcibly""" + # Both summaries are very long, forcing truncate as last resort + long_summary = LLMResponse( + content="z" * 5000, + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + gateway = MagicMock() + gateway.chat = AsyncMock(side_effect=[long_summary, long_summary]) + compressor = ContextCompressor( + llm_gateway=gateway, + max_tokens=10, + keep_recent=2, + ) + messages = [ + {"role": "user", "content": "a" * 5000}, + {"role": "assistant", "content": "b" * 5000}, + {"role": "user", "content": "c" * 5000}, + {"role": "assistant", "content": "d" * 5000}, + {"role": "user", "content": "Recent"}, + {"role": "assistant", "content": "Reply"}, + ] + result = await compressor.compress(messages) + # Truncate should have cut message content + total_chars = sum(len(str(m.get("content", ""))) for m in result) + assert total_chars < sum(len(str(m.get("content", ""))) for m in messages) + + +class
TestCompressionLogging: + """U3: _log_compression structured logging tests""" + + async def test_log_compression_outputs_structured_info(self, caplog): + """_log_compression emits a structured log (tokens/ratio/strategy)""" + gateway = make_mock_gateway("Summary") + compressor = ContextCompressor( + llm_gateway=gateway, + max_tokens=100, + keep_recent=2, + ) + messages = [ + {"role": "user", "content": "a" * 2000}, + {"role": "assistant", "content": "b" * 2000}, + {"role": "user", "content": "Recent"}, + {"role": "assistant", "content": "Reply"}, + ] + with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"): + await compressor.compress(messages) + + # Verify the structured log contains compression info + log_messages = [record.message for record in caplog.records] + assert any("context compressed" in msg for msg in log_messages) + assert any("strategy: summary" in msg for msg in log_messages) + # The log should include token counts and message counts + assert any("tokens" in msg for msg in log_messages) + assert any("messages:" in msg for msg in log_messages) + + async def test_no_log_when_not_compressed(self, caplog): + """No log is emitted when compression is not triggered""" + compressor = ContextCompressor(max_tokens=10000) + messages = [ + {"role": "user", "content": "Hello"}, + ] + with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"): + await compressor.compress(messages) + + log_messages = [record.message for record in caplog.records] + assert not any("context compressed" in msg for msg in log_messages) + + async def test_log_strategy_aggressive(self, caplog): + """The log records the strategy correctly when it is aggressive""" + long_summary = LLMResponse( + content="x" * 5000, + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + short_summary = LLMResponse( + content="short", + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + gateway = MagicMock() + gateway.chat = AsyncMock(side_effect=[long_summary, short_summary]) + compressor = ContextCompressor( + llm_gateway=gateway, + max_tokens=10, + keep_recent=2, + ) + messages = [ + {"role": "user",
"content": "a" * 2000}, + {"role": "assistant", "content": "b" * 2000}, + {"role": "user", "content": "Recent"}, + {"role": "assistant", "content": "Reply"}, + ] + with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"): + await compressor.compress(messages) + + log_messages = [record.message for record in caplog.records] + assert any("strategy: aggressive" in msg for msg in log_messages) + + class TestNotEnoughMessagesToCompress: """消息数量不足时跳过压缩""" @@ -397,11 +615,13 @@ class TestReActEngineWithCompressor: from agentkit.llm.protocol import LLMResponse, TokenUsage gateway = MagicMock() - gateway.chat = AsyncMock(return_value=LLMResponse( - content="Final answer", - model="test", - usage=TokenUsage(prompt_tokens=10, completion_tokens=10), - )) + gateway.chat = AsyncMock( + return_value=LLMResponse( + content="Final answer", + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + ) compressor = ContextCompressor(max_tokens=10000) engine = ReActEngine(llm_gateway=gateway) @@ -418,11 +638,13 @@ class TestReActEngineWithCompressor: from agentkit.llm.protocol import LLMResponse, TokenUsage gateway = MagicMock() - gateway.chat = AsyncMock(return_value=LLMResponse( - content="Answer", - model="test", - usage=TokenUsage(prompt_tokens=10, completion_tokens=10), - )) + gateway.chat = AsyncMock( + return_value=LLMResponse( + content="Answer", + model="test", + usage=TokenUsage(prompt_tokens=10, completion_tokens=10), + ) + ) engine = ReActEngine(llm_gateway=gateway) -- 2.43.0 From 3a05c4d1e67865d7e95f85d62491d1d39cc56fe4 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Fri, 3 Jul 2026 08:03:06 +0800 Subject: [PATCH 2/3] fix(review): CJK pre-truncate budget + simplify estimate_tokens + test gaps Apply 4 ce-code-review findings: - P1: _summarize() max_chars = max_input_tokens (was * 4, allowed 4x CJK budget) - P1: add test_summarize_cjk_pre_truncation (CJK truncation coverage) - P2: add test_should_compress_cjk_fallback_path (react.py 
fallback coverage) - P3: strengthen truncate test assertion (verify marker, not just length) Also apply ce-simplify-code: estimate_tokens() -> sum() generator one-liner. Tests: 99 passed. Ruff: clean. --- src/agentkit/core/compressor.py | 11 +++--- tests/unit/test_context_compressor.py | 55 +++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 6 deletions(-) diff --git a/src/agentkit/core/compressor.py b/src/agentkit/core/compressor.py index 3d9ff2b..e5e5cbe 100644 --- a/src/agentkit/core/compressor.py +++ b/src/agentkit/core/compressor.py @@ -108,11 +108,7 @@ class ContextCompressor: def estimate_tokens(self, messages: list[dict]) -> int: """Estimate total tokens in message list (CJK 1:1, ASCII 4:1)""" - total = 0 - for msg in messages: - content = msg.get("content", "") - total += estimate_text_tokens(str(content)) - return total + return sum(estimate_text_tokens(str(m.get("content", ""))) for m in messages) async def compress(self, messages: list[dict]) -> list[dict]: """Compress messages if they exceed token budget. @@ -205,7 +201,10 @@ class ContextCompressor: # Pre-truncate if conversation_text exceeds safe token threshold estimated_tokens = estimate_text_tokens(conversation_text) if estimated_tokens > max_input_tokens: - max_chars = max_input_tokens * 4 + # CJK-aware char limit: max_input_tokens chars is exact for CJK (1:1), + # conservative for ASCII (4:1, truncates to 1/4 budget but safe). + # Review fix #1: old `* 4` allowed 4x token budget for CJK text. 
+ max_chars = max_input_tokens conversation_text = conversation_text[:max_chars] + "\n...[truncated]" prompt = ( diff --git a/tests/unit/test_context_compressor.py b/tests/unit/test_context_compressor.py index d696950..e6adf6f 100644 --- a/tests/unit/test_context_compressor.py +++ b/tests/unit/test_context_compressor.py @@ -126,6 +126,29 @@ class TestEstimateTextTokensCJK: old_estimate = len("你好" * 50) // 4 # old: len // 4 assert new_estimate >= old_estimate * 4 + async def test_summarize_cjk_pre_truncation(self): + """Review fix #2: _summarize() pre-truncation fires correctly for CJK text + + Build CJK text where estimate_text_tokens > max_input_tokens but + len(text) < max_input_tokens * 4 (reproduces the old bug: the * 4 assumption allowed 4x over budget) + """ + gateway = make_mock_gateway("Summary result") + compressor = ContextCompressor(llm_gateway=gateway) + # 4000 CJK chars = 4000 tokens (1:1), > max_input_tokens=3200 + # But len=4000 < 3200 * 4 = 12800, so old `* 4` limit wouldn't truncate + cjk_content = "你" * 4000 + messages = [{"role": "user", "content": cjk_content}] + await compressor._summarize(messages, max_input_tokens=3200) + + # Verify LLM was called with truncated text (not full 4000 chars) + call_messages = gateway.chat.call_args.kwargs["messages"] + prompt_content = call_messages[0]["content"] + # The conversation_text in the prompt should be truncated to <= 3200 chars + # (plus truncation marker), not the full 4000 chars + assert "...[truncated]" in prompt_content + # Verify the CJK content was actually shortened + assert prompt_content.count("你") < 4000 + class TestNoCompressionWhenUnderBudget: """No compression within the token budget""" @@ -436,6 +459,8 @@ class TestCompressLinearFlow: # Truncate should have cut message content total_chars = sum(len(str(m.get("content", ""))) for m in result) assert total_chars < sum(len(str(m.get("content", ""))) for m in messages) + # Review fix #7: verify truncate actually triggered via truncation marker + assert any("...[truncated]" in str(m.get("content", "")) for m in result) class TestCompressionLogging: @@
-654,3 +679,33 @@ class TestReActEngineWithCompressor: ) assert result.output == "Answer" + + async def test_should_compress_cjk_fallback_path(self): + """Review fix #5: _should_compress() CJK fallback for compressors + without should_compress() method (e.g. HeadroomCompressor) + + Verifies R7: react.py fallback uses estimate_text_tokens, so CJK + long conversations correctly trigger compression. + """ + from agentkit.core.react import ReActEngine + + gateway = MagicMock() + engine = ReActEngine(llm_gateway=gateway) + + # Mock compressor WITHOUT should_compress() method + # (simulates HeadroomCompressor which doesn't implement it) + mock_compressor = MagicMock(spec=["is_available", "compress", "compress_tool_result"]) + mock_compressor.is_available.return_value = True + + # CJK long conversation: 10000 CJK chars = 10000 tokens > 8000 threshold + cjk_conversation = [ + {"role": "user", "content": "你" * 5000}, + {"role": "assistant", "content": "好" * 5000}, + ] + result = engine._should_compress(cjk_conversation, mock_compressor) + assert result is True + + # ASCII short conversation should not trigger + ascii_short = [{"role": "user", "content": "Hello"}] + result = engine._should_compress(ascii_short, mock_compressor) + assert result is False -- 2.43.0 From 027f7909aa00faf1719ff82b657b8cd69d9ac843 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Fri, 3 Jul 2026 09:40:09 +0800 Subject: [PATCH 3/3] docs(solutions): CJK token estimation undercount fix Document the ContextCompressor CJK 4x underestimation bug and fix: - estimate_text_tokens() CJK 1:1 / ASCII 4:1 heuristic - _summarize() max_chars budget fix (P1: was * 4, allowed 4x CJK budget) - Linear compress flow + structured logging - Prevention: charset-aware heuristics, audit dependent truncation points --- ...context-compressor-cjk-token-estimation.md | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 docs/solutions/logic-errors/context-compressor-cjk-token-estimation.md diff --git 
a/docs/solutions/logic-errors/context-compressor-cjk-token-estimation.md b/docs/solutions/logic-errors/context-compressor-cjk-token-estimation.md
new file mode 100644
index 0000000..884d03f
--- /dev/null
+++ b/docs/solutions/logic-errors/context-compressor-cjk-token-estimation.md
@@ -0,0 +1,152 @@
+---
+title: "ContextCompressor CJK token estimation undercounted by 4x"
+date: 2026-07-03
+module: core/compressor
+component: assistant
+tags:
+  - cjk
+  - token-estimation
+  - context-compression
+  - react-engine
+  - heuristic
+problem_type: logic_error
+severity: high
+symptoms:
+  - "estimate_tokens() uses len(content) // 4 (ASCII heuristic), undercounting CJK tokens by ~4x since CJK text runs at ~1 token per character"
+  - "Context compression triggers too late for CJK-heavy conversations, risking context window overflow"
+  - "_summarize() pre-truncation uses max_chars = max_input_tokens * 4, allowing CJK text to send 4x the token budget to the LLM"
+  - "ReActEngine._should_compress() fallback inherits the same flawed len // 4 estimation for compressors without should_compress()"
+root_cause: logic_error
+resolution_type: code_fix
+---
+
+## Problem
+
+`ContextCompressor.estimate_tokens()` estimated token counts with `len(content) // 4` (an ASCII heuristic: 4 characters ≈ 1 token). For CJK (Chinese/Japanese/Korean) text, 1 character ≈ 1 token, so the estimator undercounts CJK tokens by about 4x. As a result, compression triggers too late in CJK-heavy conversations and risks overflowing the context window. The `_summarize()` pre-truncation `max_chars = max_input_tokens * 4` compounds the problem: it lets CJK text send up to 4x the token budget to the LLM.
+
+## Symptoms
+
+- CJK-heavy conversations only trigger compression after going far past the intended threshold (about 4x); `should_compress()` returns `False` while the actual token count already exceeds `model_context_limit`
+- `_summarize()` can send the LLM CJK text worth 4x the token budget (P1 defect; may hit the context limit / HTTP 400 errors)
+- Long Chinese conversations run a high risk of context-window overflow and failed requests
+- The compression flow relies on a recursive `_compression_depth` counter, which is hard to observe and debug, and it has no structured logging
+
+## What Didn't Work
+
+The old `len(content) // 4` estimator is based on the average ASCII/Latin ratio (about 4 characters per token). For pure CJK text (1 character ≈ 1 token), it produces estimates roughly 4x too low. For example, 4000 Chinese characters are estimated at 1000 tokens but are actually about 4000 tokens. This caused:
+
+- The `should_compress()` headroom threshold (`model_context_limit * 0.8`) did not fire until actual tokens reached about 4x the intended threshold
+- In `_summarize()`, `max_chars = max_input_tokens * 4` let through 12800 characters (≈12800 CJK tokens) for a 3200-token budget; this P1 defect was caught by ce-code-review (session history)
+- The old recursive `compress()` relied on a `_compression_depth` counter, which made the flow hard to read, and `_compress_aggressive()` received the already-compressed `compressed` list, risking summary-of-summary (F-010)
+
+## Solution
+
+**1. Add a CJK-aware token estimator** (`src/agentkit/core/compressor.py`):
+
+```python
+def _is_cjk(char: str) -> bool:
+    """Check if a character is CJK (1 token ≈ 1 char)."""
+    cp = ord(char)
+    return (
+        0x4E00 <= cp <= 0x9FFF  # CJK Unified Ideographs
+        or 0x3040 <= cp <= 0x30FF  # Hiragana + Katakana
+        or 0xAC00 <= cp <= 0xD7AF  # Hangul Syllables
+    )
+
+
+def estimate_text_tokens(text: str) -> int:
+    """Estimate token count: CJK 1:1, other characters 4:1."""
+    cjk_count = 0
+    non_cjk_count = 0
+    for char in text:
+        if _is_cjk(char):
+            cjk_count += 1
+        else:
+            non_cjk_count += 1
+    return cjk_count + non_cjk_count // 4
+```
+
+`estimate_tokens()` now delegates to `estimate_text_tokens()`:
+
+```python
+def estimate_tokens(self, messages: list[dict]) -> int:
+    """Estimate total tokens in message list (CJK 1:1, ASCII 4:1)"""
+    return sum(estimate_text_tokens(str(m.get("content", ""))) for m in messages)
+```
+
+**2. Fix the `_summarize()` pre-truncation** (P1 defect, caught by ce-code-review):
+
+```python
+# Before (CJK text could exceed the token budget by 4x):
+max_chars = max_input_tokens * 4
+
+# After (exact for CJK at 1:1, conservative for ASCII at 4:1):
+max_chars = max_input_tokens
+conversation_text = conversation_text[:max_chars] + "\n...[truncated]"
+```
+
+**3. Switch the `ReActEngine._should_compress()` fallback path to `estimate_text_tokens` as well** (`src/agentkit/core/react.py`, around line 1750):
+
+```python
+# Fallback: fixed threshold for compressors without headroom support
+estimated_tokens = sum(
+    estimate_text_tokens(str(m.get("content", ""))) for m in conversation
+)
+return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD
+```
+
+**4. Rewrite `compress()` as a linear flow**, removing the recursive `_compression_depth`:
+
+```python
+async def compress(self, messages: list[dict]) -> list[dict]:
+    """Linear flow: summarize -> aggressive -> truncate."""
+    tokens_before = self.estimate_tokens(messages)
+    if tokens_before <= self._max_tokens:
+        return messages
+    # ... split into system / old / recent messages ...
+    # Step 1: summarize (produces `compressed`; elided)
+    # Step 2: aggressive (F-010: pass the original messages, not `compressed`, to avoid summary-of-summary)
+    if self.estimate_tokens(compressed) > self._max_tokens:
+        compressed = await self._compress_aggressive(messages)
+    # Step 3: truncate as last resort
+    if self.estimate_tokens(compressed) > self._max_tokens:
+        compressed = self._truncate(compressed)
+    # Step 4: structured logging (tokens_after / strategy bookkeeping elided)
+    self._log_compression(tokens_before, tokens_after, len(messages), len(compressed), strategy)
+    return compressed
+```
+
+**5. Add `_log_compression()` structured logging**:
+
+```python
+logger.info(
+    "context compressed: %d -> %d tokens (%.1f%%), messages: %d -> %d, strategy: %s",
+    tokens_before, tokens_after, ratio * 100,
+    msg_count_before, msg_count_after, strategy,
+)
+```
+
+## Why This Works
+
+**Root cause**: in mainstream tokenizers (BPE/WordPiece/SentencePiece), CJK characters map to tokens at roughly 1:1, while ASCII/Latin text averages about 4 characters per token. `len(content) // 4` erases this 4x difference, so CJK estimates are systematically too low.
+
+The fixed `estimate_text_tokens()` counts CJK characters at 1:1 and keeps 4:1 for everything else, which corrects the CJK bias while preserving ASCII behavior. In `_summarize()`, `max_chars = max_input_tokens` is exact for CJK (1:1) and conservative for ASCII (it truncates to 1/4 of the budget, but safely), which removes the "4x over budget" path entirely.
+
+`headroom_threshold=0.8` absorbs the 10-20% estimation error that pure CJK text may still carry (a `ponytail:` comment records this limit and the upgrade path: `litellm.token_counter` or a provider-specific tokenizer).
+
+**Maintainability improvements**: the linear `compress()` flow (summarize → aggressive → truncate) drops the recursion-depth counter, so the whole fallback chain can be understood in a single read. `_compress_aggressive()` receives the original `messages` instead of the already-compressed `compressed`, avoiding the F-010 summary-of-summary. `_log_compression()` emits structured fields (before/after/ratio/msg_count/strategy), so compression behavior can be debugged and alerted on.
+
+## Prevention
+
+- **Charset-aware estimation heuristics**: any token-estimation logic must account for charset differences. `len // 4` only holds for ASCII; CJK, emoji, and other multibyte scripts need separate handling. For multilingual input, prefer `litellm.token_counter` or a provider-specific tokenizer (the `ponytail` comment on `estimate_text_tokens` already records this upgrade path)
+- **Audit every dependent truncation point when changing estimation logic**: this fix also audited `estimate_tokens()`, the `_summarize()` pre-truncation, and the `ReActEngine._should_compress()` fallback. `_truncate()` still checks `len(content) > self._max_tokens * 4` (deferred item OQ21) and should later move to charset-aware logic too
+- **Test across a charset matrix**: the new tests cover pure CJK, pure ASCII, mixed CJK+ASCII, Japanese kana (Hiragana), and Hangul (`tests/unit/test_context_compressor.py`), verifying the estimator for each charset
+- **Avoid summary-of-summary**: in multi-stage compression, later stages should receive the original input rather than the previous stage's output, so information does not degrade stage by stage (the F-010 lesson)
+- **Structured logging first**: compression is a black-box operation, so it must emit structured fields such as before/after tokens, compression ratio, message counts, and strategy, to make production issues traceable
+
+## References
+
+- Plan: `docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md` (3 rounds of ce-doc-review, converged on U1 + U3)
+- PR #21: `feat/context-compressor-cjk` (commits `be45fe4` + `3a05c4d`)
+- Upstream context: `docs/plans/2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md` (where headroom compression was introduced)
+- Residual: OQ21 (`_truncate()` `* 4` consistency, P2 manual follow-up)
-- 
2.43.0
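The solution doc leaves `_truncate()` on the `len(content) > self._max_tokens * 4` check (OQ21). Below is a minimal sketch of a charset-aware alternative. It assumes the `_is_cjk` / `estimate_text_tokens` heuristic documented in the patch. `truncate_to_token_budget` is a hypothetical helper, not an existing agentkit function. It keeps the longest prefix whose estimate fits the budget. For CJK text, this gives the same cut as `max_chars = max_input_tokens`. For ASCII text, it keeps up to about 4x as many characters, instead of cutting to a quarter of the budget.

```python
# Sketch only: mirrors the CJK 1:1 / other 4:1 heuristic from the solution doc.
# truncate_to_token_budget is a hypothetical helper, not part of agentkit.

TRUNCATION_MARKER = "\n...[truncated]"


def _is_cjk(char: str) -> bool:
    """Same code-point ranges as the documented estimator."""
    cp = ord(char)
    return (
        0x4E00 <= cp <= 0x9FFF  # CJK Unified Ideographs
        or 0x3040 <= cp <= 0x30FF  # Hiragana + Katakana
        or 0xAC00 <= cp <= 0xD7AF  # Hangul Syllables
    )


def estimate_text_tokens(text: str) -> int:
    """CJK characters count 1:1, everything else 4:1 (floored)."""
    cjk = sum(1 for char in text if _is_cjk(char))
    return cjk + (len(text) - cjk) // 4


def truncate_to_token_budget(text: str, max_tokens: int) -> str:
    """Keep the longest prefix whose estimate is <= max_tokens.

    The marker is appended after the cut and is not counted against the
    budget, matching the `[:max_chars] + marker` pattern in _summarize().
    """
    if estimate_text_tokens(text) <= max_tokens:
        return text
    cjk = 0
    other = 0
    for i, char in enumerate(text):
        if _is_cjk(char):
            cjk += 1
        else:
            other += 1
        # The running estimate never decreases, so the first overflow marks the cut.
        if cjk + other // 4 > max_tokens:
            return text[:i] + TRUNCATION_MARKER
    return text  # not reached: the full text was already over budget


if __name__ == "__main__":
    cjk_cut = truncate_to_token_budget("你" * 4000, 3200)
    ascii_cut = truncate_to_token_budget("a" * 20000, 3200)
    print(cjk_cut.count("你"))  # 3200
    print(len(ascii_cut) - len(TRUNCATION_MARKER))  # 12803
```

With a 3200-token budget, 4000 CJK characters are cut to exactly 3200. Pure ASCII keeps 12803 characters (12803 // 4 = 3200) instead of the 3200 that `max_chars = max_input_tokens` allows. The cost of this design is a per-character scan rather than a single slice.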