feat(core): add headroom-based compression trigger

U3: ContextCompressor now accepts model_context_limit, headroom_threshold,
and min_tokens. should_compress() triggers when the estimated token count
exceeds headroom_threshold (default 0.8) of the model context limit OR exceeds
min_tokens (default 8000, fixed fallback). ReActEngine._should_compress first
checks the compressor's is_available(), then delegates to its
should_compress() when that method exists.

Tests: 6 scenarios (headroom trigger, min_tokens guard, small model,
unavailable compressor, delegation, fallback) — all pass.
This commit is contained in:
chiguyong 2026-06-24 20:28:14 +08:00
parent 717aad1303
commit 122173ec2c
3 changed files with 138 additions and 1 deletions

View File

@ -38,11 +38,33 @@ class ContextCompressor:
max_tokens: int = 4000, max_tokens: int = 4000,
keep_recent: int = 3, keep_recent: int = 3,
model: str = "default", model: str = "default",
model_context_limit: int = 128_000,
headroom_threshold: float = 0.8,
min_tokens: int = 8_000,
): ):
self._llm_gateway = llm_gateway self._llm_gateway = llm_gateway
self._max_tokens = max_tokens self._max_tokens = max_tokens
self._keep_recent = keep_recent self._keep_recent = keep_recent
self._model = model self._model = model
# U3: Headroom-based compression trigger — predict context overflow
# before the single-request limit is hit.
self._model_context_limit = model_context_limit
self._headroom_threshold = headroom_threshold
self._min_tokens = min_tokens
def should_compress(self, messages: list[dict]) -> bool:
    """Decide whether the conversation should be compressed.

    Compression is triggered when either condition holds:

    1. Headroom: ``estimated_tokens / model_context_limit`` is greater than
       ``headroom_threshold``.
    2. Fixed fallback: ``estimated_tokens`` is greater than ``min_tokens``.

    Args:
        messages: Conversation messages, passed to ``estimate_tokens``.

    Returns:
        True when either trigger fires, otherwise False.
    """
    estimated = self.estimate_tokens(messages)
    limit = self._model_context_limit
    # A zero limit would raise ZeroDivisionError and a negative one yields a
    # meaningless ratio, so the headroom check only runs for a positive
    # limit; otherwise the decision rests on the min_tokens fallback alone.
    if limit > 0 and estimated / limit > self._headroom_threshold:
        return True
    return estimated > self._min_tokens
def estimate_tokens(self, messages: list[dict]) -> int: def estimate_tokens(self, messages: list[dict]) -> int:
"""Estimate total tokens in message list (rough: 4 chars = 1 token)""" """Estimate total tokens in message list (rough: 4 chars = 1 token)"""

View File

@ -1742,7 +1742,15 @@ class ReActEngine:
"""检查是否需要增量压缩""" """检查是否需要增量压缩"""
if not compressor: if not compressor:
return False return False
# Estimate tokens in conversation (rough: 4 chars ≈ 1 token) # U3: Skip if compressor reports unavailable
is_available_fn = getattr(compressor, "is_available", None)
if is_available_fn is not None and not is_available_fn():
return False
# U3: Delegate to compressor's headroom-based should_compress if available
should_compress_fn = getattr(compressor, "should_compress", None)
if should_compress_fn is not None:
return should_compress_fn(conversation)
# Fallback: fixed threshold for compressors without headroom support
total_chars = sum(len(str(m.get("content", ""))) for m in conversation) total_chars = sum(len(str(m.get("content", ""))) for m in conversation)
estimated_tokens = total_chars // 4 estimated_tokens = total_chars // 4
return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD return estimated_tokens > self._DEFAULT_COMPRESS_THRESHOLD

View File

@ -776,3 +776,110 @@ class TestLoopDetection:
assert len(engine._loop_window) == 0 assert len(engine._loop_window) == 0
assert engine._loop_corrected is False assert engine._loop_corrected is False
# ── U3: Headroom 压缩测试 ─────────────────────────────────
def _make_messages(token_count: int) -> list[dict]:
"""Create messages with approximately the given token count (4 chars = 1 token)."""
char_count = token_count * 4
return [{"role": "user", "content": "x" * char_count}]
class TestHeadroomCompression:
    """U3: proactive compression trigger driven by estimated token usage."""

    def test_headroom_triggers_when_ratio_exceeds_threshold(self):
        """Happy path: 110K tokens against a 128K limit (~0.86 > 0.8) triggers."""
        from agentkit.core.compressor import ContextCompressor

        conversation = _make_messages(110_000)
        subject = ContextCompressor(model_context_limit=128_000)
        assert subject.should_compress(conversation) is True

    def test_headroom_does_not_trigger_below_min_tokens(self):
        """Edge case: 5K tokens against a 128K limit does not trigger (below min_tokens 8000)."""
        from agentkit.core.compressor import ContextCompressor

        conversation = _make_messages(5_000)
        subject = ContextCompressor(model_context_limit=128_000)
        assert subject.should_compress(conversation) is False

    def test_headroom_triggers_for_small_model(self):
        """Edge case: 7K tokens against an 8K limit (0.875 > 0.8) triggers."""
        from agentkit.core.compressor import ContextCompressor

        conversation = _make_messages(7_000)
        subject = ContextCompressor(model_context_limit=8_000)
        assert subject.should_compress(conversation) is True

    def test_react_skips_compression_when_unavailable(self):
        """Error path: a compressor whose is_available() returns False is skipped."""
        from agentkit.core.react import ReActEngine

        engine = ReActEngine(
            llm_gateway=make_mock_gateway([make_response(content="Done")])
        )
        unavailable = MagicMock()
        unavailable.is_available.return_value = False
        # should_compress would answer True, so a False result shows the
        # availability check took precedence.
        unavailable.should_compress = MagicMock(return_value=True)
        conversation = [{"role": "user", "content": "x" * 100000}]
        assert engine._should_compress(conversation, unavailable) is False

    def test_react_delegates_to_compressor_should_compress(self):
        """ReActEngine._should_compress returns whatever compressor.should_compress returns."""
        from agentkit.core.react import ReActEngine

        engine = ReActEngine(
            llm_gateway=make_mock_gateway([make_response(content="Done")])
        )
        # Same tiny conversation for both decisions; only the stub's canned
        # answer changes.
        for decision in (True, False):
            stub = ContextCompressorStub(available=True, compress=decision)
            outcome = engine._should_compress(
                [{"role": "user", "content": "test"}], stub
            )
            assert outcome is decision

    def test_react_fallback_for_compressors_without_should_compress(self):
        """Fallback: compressors lacking should_compress use the fixed threshold."""
        from agentkit.core.react import ReActEngine

        engine = ReActEngine(
            llm_gateway=make_mock_gateway([make_response(content="Done")])
        )
        # Mock that exposes is_available but has no should_compress method.
        legacy = MagicMock()
        legacy.is_available.return_value = True
        # Deleting the attribute stops the mock from auto-creating it, which
        # is what lets this test reach the fallback path.
        del legacy.should_compress
        # 4K tokens: expected to stay under the fixed threshold.
        assert engine._should_compress(_make_messages(4_000), legacy) is False
        # 10K tokens: expected to exceed the fixed threshold.
        assert engine._should_compress(_make_messages(10_000), legacy) is True
class ContextCompressorStub:
    """Minimal stand-in compressor that returns canned answers.

    Both ``is_available`` and ``should_compress`` return the values supplied
    at construction, so a test can check how a caller reacts to each
    combination.
    """

    def __init__(self, available: bool, compress: bool):
        self._available, self._compress = available, compress

    def should_compress(self, messages: list[dict]) -> bool:
        """Return the preset compression decision; ``messages`` is ignored."""
        return self._compress

    def is_available(self) -> bool:
        """Return the preset availability flag."""
        return self._available