feat(compressor): CJK-aware token estimation + linear compress flow #21

Merged
fischer merged 3 commits from feat/context-compressor-cjk into main 2026-07-03 09:40:29 +08:00
5 changed files with 929 additions and 65 deletions

View File

@ -0,0 +1,351 @@
---
date: 2026-07-02
plan_id: "2026-07-02-003"
type: feat
status: draft
title: "Context Compressor CJK Token 估算 + Prefix 对齐 + 压缩简化"
origin: stash@{0} (feat/context-compressor-enhancement, 2026-06-23)
---
# Context Compressor CJK Token 估算 + Prefix 对齐 + 压缩简化
## Summary
当前 `ContextCompressor` 在中文会话场景下存在三个实际问题:
(1) token 估算用 `len(content) // 4`4 字符=1 token但 CJK 字符实际 1 字符 ≈ 1 token导致中文会话 token 被低估 4 倍,压缩触发过晚,可能超出模型 context window
(2) system prompt 混有动态内容时间戳、UUID、session_id破坏 LLM providerAnthropic/OpenAI的 KV cache prefix 稳定性,导致 cache miss → 延迟增加 + 成本上升;
(3) `compress()` 用递归 `_compression_depth` 参数,逻辑复杂且缺乏结构化日志,难以监控压缩行为。
本 plan 基于 stash@{0} 的 WIP 实现,重建为可执行的 implementation plan。
## Problem Frame
**当前状态**main `6826ceb` 上的 `src/agentkit/core/compressor.py`
- `estimate_tokens()`: `len(str(content)) // 4` — 对中文严重低估
- `should_compress()`: 基于 headroom ratio依赖 `estimate_tokens` 的准确性
- `compress()`: 递归 `_compression_depth` 参数,最多 2 层递归后强制 truncate
- `_compress_aggressive()`: 也接受 `_compression_depth` 参数
- 无 prefix 对齐逻辑 — system prompt 中动态行位置每次变化
- 无结构化日志 — 压缩发生时无 tokens_before/after/ratio 记录
**应用点**
- `src/agentkit/core/react.py` `_should_compress()` (line 1705) — 已委托给 `compressor.should_compress()` (line 1716-1718)`// 4` fallback (line 1721) 是 live codeHeadroomCompressor 不实现 `should_compress()`fallback 为其服务)
- `src/agentkit/core/config_driven.py` `_handle_direct()` (line 1066) — stash 新增了 LLM 调用前压缩(`len(rendered_messages) > 6` 触发,但 `rendered_messages` 实际 ≤ 2 条)
**影响**:中文用户的长会话场景下,压缩触发时机过晚,可能触发模型 context limit 错误KV cache 命中率低导致每次请求延迟和成本上升。
## Requirements
- **R1**: CJK 字符(中日韩统一表意文字 + 假名 + 韩文谚文)按 1:1 估算 tokenASCII 仍按 4:1
- **R2**: `estimate_text_tokens()` 作为模块级函数导出,供 `react.py` 等外部调用
- **R3**: ~~`align_prefix()` 将 system prompt 中的动态行时间戳、UUID、session_id、`当前时间`)移到末尾 `[Dynamic Context]` 段,保持前缀稳定~~ **(已取消 — OQ17/OQ18`_build_system_message()` 已覆盖,且动态内容不存在)**
- **R4**: ~~`compress()` 首先调用 `align_prefix()`,提升 KV cache 命中率~~ **(已取消 — 随 U2 取消)**
- **R5**: 移除递归 `_compression_depth` 参数改线性流程compress → aggressive → truncate修正移除 align 步骤U2 已取消)
- **R6**: 新增 `_log_compression()` 输出结构化日志tokens_before/after/ratio/msg_count
- **R7**: `react.py._should_compress()``// 4` fallbackline 1721改用 `estimate_text_tokens()`第三轮修正fallback **不是死代码** — HeadroomCompressor 不实现 `should_compress()`fallback 是 live code。改为用 `estimate_text_tokens()` 替换 `// 4` 而非移除,使 CJK 估算惠及 HeadroomCompressor 用户)
- **R8**: ~~`config_driven.py` 在 LLM 调用前对长 system prompt 执行压缩~~ **(已取消 — 第三轮 F-009compress() 守卫使单条消息压缩无效system prompt 是设计时 artifact**
- **R9**: 测试覆盖 CJK 估算、~~prefix 对齐~~、压缩流程、~~工具结果压缩~~修正prefix 对齐和工具结果压缩随 U2/U4 缩减取消)
> **R9 mapping**: R9 是横切需求,不映射到单一 Unit。由 U1CJK 估算测试)、~~U2TestAlignPrefix~~已取消、U3压缩流程测试、~~U4test_config_driven.py 压缩测试)~~(已取消)的 Test scenarios 与 Verification 共同满足。
## Key Technical Decisions
### KTD1: CJK 估算用启发式,不引入 tiktoken
**决策**:用 `_is_cjk(char)` + `estimate_text_tokens(text)` 启发式CJK 1:1ASCII 4:1不引入 tiktoken 依赖,也不使用 `litellm.token_counter`
**理由**(修正 OQ3原"项目无 tiktoken 依赖"前提虚假 — `litellm>=1.50` 已是直接依赖 `pyproject.toml:30`,提供 `litellm.token_counter(model, messages)` API
- ~~项目无 tiktoken 依赖(`pyproject.toml` 未列出),引入会增加安装负担~~ **修正**`litellm>=1.50` 已是直接依赖,但 `litellm.token_counter` 需要 model 参数且需加载 tokenizer开销大于纯字符遍历第三轮修正litellm.token_counter 是纯本地函数,不调用网络 API
- 启发式对触发时机的判断足够准确(压缩阈值有 headroom 缓冲)
- 符合 ponytail 原则:用最小可行方案,避免在热路径上引入 litellm 调用开销
- ponytail ceiling: 启发式对纯 CJK 文本可能仍低估 ~10-20%,但 headroom_threshold=0.8 的缓冲足以吸收;升级路径是引入 `litellm.token_counter` 或 provider 特定的 tokenizer
### KTD2: ~~Prefix 对齐策略 — 动态行移到末尾~~ **(已取消 — OQ17/OQ18**
~~**决策**`align_prefix()` 识别 system prompt 中的动态行(匹配时间戳/UUID/session_id/`当前时间` 模式),将其移到 system prompt 末尾的 `[Dynamic Context]` 段落。~~
~~**理由**~~
- ~~LLM provider 的 KV cache 基于 prefix 匹配,前缀越稳定 cache 命中率越高~~
- ~~动态内容(时间戳等)每次请求都变,放在前缀中会破坏 cache~~
- ~~移到末尾后静态部分agent identity、技能说明等构成稳定前缀~~
> **取消原因**`_build_system_message()` 已实现 stable/volatile 分离system prompt 无动态内容。详见 U2 取消说明。
### KTD3: 压缩逻辑改线性,移除递归
**决策**`compress()` 改为线性流程(~~align →~~ compress → aggressive → truncate移除 `_compression_depth` 递归参数。
**理由**(修正 OQ20原"递归难以追踪"理由薄弱 — 递归实际是 pseudo-linear最多 2 层后强制 truncate
- ~~递归逻辑难以追踪和调试~~ **修正**:递归虽是 pseudo-linearmax 2 层),但线性流程仍有日志可读性收益
- 线性流程更易添加结构化日志(`_log_compression()` 在每步插入日志点)
- stash 已验证线性流程等价覆盖原递归的所有路径
### KTD4: ~~config_driven.py 压缩触发用消息条数~~ **(已取消 — 随 U4 取消)**
~~**决策**`config_driven.py` 用 `len(rendered_messages) > 6` 作为压缩触发条件(消息条数,非 token 估算)。~~
~~**重新决策**`config_driven.py` 用 `estimate_text_tokens(system_prompt) > max_tokens * 0.8` 作为压缩触发条件token 估算)。~~
> **取消原因**U4 已在第三轮取消F-009 — compress() 守卫使单条消息压缩无效。KTD4 随之失效。
## Implementation Units
### U1. CJK-aware token 估算
**Goal**: 新增 `estimate_text_tokens()` 模块级函数CJK 1:1 / ASCII 4:1 估算;`react.py` 改用此函数。
**Requirements**: R1, R2, R7
**Dependencies**: 无
**Files**:
- `src/agentkit/core/compressor.py` — 新增 `_is_cjk()` + `estimate_text_tokens()` 模块级函数;`estimate_tokens()` 方法内部改用 `estimate_text_tokens`**`_summarize()` line 152 的内联 `// 4` 也改用 `estimate_text_tokens()`(第三轮 NEW-F10避免 CJK 摘要路径仍低估 4 倍导致 context overflow**
- `src/agentkit/core/react.py``_should_compress()``// 4` fallbackline 1721改用 `estimate_text_tokens()`**不移除** — HeadroomCompressor 不实现 `should_compress()`fallback 是 live code
- `tests/unit/test_context_compressor.py` — 新增 CJK 估算测试
**Approach**:
- `_is_cjk(char)`: 检查字符是否在 CJK 统一表意文字U+4E00-U+9FFF、假名U+3040-U+30FF、韩文谚文U+AC00-U+D7AF范围内
- `estimate_text_tokens(text)`: 遍历字符CJK 计 1 token其他按 4:1 估算
- `estimate_tokens(messages)`: 内部调用 `estimate_text_tokens(str(content))` 求和
- `_summarize()` line 152: `estimated_tokens = len(conversation_text) // 4` 改为 `estimate_text_tokens(conversation_text)`(第三轮 NEW-F10
- `react.py._should_compress()`: 已委托给 `compressor.should_compress()`line 1716-1718`// 4` fallbackline 1721**不是死代码** — HeadroomCompressor 不实现 `should_compress()`fallback 是 live code。改为用 `estimate_text_tokens()` 替换 `// 4`,使 CJK 估算惠及 HeadroomCompressor 用户
**Patterns to follow**: 现有 `estimate_tokens()` 方法的签名和返回值约定
**Test scenarios**:
- 纯 CJK 文本:`estimate_text_tokens("你好世界")` == 41:1
- 纯 ASCII`estimate_text_tokens("hello world")` == 211 字符 / 4 = 2.75,向下取整)
- 混合文本CJK 部分 1:1 + ASCII 部分 4:1
- 日文假名:`estimate_text_tokens("こんにちは")` == 5
- 韩文谚文:`estimate_text_tokens("안녕하세요")` == 5
- `estimate_tokens()` 对包含 CJK 的消息列表给出合理估值(不再低估 4 倍)
- `react.py._should_compress()` 对中文长会话正确触发(通过 `compressor.should_compress()` 委托)
- `_summarize()` 对 CJK 文本的预截断正确触发(第三轮 NEW-F10line 152 改用 `estimate_text_tokens`CJK 摘要路径不再低估 4 倍)
**Verification**: `pytest tests/unit/test_context_compressor.py -k "cjk or mixed or kana or hangul" -v` 通过
---
### U2. Prefix 对齐 ~~(已取消)~~
**~Goal~**: ~~新增 `align_prefix()` 方法,将 system prompt 中的动态行移到末尾,保持前缀稳定,提升 KV cache 命中率。~~
> **取消原因ce-doc-review 复核第二轮 OQ17/OQ18**
> 1. `react.py:1511-1561``_build_system_message(stable, volatile)` 已实现 stable/volatile 双块分离Anthropic 用 `cache_control: {"type": "ephemeral"}`,非 Anthropic 用字符串拼接保持 stable 前缀),在 compress 调用前line 670-674已执行。U2 提议的 `align_prefix()` 与此高度重叠。
> 2. grep `当前时间`/`session_id`/timestamp 模式在 system prompt 构造点(`react.py`、`config_driven.py`、`prompts/template.py`)零匹配 — `PromptTemplate.render()` 只做 `${var}` 替换,不注入时间戳/UUID/session_id。U2 要对齐的动态内容在当前代码库中不存在。
**Requirements**: ~~R3, R4~~ (已随 U2 取消)
**Dependencies**: ~~U1同在 compressor.py但逻辑独立~~
**Files**:
- `src/agentkit/core/compressor.py` — 新增 `align_prefix(messages)` 方法
- `tests/unit/test_context_compressor.py` — 新增 `TestAlignPrefix` 测试类
**Approach**:
- `align_prefix(messages)`: 遍历 messages`role == "system"` 的消息:
- 识别动态行:匹配 `当前时间`、`2026-07-02` 格式时间戳、UUID`[0-9a-f]{8}-...`)、`session_id` 等模式
- 将动态行从原位置移除,追加到 content 末尾的 `[Dynamic Context]` 段落
- 静态行保持原序构成稳定前缀
- `compress()` 在执行压缩前首先调用 `align_prefix()`
**Patterns to follow**: 现有 `compress()` 中 system message 处理逻辑
**Test scenarios**:
- 时间戳行移到末尾:含 `当前时间: 2026-07-02` 的 system prompt对齐后该行在 `[Dynamic Context]`
- UUID 行移到末尾:含 `session_id: abc12345-...` 的 system prompt对齐后该行在 `[Dynamic Context]`
- 静态 system prompt 不变:无动态行的 system prompt`align_prefix()` 后内容不变
- 多条 system message每条都正确对齐
- 非 system message 不受影响user/assistant 消息不变
**Verification**: `pytest tests/unit/test_context_compressor.py::TestAlignPrefix -v` 通过
---
### U3. 压缩逻辑简化 + 结构化日志
**Goal**: 重写 `compress()` 为线性流程,移除递归 `_compression_depth`;新增 `_log_compression()` 结构化日志。
**Requirements**: R5, R6
**Dependencies**: ~~U2`compress()` 调用 `align_prefix()`~~ **U2 已取消U3 独立)**
**Files**:
- `src/agentkit/core/compressor.py` — 重写 `compress()`,新增 `_log_compression()`,重写 `_compress_aggressive()` 移除递归参数
- `tests/unit/test_context_compressor.py` — 新增压缩流程测试
**Approach**:
- `compress(messages)` 线性流程:
1. ~~`align_prefix(messages)` — 对齐 prefix~~ **U2 已取消,移除此步骤)**
2. 检查 token 量,若未超阈值直接返回
3. 分离 system/old/recent`_summarize(old)` 生成摘要
4. 若仍超阈值,调用 `_compress_aggressive(**original messages**)`**第三轮修正 F-010**:必须传入 original `messages` 列表(非已压缩的 `compressed`),避免 summary-of-summary 行为变更
5. 若仍超阈值,`_truncate()`
6. `_log_compression()` 记录压缩结果
- `_log_compression(tokens_before, tokens_after, msg_count, strategy)` — 输出 `INFO` 级日志,包含压缩比
- `_compress_aggressive(messages)` — 移除 `_compression_depth` 参数,改为只保留最后 1 条消息 + 摘要
**Patterns to follow**: 现有 `compress()` 的 system/old/recent 分离逻辑
**Test scenarios**:
- 短消息不压缩:`estimate_tokens <= max_tokens` 时返回原消息
- 长消息触发压缩:超过阈值时生成摘要 + 保留 recent
- 压缩后仍超阈值 → aggressive 压缩:只保留最后 1 条 + 摘要
- aggressive 后仍超阈值 → truncate强制截断
- `_log_compression` 输出结构化日志(可通过 `caplog` 验证)
- `compress()` 不再接受 `_compression_depth` 参数(签名变更)
- `_compress_aggressive` 接收 original `messages`(非已压缩的 `compressed`),避免 summary-of-summary第三轮 F-010
**Verification**: `pytest tests/unit/test_context_compressor.py -k "compress or aggressive or truncate" -v` 通过
---
### U4. ~~config_driven.py LLM 调用前压缩~~ ~~+ 工具结果压缩辅助~~ **(已取消 — 第三轮 F-009**
~~**Goal**: `config_driven.py` 在 LLM 调用前对长 system prompt 执行压缩。~~
> **取消原因ce-doc-review 复核第三轮 F-009/NEW-F9**
> 1. `ContextCompressor.compress()` 有守卫 `if len(non_system) <= keep_recent: return messages`compressor.py:98-99。对单条 system 消息,`non_system` 为空列表len=0 ≤ 默认 keep_recent=3compress() 立即返回不压缩。U4 的核心机制根本不会触发。
> 2. `rendered_messages` 来自 `PromptTemplate.render()`,总是 ≤ 2 条1 system + 1 user即使压缩完整列表也无法触发 compress()。
> 3. system prompt 是 agent 设计者精心构造的设计时 artifact运行时 LLM 摘要会降低指令质量。超长 system prompt 应在设计时修正,而非运行时压缩。
>
> **历史**第二轮已取消工具结果压缩辅助OQ19 — HeadroomCompressor 已覆盖)。第三轮取消整个 U4F-009 — compress() 守卫使 U4 无效)。
**Approach**:
- `config_driven.py`: 在 `_handle_direct()`LLM 调用前检查 `estimate_text_tokens(system_prompt)` 是否超过阈值(如 `max_tokens * 0.8`),若是则调用 `compressor.compress([{"role": "system", "content": system_prompt}])`,异常时 `logger.warning` 并继续
- ~~`compressor.py` 工具结果压缩~~(已取消 — 委托 `HeadroomCompressor`
> **触发条件重新设计OQ2/OQ5/OQ16**:原 `len(rendered_messages) > 6` 永远不可能为 true`PromptTemplate.render()` 返回 ≤ 2 条消息)。改为基于 token 估算的单条 system prompt 压缩检查。
**Patterns to follow**: 现有 `config_driven.py``self._compressor` 的使用模式
**Test scenarios**:
- `config_driven.py` 直接模式下长 system prompttoken 超阈值)触发压缩
- `config_driven.py` 直接模式下短 system prompt 不压缩
- 压缩失败时 warning 且不阻断执行
**Verification**: `pytest tests/unit/test_config_driven.py -k "compress" -v` 通过
## Scope Boundaries
### In Scope
- `src/agentkit/core/compressor.py` 的 CJK 估算 + ~~prefix 对齐~~(已取消)+ 压缩简化 + 日志 + ~~工具结果压缩辅助~~(已取消)
- `src/agentkit/core/react.py``_should_compress` fallback `// 4` 改用 `estimate_text_tokens()`fallback 是 live code服务 HeadroomCompressor
- ~~`src/agentkit/core/config_driven.py` 的 LLM 调用前压缩(基于 token 估算,非消息条数)~~ **(已取消 — U4 取消)**
- `tests/unit/test_context_compressor.py` 测试覆盖
- ~~`tests/unit/test_config_driven.py` 压缩测试~~ **(已取消 — U4 取消)**
### Out of Scope
- stash 中混入的无关改动:`portal.py`、`chat.ts`、`index.html`、`tauri.conf.json`、`skills/base.py`、`skills-lock.json`、`test_portal_routes.py`、`test_execution_modes.py`
- `test_compression_strategy.py` 的小幅调整(如签名变更导致的 fixture 更新,随 U3 自然处理)
- 引入 tiktoken 或 provider 特定 tokenizer见 KTD1
- 跨会话的持久化压缩状态
### Deferred to Follow-Up Work
- 压缩比的运行时监控/告警(需接入 metrics 系统)
- 基于 provider 的 tokenizer 精确估算KTD1 的升级路径)
## Risks & Dependencies
- **风险 1**: ~~`align_prefix()` 的动态行识别模式可能遗漏某些动态内容格式~~ **(已取消 — U2 取消)**
- **风险 2**`compress()` 签名移除 `_compression_depth` 参数可能破坏外部调用 → 缓解:`_compression_depth` 是内部参数(下划线前缀),无外部调用者
- **风险 3**: ~~`config_driven.py` 压缩异常时不应阻断主流程~~ **(已取消 — U4 取消)**
- **依赖**~~U2 依赖 U1同文件U3 依赖 U2`compress` 调用 `align_prefix`U4 依赖 U1+U3~~ **修正**U2 已取消U3 独立(无依赖);~~U4 依赖 U1+U3~~ **U4 已取消**
## Acceptance Examples
- **AE1**: 中文长会话100 条 CJK 消息)的 `estimate_tokens` 返回值 ≥ 旧实现的 4 倍(修正低估)
- **AE2**: ~~含时间戳的 system prompt 经 `align_prefix()` 后,时间戳行在 `[Dynamic Context]` 段,静态行位置不变~~ **(已取消 — U2 取消)**
- **AE3**: `compress()` 不再接受 `_compression_depth` 参数,对超长消息线性执行 compress → aggressive → truncate修正移除 align 步骤U2 已取消)
- **AE4**: `react.py._should_compress()` 对中文会话在合理时机触发(第三轮修正:`// 4` fallback 是 live code 服务 HeadroomCompressor改为用 `estimate_text_tokens()` 替换 `// 4` 而非移除)
- **AE5**: ~~`config_driven.py` 直接模式下长 system prompttoken 超阈值)时自动调用压缩,压缩失败不阻断~~ **(已取消 — U4 取消)**
## Open Questions
> 以下 findings 来自 ce-doc-reviewcoherence / feasibility / adversarial 三 persona 审查于实现阶段处理。4 个 safe_auto fixes 已应用U4 Goal 补 `_build_sampled_output`、R9 横切声明、R8 变量名修正、OQ14 ASCII 估算值修正)。
>
> **复核第二轮决策结果(自动用最佳判断处理)**
> - **已解决16 项)**OQ1/OQ4/OQ15函数名修正为 `_should_compress`、OQ2/OQ5/OQ16触发条件改为 token 估算、OQ3/OQ8KTD1 理由修正、OQ6/OQ7/OQ9/OQ12/OQ17/OQ18U2 取消premise 证伪、OQ11/OQ19U4 工具结果压缩辅助取消、OQ14safe_auto、OQ20KTD3 理由修正,维持 U3
> - **仍开放4 项)**OQ10aggressive 压缩质量退化、OQ13CJK 1:1 模型差异FYI
>
> **复核第三轮决策结果(自动用最佳判断处理)**
> - **已应用5 项)**
> - NEW-F13safe_autoKTD1 修正 — `litellm.token_counter` 是纯本地函数,不调用网络 API
> - F-008P0 反转):`// 4` fallback 是 live codeHeadroomCompressor 不实现 `should_compress()`)→ R7 改为"用 `estimate_text_tokens()` 替换 `// 4`"而非"移除死代码"
> - F-009P0U4 全部取消 — `compress()` 守卫使单条 system 消息压缩无效R8/KTD4/AE5/Scope Boundaries 同步取消
> - F-010gated_autoU3 Approach step 4 明确 `_compress_aggressive` 接收 original `messages`(非已压缩的 compressed
> - NEW-F10gated_autoU1 scope 扩展 — `_summarize()` line 152 的内联 `// 4` 也改用 `estimate_text_tokens()`
> - **仍开放3 项)**OQ10aggressive 压缩质量退化、OQ13CJK 1:1 模型差异FYI、OQ21`_truncate()` `* 4` 一致性P2 manual
### P0 — 阻断性(实现前必须解决)
**OQ1** (feasibility, P0): U1/R7 引用的 `react.py._needs_incremental_compression()` **不存在**。实际函数是 `_should_compress()`line 1705已委托给 `compressor.should_compress()`line 1716-1718`// 4` fallbackline 1721是死代码。
- **需决策**: 修改 R7/U1 改为 `_should_compress()` 并移除死代码 fallback还是从 scope 移除 R7react.py 已委托给 compressorU1 改 compressor 即可覆盖)?
**OQ2** (feasibility, P0): U4/R8 引用的 `config_driven.py._execute_direct()` **不存在**。实际方法是 `_handle_direct()`line 1066。且 `rendered_messages` 来自 `PromptTemplate.render()`,总是 ≤ 2 条1 system + 1 user`len(rendered_messages) > 6` **永远不可能为 true**
- **需决策**: U4 的压缩触发条件需重新设计。是在 `_handle_direct()` 内对单条 system prompt 做基于 token 的压缩?还是压缩逻辑应放在其他位置(如 `PromptTemplate.render()` 之前)?
### P1 — 重要(实现时需处理)
**OQ3** (adversarial, P1): KTD1 前提虚假。声称"项目无 tiktoken 依赖",但 `litellm>=1.50` 已是直接依赖(`pyproject.toml` line 30提供 `litellm.token_counter(model, messages)` API 可精确估算 token。
- **需决策**: 是否改用 `litellm.token_counter` 替代启发式?或维持启发式但修正 KTD1 理由(理由改为"避免 litellm 调用开销/兼容性",而非"无依赖"
**OQ4** (adversarial, P1): R7 引用不存在的函数(同 OQ1
**OQ5** (adversarial, P1): KTD4 错误指标 — `len(rendered_messages) > 6` 永远不可能为 true同 OQ2
**OQ6** (feasibility, P1): KV cache benefit 只在 `compress()` 触发时实现。大多数请求不触发 compress因为未超阈值此时 `align_prefix()` 不会被调用 → prefix 对齐的实际收益被高估。
- **需决策**: 是否将 `align_prefix()` 提前到每次请求都执行(独立于 compress还是接受"只在 compress 时对齐"的有限收益?
**OQ7** (feasibility, P1): 动态内容 premise 未在代码库中验证。Plan 假设 system prompt 含时间戳/UUID/session_id但需确认这些动态行是否真的存在于当前 system prompt 构造逻辑中。
- **需决策**: 实现阶段先 grep 验证 system prompt 构造点,确认动态内容确实存在再实现 U2。
### P2 — 值得修复(实现时酌情处理)
**OQ8** (feasibility, P2): `align_prefix()` 幂等性。多次调用 `align_prefix()` 不应改变结果(避免 `[Dynamic Context]` 段重复追加)。需在测试场景中加幂等性测试。
**OQ9** (adversarial, P2): `align_prefix()` 正则误判风险。用户消息中可能含时间戳格式文本(如"会议定于 2026-07-02"),被误移到末尾。需限定只对 system message 的特定行(如"当前时间:"前缀)匹配。
**OQ10** (adversarial, P2): aggressive 压缩质量退化。`_compress_aggressive()` 只保留最后 1 条 + 摘要,可能丢失关键上下文。需评估是否保留更多 recent 消息(如最后 2-3 条)。
**OQ11** (adversarial, P2): U4 范围蔓延。U4 同时包含 config_driven 压缩 + 4 个工具结果压缩辅助方法scope 较大。考虑是否拆分为 U4aconfig_driven 压缩)+ U4b工具结果压缩辅助
**OQ12** (adversarial, P2): 动态内容前提未验证(同 OQ7adversarial 视角)。
### P3 — FYI知情即可
**OQ13** (adversarial, P3): CJK 1:1 估算的模型差异。不同模型GPT-4/Claude/Gemini对 CJK 的实际 token 比例略有差异0.8-1.2 之间1:1 是平均值。headroom_threshold=0.8 的缓冲可吸收此差异。
### Coherence — 结构性gated_auto/manual
**OQ14** (coherence, ~~gated_auto~~ safe_auto, **已解决**): U1 Test scenarios 中 `estimate_text_tokens("hello world")` == 2 或 3 不确定 — 应明确 ASCII 11 字符 / 4 = 2.75,向下取整为 2。**已修正为 == 211 字符 / 4 = 2.75,向下取整)**。
**OQ15** (coherence, manual): AE4 仍引用 `_needs_incremental_compression()`(同 OQ1需随 OQ1 决策同步修正。
**OQ16** (coherence, manual): AE5 仍写">6 条消息触发",但 `rendered_messages` ≤ 2同 OQ2需随 OQ2 决策同步修正。
### 复核第二轮新增 — P0阻断性
**OQ17** (feasibility+adversarial, P0, **新**): U2 与现有 `_build_system_message()` 功能重复。`react.py:1511-1561` 已实现 stable/volatile 双块分离Anthropic 用 `cache_control: {"type": "ephemeral"}`,非 Anthropic 用字符串拼接保持 stable 前缀),在 compress 调用前line 670-674已执行。U2 提议的 `align_prefix()` 与此高度重叠。
- **需决策**: 取消 U2还是将 U2 改写为 spike调研 `_build_system_message()` 是否已覆盖所有动态内容场景,若覆盖则删除 U2
**OQ18** (feasibility+adversarial, P0, **新**): U2 核心前提被证伪。grep `当前时间`/`session_id`/timestamp 模式在 system prompt 构造点(`react.py`、`config_driven.py`、`prompts/template.py`)零匹配 — `PromptTemplate.render()` 只做 `${var}` 替换,不注入时间戳/UUID/session_id。U2 要对齐的动态内容在当前代码库中不存在。
- **需决策**: 取消 U2或保留 U2 作为防御性设计(未来可能添加动态内容)?
**OQ19** (feasibility+adversarial, P0, **新**): U4 工具结果压缩辅助与 `HeadroomCompressor.compress_tool_result()` 重叠。`headroom_compressor.py:126-157` 已实现成熟版本content type 检测、SmartCrusher/CodeCompressor 路由、CCR hash 存储、`min_length=500` 阈值、异常 fallback。U4 提议的 `_compress_json_array`/`_compress_json_object`/`_compress_text`/`_build_sampled_output` 会与之竞争或重复造轮子。且 `compressor.py:237-239``compress_tool_result` 已是 no-op。
- **需决策**: 取消 U4 工具结果压缩辅助部分?还是改为委托 `HeadroomCompressor`
### 复核第二轮新增 — P1重要
**OQ20** (adversarial, P1, **新**): KTD3 前提薄弱。`compress()` 的递归 `_compression_depth` 实际是 pseudo-linear最多 2 层递归后强制 truncateline 118-129并非真正的复杂递归。"递归难以追踪"的理由不够充分,线性化的收益被高估。
- **需决策**: 维持 KTD3线性化仍有日志可读性收益还是删除 U3 的线性化部分,仅保留 `_log_compression()`
### 复核第三轮新增 — P2值得修复
**OQ21** (adversarial, P2, **新**): `_truncate()``* 4` 字符假设与 U1 的 CJK 估算逻辑不一致。`compressor.py:232-233` 中 `_truncate()``target_tokens * 4` 计算字符数(假设 4 字符=1 token对 CJK 文本会截断过多CJK 1 字符 ≈ 1 token`* 4` 计算的字符数只够 1/4 的 CJK token
- **当前状态**: U1 scope 已扩展到 `estimate_tokens()` 方法 + `_summarize()` line 152`_truncate()``* 4` 未纳入。
- **需决策**: 实现阶段评估是否将 `_truncate()` 也改用 CJK-aware 估算(基于 `estimate_text_tokens` 反推字符数),或保留 `* 4` 作为 truncate 路径的保守下界(截断过多总比超出 context window 安全)。第三轮建议:**保留 `* 4` 作为 manual 跟进**,因为它在 truncate 兜底路径上,保守截断是安全的;纳入 U1 scope 会让 U1 边界扩张到 truncate 路径,与"压缩触发时机"的核心目标偏离。

View File

@ -0,0 +1,152 @@
---
title: "ContextCompressor CJK token estimation undercounted by 4x"
date: 2026-07-03
module: core/compressor
component: assistant
tags:
- cjk
- token-estimation
- context-compression
- react-engine
- heuristic
problem_type: logic_error
severity: high
symptoms:
- "estimate_tokens() uses len(content) // 4 (ASCII heuristic), undercounting CJK tokens by ~4x since CJK chars are ~1 char per token"
- "Context compression triggers too late for CJK-heavy conversations, risking context window overflow"
- "_summarize() pre-truncation uses max_chars = max_input_tokens * 4, allowing CJK text to send 4x the token budget to the LLM"
- "ReActEngine._should_compress() fallback inherits the same flawed len // 4 estimation for compressors without should_compress()"
root_cause: logic_error
resolution_type: code_fix
---
## Problem
`ContextCompressor.estimate_tokens()` 使用 `len(content) // 4`ASCII 启发式4 字符 ≈ 1 token估算 token 数。对 CJK中文/日文/韩文文本而言1 个字符 ≈ 1 token因此该估算器将 CJK token 数低估约 4 倍。这导致 CJK 为主的会话压缩触发过晚,存在上下文窗口溢出风险。`_summarize()` 的预截断 `max_chars = max_input_tokens * 4` 进一步放大了问题——允许 CJK 文本向 LLM 发送 4 倍 token 预算的输入。
## Symptoms
- CJK 为主的会话在远超预期阈值(约 4 倍)后才触发压缩,`should_compress()` 返回 `False` 而实际 token 已超出 `model_context_limit`
- `_summarize()` 可能向 LLM 发送 4 倍 token 预算的 CJK 文本P1 缺陷——可能触发上下文上限 / 400 错误)
- 中文长会话面临上下文窗口溢出 / 请求失败的高风险
- 压缩流程依赖递归深度 `_compression_depth`,难以观测与调试,缺乏结构化日志
## What Didn't Work
旧的 `len(content) // 4` 估算器基于 ASCII/拉丁语平均比例(约 4 字符/token。对纯 CJK 文本1 字符 ≈ 1 token该估算产生约 4 倍偏低的估计。例如4000 个中文字符会被估算为 1000 token但实际约 4000 token。这导致
- `should_compress()` 的 headroom 阈值(`model_context_limit * 0.8`)直到实际 token 已 4 倍超出预期阈值才触发
- `_summarize()``max_chars = max_input_tokens * 4` 在预算为 3200 token 时放行 12800 字符≈12800 CJK token— 此 P1 缺陷由 ce-code-review 捕获 (session history)
- 旧的递归式 `compress()` 依赖 `_compression_depth` 计数器,流程难以阅读,且 `_compress_aggressive()` 接收已压缩的 `compressed` 列表,存在 summary-of-summaryF-010风险
## Solution
**1. 新增 CJK 感知的 token 估算器**`src/agentkit/core/compressor.py`
```python
def _is_cjk(char: str) -> bool:
"""Check if a character is CJK (1 token ≈ 1 char)."""
cp = ord(char)
return (
0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs
or 0x3040 <= cp <= 0x30FF # Hiragana + Katakana
or 0xAC00 <= cp <= 0xD7AF # Hangul Syllables
)
def estimate_text_tokens(text: str) -> int:
"""Estimate token count: CJK 1:1, other characters 4:1."""
cjk_count = 0
non_cjk_count = 0
for char in text:
if _is_cjk(char):
cjk_count += 1
else:
non_cjk_count += 1
return cjk_count + non_cjk_count // 4
```
`estimate_tokens()` 改用 `estimate_text_tokens()`
```python
def estimate_tokens(self, messages: list[dict]) -> int:
"""Estimate total tokens in message list (CJK 1:1, ASCII 4:1)"""
return sum(estimate_text_tokens(str(m.get("content", ""))) for m in messages)
```
**2. 修复 `_summarize()` 预截断**P1 缺陷,由 ce-code-review 捕获):
```python
# 修改前CJK 可 4 倍超预算):
max_chars = max_input_tokens * 4
# 修改后CJK 1:1 精确ASCII 4:1 保守):
max_chars = max_input_tokens
conversation_text = conversation_text[:max_chars] + "\n...[truncated]"
```
**3. `ReActEngine._should_compress()` 回退路径同步使用 `estimate_text_tokens`**`src/agentkit/core/react.py` 约 1750 行):
```python
# Fallback: fixed threshold for compressors without headroom support
estimated_tokens = sum(
estimate_text_tokens(str(m.get("content", ""))) for m in conversation
)
return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD
```
**4. 重写 `compress()` 为线性流程**,移除递归式 `_compression_depth`
```python
async def compress(self, messages: list[dict]) -> list[dict]:
"""Linear flow: summarize -> aggressive -> truncate."""
tokens_before = self.estimate_tokens(messages)
if tokens_before <= self._max_tokens:
return messages
# ... 分离 system/old/recent ...
# Step 1: summarize
# Step 2: aggressive (F-010: 传入原始 messages 而非 compressed避免 summary-of-summary)
if self.estimate_tokens(compressed) > self._max_tokens:
compressed = await self._compress_aggressive(messages)
# Step 3: truncate as last resort
if self.estimate_tokens(compressed) > self._max_tokens:
compressed = self._truncate(compressed)
# Step 4: 结构化日志
self._log_compression(tokens_before, tokens_after, len(messages), len(compressed), strategy)
return compressed
```
**5. 新增 `_log_compression()` 结构化日志**
```python
logger.info(
"context compressed: %d -> %d tokens (%.1f%%), messages: %d -> %d, strategy: %s",
tokens_before, tokens_after, ratio * 100,
msg_count_before, msg_count_after, strategy,
)
```
## Why This Works
**根本原因**CJK 字符在主流 tokenizerBPE/WordPiece/SentencePiece中近似 1:1 映射为 token而 ASCII/拉丁文约 4 字符/token。`len(content) // 4` 把这 4 倍差异抹平了,导致 CJK 估算系统性偏低。
修复后的 `estimate_text_tokens()` 对 CJK 字符按 1:1 计数、对非 CJK 保留 4:1既纠正了 CJK 偏差又维持了 ASCII 行为。`_summarize()` 的 `max_chars = max_input_tokens` 对 CJK 精确1:1、对 ASCII 保守(截断到 1/4 预算但安全),彻底消除了"4 倍超预算"路径。
`headroom_threshold=0.8` 吸收了纯 CJK 仍可能存在的 10-20% 估算偏差(`ponytail:` 注释已标注上限与升级路径——`litellm.token_counter` 或 provider 专用 tokenizer
**可维护性改进**:线性 `compress()` 流程summarize → aggressive → truncate移除了递归深度计数器单次读取即可理解全部降级路径`_compress_aggressive()` 接收原始 `messages` 而非已压缩的 `compressed`,规避了 F-010 的 summary-of-summary`_log_compression()` 提供结构化观测字段before/after/ratio/msg_count/strategy使压缩行为可调试、可告警。
## Prevention
- **字符集感知的估算启发式**:任何 token 估算逻辑必须考虑字符集差异。`len // 4` 仅对 ASCII 成立CJK/emoji/其他多字节脚本需分别处理。涉及多语言输入时,优先使用 `litellm.token_counter` 或 provider 专用 tokenizer当前 `estimate_text_tokens``ponytail` 注释已标注此升级路径)
- **修改估算逻辑时审计所有依赖的截断点**:本次修复同步审计了 `estimate_tokens()`、`_summarize()` 预截断、`ReActEngine._should_compress()` 回退路径。`_truncate()` 仍用 `len(content) > self._max_tokens * 4` 判断OQ21 延期项),后续需同步迁移至字符集感知逻辑
- **测试覆盖字符集矩阵**:新增测试覆盖纯 CJK、纯 ASCII、CJK+ASCII 混合、平假名/片假名、谚文等场景(`tests/unit/test_context_compressor.py`),验证估算器在各字符集下的正确性
- **避免 summary-of-summary**多级压缩时后续阶段应接收原始输入而非前级压缩产物防止信息逐级失真F-010 教训)
- **结构化日志先行**:压缩是黑盒操作,必须输出 before/after token、压缩比、消息数、策略等结构化字段便于线上问题定位
## References
- Plan: `docs/plans/2026-07-02-003-feat-context-compressor-cjk-prefix-enhancement-plan.md`3 轮 ce-doc-review收敛到 U1 + U3
- PR #21: `feat/context-compressor-cjk`commits `be45fe4` + `3a05c4d`
- Upstream context: `docs/plans/2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md`headroom 压缩引入点)
- Residual: OQ21`_truncate()` `* 4` 一致性P2 manual 跟进)

View File

@ -12,6 +12,40 @@ from typing import Any, Protocol, runtime_checkable
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def _is_cjk(char: str) -> bool:
"""Check if a character is CJK (1 token ≈ 1 char).
Covers CJK Unified Ideographs, Hiragana, Katakana, and Hangul Syllables.
"""
cp = ord(char)
return (
0x4E00 <= cp <= 0x9FFF # CJK Unified Ideographs
or 0x3040 <= cp <= 0x30FF # Hiragana + Katakana
or 0xAC00 <= cp <= 0xD7AF # Hangul Syllables
)
def estimate_text_tokens(text: str) -> int:
"""Estimate token count: CJK 1:1, other characters 4:1.
CJK characters typically tokenize to ~1 token per character, while
ASCII/Latin text averages ~4 chars per token. Avoids the 4x
underestimation that ``len(text) // 4`` produces for CJK conversations.
ponytail ceiling: pure CJK may still underestimate ~10-20%, but
headroom_threshold=0.8 absorbs this. Upgrade path: litellm.token_counter
or provider-specific tokenizer.
"""
cjk_count = 0
non_cjk_count = 0
for char in text:
if _is_cjk(char):
cjk_count += 1
else:
non_cjk_count += 1
return cjk_count + non_cjk_count // 4
@runtime_checkable @runtime_checkable
class CompressionStrategy(Protocol): class CompressionStrategy(Protocol):
"""压缩策略协议 — 所有压缩器必须实现此接口""" """压缩策略协议 — 所有压缩器必须实现此接口"""
@ -73,22 +107,17 @@ class ContextCompressor:
return False return False
def estimate_tokens(self, messages: list[dict]) -> int: def estimate_tokens(self, messages: list[dict]) -> int:
"""Estimate total tokens in message list (rough: 4 chars = 1 token)""" """Estimate total tokens in message list (CJK 1:1, ASCII 4:1)"""
total = 0 return sum(estimate_text_tokens(str(m.get("content", ""))) for m in messages)
for msg in messages:
content = msg.get("content", "")
total += len(str(content)) // 4
return total
async def compress(self, messages: list[dict], _compression_depth: int = 0) -> list[dict]: async def compress(self, messages: list[dict]) -> list[dict]:
"""Compress messages if they exceed token budget """Compress messages if they exceed token budget.
Strategy: Linear flow: summarize -> aggressive -> truncate.
1. Keep system messages unchanged Each step only fires if the previous didn't bring tokens under budget.
2. Keep the most recent N messages unchanged
3. Compress older messages into a summary using LLM
""" """
if self.estimate_tokens(messages) <= self._max_tokens: tokens_before = self.estimate_tokens(messages)
if tokens_before <= self._max_tokens:
return messages return messages
# Separate system messages, old messages, and recent messages # Separate system messages, old messages, and recent messages
@ -101,10 +130,8 @@ class ContextCompressor:
old_msgs = non_system[: -self._keep_recent] old_msgs = non_system[: -self._keep_recent]
recent_msgs = non_system[-self._keep_recent :] recent_msgs = non_system[-self._keep_recent :]
# Compress old messages # Step 1: Summarize old messages
summary = await self._summarize(old_msgs) summary = await self._summarize(old_msgs)
# Build compressed message list
compressed = list(system_msgs) compressed = list(system_msgs)
if summary: if summary:
compressed.append( compressed.append(
@ -115,21 +142,44 @@ class ContextCompressor:
) )
compressed.extend(recent_msgs) compressed.extend(recent_msgs)
# Recursive check: if still over budget, compress again # Step 2: If still over budget, aggressive compress
# F-010: pass original `messages` (not `compressed`) to avoid summary-of-summary
strategy = "summary"
if self.estimate_tokens(compressed) > self._max_tokens: if self.estimate_tokens(compressed) > self._max_tokens:
if _compression_depth >= 1: compressed = await self._compress_aggressive(messages)
# Depth guard: force truncation instead of infinite recursion strategy = "aggressive"
return self._truncate(compressed)
if len(recent_msgs) > 1: # Step 3: If still over budget, truncate as last resort
# Try keeping fewer recent messages if self.estimate_tokens(compressed) > self._max_tokens:
return await self._compress_aggressive( compressed = self._truncate(compressed)
messages, _compression_depth=_compression_depth + 1 strategy = "truncate"
)
# Last resort: truncate # Step 4: Log compression result
return self._truncate(compressed) tokens_after = self.estimate_tokens(compressed)
self._log_compression(tokens_before, tokens_after, len(messages), len(compressed), strategy)
return compressed return compressed
def _log_compression(
self,
tokens_before: int,
tokens_after: int,
msg_count_before: int,
msg_count_after: int,
strategy: str,
) -> None:
"""Log structured compression info (tokens_before/after/ratio/msg_count)."""
ratio = tokens_after / tokens_before if tokens_before > 0 else 0.0
logger.info(
"context compressed: %d -> %d tokens (%.1f%%), messages: %d -> %d, strategy: %s",
tokens_before,
tokens_after,
ratio * 100,
msg_count_before,
msg_count_after,
strategy,
)
async def _summarize(self, messages: list[dict], max_input_tokens: int = 3200) -> str: async def _summarize(self, messages: list[dict], max_input_tokens: int = 3200) -> str:
"""Summarize a list of messages using LLM. """Summarize a list of messages using LLM.
@ -149,9 +199,12 @@ class ContextCompressor:
) )
# Pre-truncate if conversation_text exceeds safe token threshold # Pre-truncate if conversation_text exceeds safe token threshold
estimated_tokens = len(conversation_text) // 4 estimated_tokens = estimate_text_tokens(conversation_text)
if estimated_tokens > max_input_tokens: if estimated_tokens > max_input_tokens:
max_chars = max_input_tokens * 4 # CJK-aware char limit: max_input_tokens chars is exact for CJK (1:1),
# conservative for ASCII (4:1, truncates to 1/4 budget but safe).
# Review fix #1: old `* 4` allowed 4x token budget for CJK text.
max_chars = max_input_tokens
conversation_text = conversation_text[:max_chars] + "\n...[truncated]" conversation_text = conversation_text[:max_chars] + "\n...[truncated]"
prompt = ( prompt = (
@ -201,10 +254,8 @@ class ContextCompressor:
parts.append(f"[{role}]: {content}...") parts.append(f"[{role}]: {content}...")
return "\n".join(parts) return "\n".join(parts)
async def _compress_aggressive( async def _compress_aggressive(self, messages: list[dict]) -> list[dict]:
self, messages: list[dict], _compression_depth: int = 0 """Aggressive compression: keep only last message + summary of the rest."""
) -> list[dict]:
"""More aggressive compression when standard compression isn't enough"""
system_msgs = [m for m in messages if m.get("role") == "system"] system_msgs = [m for m in messages if m.get("role") == "system"]
non_system = [m for m in messages if m.get("role") != "system"] non_system = [m for m in messages if m.get("role") != "system"]

View File

@ -15,8 +15,14 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import TYPE_CHECKING, Awaitable, Callable from typing import TYPE_CHECKING, Awaitable, Callable
from agentkit.core.exceptions import LLMProviderError, LoopDetectedError, TaskCancelledError, TaskTimeoutError from agentkit.core.exceptions import (
LLMProviderError,
LoopDetectedError,
TaskCancelledError,
TaskTimeoutError,
)
from agentkit.core.protocol import CancellationToken from agentkit.core.protocol import CancellationToken
from agentkit.core.compressor import estimate_text_tokens
from agentkit.llm.gateway import LLMGateway from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse from agentkit.llm.protocol import LLMResponse
from agentkit.tools.base import Tool, ToolValidationError from agentkit.tools.base import Tool, ToolValidationError
@ -1072,7 +1078,12 @@ class ReActEngine:
clean_args["_skip_dangerous_check"] = True clean_args["_skip_dangerous_check"] = True
try: try:
tool_result = await tool.safe_execute(**clean_args) tool_result = await tool.safe_execute(**clean_args)
except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: except (
ToolValidationError,
ValueError,
TypeError,
RuntimeError,
) as e:
tool_result = { tool_result = {
"error": f"Tool '{tc.name}' execution failed: {e}", "error": f"Tool '{tc.name}' execution failed: {e}",
"error_code": "tool_execution_failed", "error_code": "tool_execution_failed",
@ -1090,7 +1101,12 @@ class ReActEngine:
if tool if tool
else {"error": f"Tool '{tc.name}' not found"} else {"error": f"Tool '{tc.name}' not found"}
) )
except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: except (
ToolValidationError,
ValueError,
TypeError,
RuntimeError,
) as e:
tool_result = { tool_result = {
"error": f"Tool '{tc.name}' execution failed: {e}", "error": f"Tool '{tc.name}' execution failed: {e}",
"error_code": "tool_execution_failed", "error_code": "tool_execution_failed",
@ -1154,7 +1170,12 @@ class ReActEngine:
if self._should_compress(conversation, compressor): if self._should_compress(conversation, compressor):
try: try:
conversation = await compressor.compress(conversation) conversation = await compressor.compress(conversation)
except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: except (
asyncio.TimeoutError,
ConnectionError,
LLMProviderError,
RuntimeError,
) as e:
logger.warning(f"Incremental compression failed: {e}") logger.warning(f"Incremental compression failed: {e}")
else: else:
@ -1225,7 +1246,12 @@ class ReActEngine:
if self._should_compress(conversation, compressor): if self._should_compress(conversation, compressor):
try: try:
conversation = await compressor.compress(conversation) conversation = await compressor.compress(conversation)
except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: except (
asyncio.TimeoutError,
ConnectionError,
LLMProviderError,
RuntimeError,
) as e:
logger.warning(f"Incremental compression failed: {e}") logger.warning(f"Incremental compression failed: {e}")
else: else:
# ponytail: 检查是否为畸形工具调用(含 <tool_use> 但解析失败) # ponytail: 检查是否为畸形工具调用(含 <tool_use> 但解析失败)
@ -1340,7 +1366,12 @@ class ReActEngine:
reinjections, reinjections,
) )
break break
except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: except (
asyncio.TimeoutError,
ConnectionError,
LLMProviderError,
RuntimeError,
) as e:
logger.warning(f"Verification loop failed: {e}") logger.warning(f"Verification loop failed: {e}")
# Yield final_answer event (legacy format for execute_stream consumers) # Yield final_answer event (legacy format for execute_stream consumers)
@ -1717,8 +1748,10 @@ class ReActEngine:
if should_compress_fn is not None: if should_compress_fn is not None:
return should_compress_fn(conversation) return should_compress_fn(conversation)
# Fallback: fixed threshold for compressors without headroom support # Fallback: fixed threshold for compressors without headroom support
total_chars = sum(len(str(m.get("content", ""))) for m in conversation) # (e.g. HeadroomCompressor which doesn't implement should_compress)
estimated_tokens = total_chars // 4 estimated_tokens = sum(
estimate_text_tokens(str(m.get("content", ""))) for m in conversation
)
return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD
async def _build_tool_result_message( async def _build_tool_result_message(

View File

@ -1,10 +1,10 @@
"""Tests for ContextCompressor and PromptTemplate cache""" """Tests for ContextCompressor and PromptTemplate cache"""
import inspect
import logging
from unittest.mock import AsyncMock, MagicMock from unittest.mock import AsyncMock, MagicMock
import pytest from agentkit.core.compressor import ContextCompressor, estimate_text_tokens
from agentkit.core.compressor import ContextCompressor
from agentkit.llm.protocol import LLMResponse, TokenUsage from agentkit.llm.protocol import LLMResponse, TokenUsage
from agentkit.prompts.section import PromptSection from agentkit.prompts.section import PromptSection
from agentkit.prompts.template import PromptTemplate from agentkit.prompts.template import PromptTemplate
@ -31,14 +31,18 @@ def make_long_messages(count: int = 10, content_length: int = 2000) -> list[dict
"""生成长消息列表用于测试压缩""" """生成长消息列表用于测试压缩"""
messages = [{"role": "system", "content": "You are a helpful assistant."}] messages = [{"role": "system", "content": "You are a helpful assistant."}]
for i in range(count): for i in range(count):
messages.append({ messages.append(
{
"role": "user", "role": "user",
"content": "x" * content_length + f" message {i}", "content": "x" * content_length + f" message {i}",
}) }
messages.append({ )
messages.append(
{
"role": "assistant", "role": "assistant",
"content": "y" * content_length + f" reply {i}", "content": "y" * content_length + f" reply {i}",
}) }
)
return messages return messages
@ -73,6 +77,79 @@ class TestEstimateTokens:
assert compressor.estimate_tokens(messages) == 0 assert compressor.estimate_tokens(messages) == 0
class TestEstimateTextTokensCJK:
"""estimate_text_tokens CJK 估算测试 (U1)"""
def test_pure_cjk_chinese(self):
# 4 CJK chars = 4 tokens (1:1)
assert estimate_text_tokens("你好世界") == 4
def test_pure_ascii(self):
# 11 chars / 4 = 2.75, floor = 2
assert estimate_text_tokens("hello world") == 2
def test_pure_cjk_japanese_kana(self):
# 5 Hiragana chars = 5 tokens (1:1)
assert estimate_text_tokens("こんにちは") == 5
def test_pure_cjk_korean_hangul(self):
# 5 Hangul chars = 5 tokens (1:1)
assert estimate_text_tokens("안녕하세요") == 5
def test_mixed_cjk_and_ascii(self):
# "你好" (2 CJK = 2 tokens) + " world" (6 ASCII = 1 token) = 3
assert estimate_text_tokens("你好 world") == 3
def test_empty_string(self):
assert estimate_text_tokens("") == 0
def test_estimate_tokens_with_cjk_messages(self):
"""estimate_tokens() 对 CJK 消息不再低估 4 倍"""
compressor = ContextCompressor()
messages = [{"role": "user", "content": "你好世界"}] # 4 CJK = 4 tokens
assert compressor.estimate_tokens(messages) == 4
def test_estimate_tokens_mixed_messages(self):
"""estimate_tokens() 对混合消息给出合理估值"""
compressor = ContextCompressor()
messages = [
{"role": "user", "content": "你好"}, # 2 CJK = 2
{"role": "assistant", "content": "hello"}, # 5 ASCII = 1
]
assert compressor.estimate_tokens(messages) == 3
def test_cjk_not_underestimated_4x(self):
"""AE1: 100 条 CJK 消息的 estimate_tokens >= 旧实现的 4 倍"""
compressor = ContextCompressor()
cjk_msg = [{"role": "user", "content": "你好" * 50}] # 100 CJK chars
new_estimate = compressor.estimate_tokens(cjk_msg)
old_estimate = len("你好" * 50) // 4 # old: len // 4
assert new_estimate >= old_estimate * 4
async def test_summarize_cjk_pre_truncation(self):
"""Review fix #2: _summarize() CJK 文本预截断正确触发
构造 CJK 文本使 estimate_text_tokens > max_input_tokens
len(text) < max_input_tokens * 4验证旧 bug* 4 假设允许 4x 超预算
"""
gateway = make_mock_gateway("Summary result")
compressor = ContextCompressor(llm_gateway=gateway)
# 4000 CJK chars = 4000 tokens (1:1), > max_input_tokens=3200
# But len=4000 < 3200 * 4 = 12800, so old `* 4` limit wouldn't truncate
cjk_content = "" * 4000
messages = [{"role": "user", "content": cjk_content}]
await compressor._summarize(messages, max_input_tokens=3200)
# Verify LLM was called with truncated text (not full 4000 chars)
call_messages = gateway.chat.call_args.kwargs["messages"]
prompt_content = call_messages[0]["content"]
# The conversation_text in the prompt should be truncated to <= 3200 chars
# (plus truncation marker), not the full 4000 chars
assert "...[truncated]" in prompt_content
# Verify the CJK content was actually shortened
assert prompt_content.count("") < 4000
class TestNoCompressionWhenUnderBudget: class TestNoCompressionWhenUnderBudget:
"""Token 预算内不压缩""" """Token 预算内不压缩"""
@ -181,7 +258,8 @@ class TestSummaryGenerationWithLLM:
gateway.chat.assert_called_once() gateway.chat.assert_called_once()
# 摘要应出现在结果中 # 摘要应出现在结果中
summary_msgs = [ summary_msgs = [
m for m in result m
for m in result
if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") if m.get("role") == "system" and "Conversation Summary" in m.get("content", "")
] ]
assert len(summary_msgs) == 1 assert len(summary_msgs) == 1
@ -207,7 +285,8 @@ class TestFallbackToSimpleSummary:
# 应该有摘要消息(简单截断模式) # 应该有摘要消息(简单截断模式)
summary_msgs = [ summary_msgs = [
m for m in result m
for m in result
if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") if m.get("role") == "system" and "Conversation Summary" in m.get("content", "")
] ]
assert len(summary_msgs) == 1 assert len(summary_msgs) == 1
@ -232,7 +311,8 @@ class TestFallbackToSimpleSummary:
# 应该有摘要消息(回退到简单摘要) # 应该有摘要消息(回退到简单摘要)
summary_msgs = [ summary_msgs = [
m for m in result m
for m in result
if m.get("role") == "system" and "Conversation Summary" in m.get("content", "") if m.get("role") == "system" and "Conversation Summary" in m.get("content", "")
] ]
assert len(summary_msgs) == 1 assert len(summary_msgs) == 1
@ -292,6 +372,169 @@ class TestTruncation:
assert result[0]["content"] == "Short message" assert result[0]["content"] == "Short message"
class TestCompressLinearFlow:
"""U3: compress() 线性流程 + 签名变更测试"""
def test_compress_signature_no_compression_depth(self):
"""compress() 不再接受 _compression_depth 参数"""
sig = inspect.signature(ContextCompressor.compress)
assert "_compression_depth" not in sig.parameters
def test_compress_aggressive_signature_no_compression_depth(self):
"""_compress_aggressive() 不再接受 _compression_depth 参数"""
sig = inspect.signature(ContextCompressor._compress_aggressive)
assert "_compression_depth" not in sig.parameters
async def test_short_messages_not_compressed_linear(self):
"""短消息不压缩(线性流程验证)"""
compressor = ContextCompressor(max_tokens=10000)
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there!"},
]
result = await compressor.compress(messages)
assert result == messages
async def test_aggressive_receives_original_messages(self):
"""F-010: _compress_aggressive 接收 original messages, 非 compressed"""
# First summary is very long (triggers aggressive), second is short
long_summary = LLMResponse(
content="x" * 5000,
model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)
short_summary = LLMResponse(
content="short summary",
model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)
gateway = MagicMock()
gateway.chat = AsyncMock(side_effect=[long_summary, short_summary])
compressor = ContextCompressor(
llm_gateway=gateway,
max_tokens=10,
keep_recent=2,
)
messages = [
{"role": "user", "content": "ORIGINAL_MARKER_a" * 2000},
{"role": "assistant", "content": "ORIGINAL_MARKER_b" * 2000},
{"role": "user", "content": "Recent"},
{"role": "assistant", "content": "Reply"},
]
await compressor.compress(messages)
# Second call (aggressive) should receive original message content,
# not the first summary ("x" * 5000)
assert gateway.chat.call_count == 2
second_call_content = gateway.chat.call_args_list[1].kwargs["messages"][0]["content"]
assert "ORIGINAL_MARKER" in second_call_content
# First summary content should NOT appear in the aggressive call
assert "xxxx" not in second_call_content
async def test_truncate_triggered_when_aggressive_insufficient(self):
"""aggressive 后仍超阈值 → truncate 强制截断"""
# Both summaries are very long, forcing truncate as last resort
long_summary = LLMResponse(
content="z" * 5000,
model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)
gateway = MagicMock()
gateway.chat = AsyncMock(side_effect=[long_summary, long_summary])
compressor = ContextCompressor(
llm_gateway=gateway,
max_tokens=10,
keep_recent=2,
)
messages = [
{"role": "user", "content": "a" * 5000},
{"role": "assistant", "content": "b" * 5000},
{"role": "user", "content": "c" * 5000},
{"role": "assistant", "content": "d" * 5000},
{"role": "user", "content": "Recent"},
{"role": "assistant", "content": "Reply"},
]
result = await compressor.compress(messages)
# Truncate should have cut message content
total_chars = sum(len(str(m.get("content", ""))) for m in result)
assert total_chars < sum(len(str(m.get("content", ""))) for m in messages)
# Review fix #7: verify truncate actually triggered via truncation marker
assert any("...[truncated]" in str(m.get("content", "")) for m in result)
class TestCompressionLogging:
"""U3: _log_compression 结构化日志测试"""
async def test_log_compression_outputs_structured_info(self, caplog):
"""_log_compression 输出结构化日志(包含 tokens/ratio/strategy"""
gateway = make_mock_gateway("Summary")
compressor = ContextCompressor(
llm_gateway=gateway,
max_tokens=100,
keep_recent=2,
)
messages = [
{"role": "user", "content": "a" * 2000},
{"role": "assistant", "content": "b" * 2000},
{"role": "user", "content": "Recent"},
{"role": "assistant", "content": "Reply"},
]
with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"):
await compressor.compress(messages)
# 验证结构化日志包含压缩信息
log_messages = [record.message for record in caplog.records]
assert any("context compressed" in msg for msg in log_messages)
assert any("strategy: summary" in msg for msg in log_messages)
# 日志应包含 token 数量和消息数量
assert any("tokens" in msg for msg in log_messages)
assert any("messages:" in msg for msg in log_messages)
async def test_no_log_when_not_compressed(self, caplog):
"""未触发压缩时不输出日志"""
compressor = ContextCompressor(max_tokens=10000)
messages = [
{"role": "user", "content": "Hello"},
]
with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"):
await compressor.compress(messages)
log_messages = [record.message for record in caplog.records]
assert not any("context compressed" in msg for msg in log_messages)
async def test_log_strategy_aggressive(self, caplog):
"""压缩策略为 aggressive 时日志记录正确"""
long_summary = LLMResponse(
content="x" * 5000,
model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)
short_summary = LLMResponse(
content="short",
model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)
gateway = MagicMock()
gateway.chat = AsyncMock(side_effect=[long_summary, short_summary])
compressor = ContextCompressor(
llm_gateway=gateway,
max_tokens=10,
keep_recent=2,
)
messages = [
{"role": "user", "content": "a" * 2000},
{"role": "assistant", "content": "b" * 2000},
{"role": "user", "content": "Recent"},
{"role": "assistant", "content": "Reply"},
]
with caplog.at_level(logging.INFO, logger="agentkit.core.compressor"):
await compressor.compress(messages)
log_messages = [record.message for record in caplog.records]
assert any("strategy: aggressive" in msg for msg in log_messages)
class TestNotEnoughMessagesToCompress: class TestNotEnoughMessagesToCompress:
"""消息数量不足时跳过压缩""" """消息数量不足时跳过压缩"""
@ -397,11 +640,13 @@ class TestReActEngineWithCompressor:
from agentkit.llm.protocol import LLMResponse, TokenUsage from agentkit.llm.protocol import LLMResponse, TokenUsage
gateway = MagicMock() gateway = MagicMock()
gateway.chat = AsyncMock(return_value=LLMResponse( gateway.chat = AsyncMock(
return_value=LLMResponse(
content="Final answer", content="Final answer",
model="test", model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10), usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)) )
)
compressor = ContextCompressor(max_tokens=10000) compressor = ContextCompressor(max_tokens=10000)
engine = ReActEngine(llm_gateway=gateway) engine = ReActEngine(llm_gateway=gateway)
@ -418,11 +663,13 @@ class TestReActEngineWithCompressor:
from agentkit.llm.protocol import LLMResponse, TokenUsage from agentkit.llm.protocol import LLMResponse, TokenUsage
gateway = MagicMock() gateway = MagicMock()
gateway.chat = AsyncMock(return_value=LLMResponse( gateway.chat = AsyncMock(
return_value=LLMResponse(
content="Answer", content="Answer",
model="test", model="test",
usage=TokenUsage(prompt_tokens=10, completion_tokens=10), usage=TokenUsage(prompt_tokens=10, completion_tokens=10),
)) )
)
engine = ReActEngine(llm_gateway=gateway) engine = ReActEngine(llm_gateway=gateway)
@ -432,3 +679,33 @@ class TestReActEngineWithCompressor:
) )
assert result.output == "Answer" assert result.output == "Answer"
async def test_should_compress_cjk_fallback_path(self):
"""Review fix #5: _should_compress() CJK fallback for compressors
without should_compress() method (e.g. HeadroomCompressor)
Verifies R7: react.py fallback uses estimate_text_tokens, so CJK
long conversations correctly trigger compression.
"""
from agentkit.core.react import ReActEngine
gateway = MagicMock()
engine = ReActEngine(llm_gateway=gateway)
# Mock compressor WITHOUT should_compress() method
# (simulates HeadroomCompressor which doesn't implement it)
mock_compressor = MagicMock(spec=["is_available", "compress", "compress_tool_result"])
mock_compressor.is_available.return_value = True
# CJK long conversation: 10000 CJK chars = 10000 tokens > 8000 threshold
cjk_conversation = [
{"role": "user", "content": "" * 5000},
{"role": "assistant", "content": "" * 5000},
]
result = engine._should_compress(cjk_conversation, mock_compressor)
assert result is True
# ASCII short conversation should not trigger
ascii_short = [{"role": "user", "content": "Hello"}]
result = engine._should_compress(ascii_short, mock_compressor)
assert result is False