From bad66445ff6e293f7f96b3400d0a9f6f5e0776aa Mon Sep 17 00:00:00 2001 From: chiguyong Date: Sun, 7 Jun 2026 18:20:41 +0800 Subject: [PATCH] feat(compression): U6 GEO Pipeline compression integration tests and config Add GEO Pipeline end-to-end compression integration tests with MockHeadroomCompressor. Add compression configuration section to llm_config.yaml with headroom and summary mode examples. --- configs/llm_config.yaml | 15 ++ tests/integration/test_geo_compression.py | 196 ++++++++++++++++++++++ 2 files changed, 211 insertions(+) create mode 100644 tests/integration/test_geo_compression.py diff --git a/configs/llm_config.yaml b/configs/llm_config.yaml index 5e82154..bc3dc39 100644 --- a/configs/llm_config.yaml +++ b/configs/llm_config.yaml @@ -28,3 +28,18 @@ model_aliases: fallbacks: deepseek/deepseek-chat: - "openai/qwen3-coder-plus" + +# 上下文压缩配置 — 长会话自动压缩历史消息,保持 Token 在预算内 +# GEO Pipeline 启用后,工具输出(搜索结果、网页抓取等)会自动压缩 +compression: + enabled: false # 是否启用压缩(生产环境建议 true) + provider: "headroom" # "headroom" | "summary" + # --- Headroom 模式(推荐,需安装 headroom-ai)--- + compressors: # 启用的压缩器 + - "smart_crusher" # JSON/结构化数据压缩 + - "code_compressor" # 代码内容压缩 + ccr_ttl: 300 # CCR 缓存 TTL(秒) + min_length: 500 # 最小压缩长度(字符) + # --- Summary 模式(无需额外依赖)--- + # max_tokens: 4000 # Token 预算 + # keep_recent: 3 # 保留最近 N 条消息 diff --git a/tests/integration/test_geo_compression.py b/tests/integration/test_geo_compression.py new file mode 100644 index 0000000..3aab2e4 --- /dev/null +++ b/tests/integration/test_geo_compression.py @@ -0,0 +1,196 @@ +"""GEO Pipeline 压缩集成测试 + +验证 GEO Pipeline 在 Headroom 压缩下的端到端工作。 +""" + +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from agentkit.core.compressor import CompressionStrategy, ContextCompressor, create_compressor +from agentkit.core.react import ReActEngine, ReActResult +from agentkit.llm.gateway import LLMGateway +from agentkit.llm.protocol import LLMResponse, TokenUsage, ToolCall +from agentkit.tools.registry import ToolRegistry + + +def make_mock_gateway(tool_name: str = "baidu_search") -> MagicMock: + """创建 mock LLMGateway""" + gateway = MagicMock(spec=LLMGateway) + # First call: tool call. Second call: final answer. + tool_call = ToolCall(id="tc_1", name=tool_name, arguments={"query": "GEO优化"}) + tool_response = LLMResponse( + content="", + model="test", + usage=TokenUsage(prompt_tokens=100, completion_tokens=50), + tool_calls=[tool_call], + ) + final_response = LLMResponse( + content="GEO优化建议:1. 添加Schema.org标记 2. 优化页面标题", + model="test", + usage=TokenUsage(prompt_tokens=80, completion_tokens=40), + ) + gateway.chat = AsyncMock(side_effect=[tool_response, final_response]) + return gateway + + +class MockHeadroomCompressor: + """Mock HeadroomCompressor for testing without headroom-ai""" + + def __init__(self, config=None): + self._config = config or {} + self._ccr_cache = {} + self._compress_count = 0 + + async def compress(self, messages): + result = [] + for msg in messages: + if msg.get("role") == "tool" and len(str(msg.get("content", ""))) > 100: + original = str(msg.get("content", "")) + # Simulate compression: keep first 50 chars + compressed = original[:50] + "...[compressed]" + ccr_hash = self._store_ccr(original) + compressed += f"\n" + result.append({**msg, "content": compressed}) + self._compress_count += 1 + else: + result.append(msg) + return result + + async def compress_tool_result(self, tool_name, result): + content = str(result) + if len(content) > 100: + compressed = content[:50] + "...[compressed]" + ccr_hash = self._store_ccr(content) + compressed += f"\n" + self._compress_count += 1 + return compressed + return content + + def is_available(self): + return True + + def _store_ccr(self, original): + import hashlib + ccr_hash = hashlib.sha256(original.encode()).hexdigest()[:16] + self._ccr_cache[ccr_hash] = original + return ccr_hash + + def retrieve(self, ccr_hash=None, query=None): + if ccr_hash and ccr_hash in self._ccr_cache: + return {"content": self._ccr_cache[ccr_hash], "ccr_hash": ccr_hash, "success": True} + return {"error": "Not found", "success": False} + + +class TestGEOPipelineCompression: + """GEO Pipeline 压缩集成测试""" + + @pytest.mark.asyncio + async def test_pipeline_with_compression_enabled(self): + """启用压缩后 GEO Pipeline 端到端执行成功""" + gateway = make_mock_gateway() + engine = ReActEngine(gateway, max_steps=5) + compressor = MockHeadroomCompressor() + + # Create a mock tool + from agentkit.tools.base import Tool + mock_tool = MagicMock(spec=Tool) + mock_tool.name = "baidu_search" + mock_tool.description = "Search Baidu" + mock_tool.input_schema = {"type": "object", "properties": {"query": {"type": "string"}}} + mock_tool.safe_execute = AsyncMock(return_value={ + "results": [{"title": f"Result {i}", "url": f"https://example.com/{i}"} for i in range(20)], + "success": True, + }) + + result = await engine.execute( + messages=[{"role": "user", "content": "分析GEO优化策略"}], + tools=[mock_tool], + compressor=compressor, + ) + + assert result.status == "success" or result.output + assert compressor._compress_count > 0 + + @pytest.mark.asyncio + async def test_tool_outputs_are_compressed(self): + """工具输出被压缩""" + gateway = make_mock_gateway(tool_name="web_crawl") + engine = ReActEngine(gateway, max_steps=5) + compressor = MockHeadroomCompressor() + + from agentkit.tools.base import Tool + mock_tool = MagicMock(spec=Tool) + mock_tool.name = "web_crawl" + mock_tool.description = "Crawl web page" + mock_tool.input_schema = {"type": "object", "properties": {"url": {"type": "string"}}} + mock_tool.safe_execute = AsyncMock(return_value={ + "content": "A" * 5000, # Long content that should be compressed + "success": True, + }) + + result = await engine.execute( + messages=[{"role": "user", "content": "抓取网页"}], + tools=[mock_tool], + compressor=compressor, + ) + + assert compressor._compress_count > 0 + + @pytest.mark.asyncio + async def test_ccr_retrieve_works(self): + """CCR 检索可取回原始数据""" + compressor = MockHeadroomCompressor() + + # Simulate storing content + original = "这是一段很长的搜索结果" * 100 + compressed = await compressor.compress_tool_result("baidu_search", original) + + # Extract CCR hash from compressed content + import re + match = re.search(r'CCR:hash=([a-f0-9]+)', compressed) + assert match, f"No CCR hash found in compressed content: {compressed[:100]}" + + ccr_hash = match.group(1) + retrieved = compressor.retrieve(ccr_hash=ccr_hash) + + assert retrieved["success"] is True + assert retrieved["content"] == original + + @pytest.mark.asyncio + async def test_compression_disabled_pipeline_works(self): + """compression.enabled=false 时 Pipeline 行为与之前完全一致""" + gateway = make_mock_gateway() + engine = ReActEngine(gateway, max_steps=5) + + from agentkit.tools.base import Tool + mock_tool = MagicMock(spec=Tool) + mock_tool.name = "baidu_search" + mock_tool.description = "Search Baidu" + mock_tool.input_schema = {"type": "object", "properties": {"query": {"type": "string"}}} + mock_tool.safe_execute = AsyncMock(return_value={"results": [], "success": True}) + + # No compressor + result = await engine.execute( + messages=[{"role": "user", "content": "搜索"}], + tools=[mock_tool], + compressor=None, + ) + + assert result.output # Should still produce output + + @pytest.mark.asyncio + async def test_create_compressor_with_geo_config(self): + """GEO 配置正确创建压缩器""" + # Disabled + assert create_compressor({"enabled": False}) is None + + # Summary mode + c = create_compressor({"enabled": True, "provider": "summary", "max_tokens": 2000}) + assert isinstance(c, ContextCompressor) + + # Headroom mode (falls back since not installed) + c = create_compressor({"enabled": True, "provider": "headroom"}) + assert isinstance(c, (ContextCompressor, CompressionStrategy))