"""GEO Pipeline compression integration tests.

Verify that the GEO Pipeline runs end-to-end when Headroom compression is
enabled, and that it behaves as before when compression is disabled.
"""

import asyncio
import hashlib
import json
import re
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from agentkit.core.compressor import CompressionStrategy, ContextCompressor, create_compressor
from agentkit.core.react import ReActEngine, ReActResult
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse, TokenUsage, ToolCall
from agentkit.tools.base import Tool
from agentkit.tools.registry import ToolRegistry

# Extracts the hash from the CCR marker that MockHeadroomCompressor appends.
CCR_HASH_PATTERN = re.compile(r"CCR:hash=([a-f0-9]+)")


def make_mock_gateway(tool_name: str = "baidu_search") -> MagicMock:
    """Create a mock ``LLMGateway`` scripted for one ReAct tool round trip.

    The first ``chat`` call returns a tool call to ``tool_name``; the second
    returns a final textual answer. A third call would fail because the
    ``side_effect`` sequence is exhausted.
    """
    gateway = MagicMock(spec=LLMGateway)

    tool_call = ToolCall(id="tc_1", name=tool_name, arguments={"query": "GEO优化"})
    tool_response = LLMResponse(
        content="",
        model="test",
        usage=TokenUsage(prompt_tokens=100, completion_tokens=50),
        tool_calls=[tool_call],
    )
    final_response = LLMResponse(
        content="GEO优化建议:1. 添加Schema.org标记 2. 优化页面标题",
        model="test",
        usage=TokenUsage(prompt_tokens=80, completion_tokens=40),
    )
    gateway.chat = AsyncMock(side_effect=[tool_response, final_response])
    return gateway


def _make_mock_tool(
    name: str, description: str, input_schema: dict, execute_result: dict
) -> MagicMock:
    """Build a ``Tool``-spec'd mock whose ``safe_execute`` returns ``execute_result``."""
    tool = MagicMock(spec=Tool)
    tool.name = name
    tool.description = description
    tool.input_schema = input_schema
    tool.safe_execute = AsyncMock(return_value=execute_result)
    return tool


class MockHeadroomCompressor:
    """Stand-in for ``HeadroomCompressor`` so tests run without ``headroom-ai``.

    Text longer than ``_MIN_COMPRESS_LEN`` characters is "compressed" by keeping
    its first ``_KEEP_CHARS`` characters, appending ``...[compressed]``, and
    appending a CCR marker ``[CCR:hash=<hex>]``. The hash can be passed to
    :meth:`retrieve` to recover the original text.
    """

    # Only content strictly longer than this many characters is compressed.
    _MIN_COMPRESS_LEN = 100
    # Number of leading characters kept in the compressed form.
    _KEEP_CHARS = 50

    def __init__(self, config=None):
        self._config = config or {}
        self._ccr_cache: dict[str, str] = {}
        self._compress_count = 0

    async def compress(self, messages: list[dict]) -> list[dict]:
        """Return a copy of ``messages`` with long ``tool`` message contents compressed.

        Non-tool messages and short tool messages are passed through unchanged
        (the same dict objects are reused).
        """
        result = []
        for msg in messages:
            content = str(msg.get("content", ""))
            if msg.get("role") == "tool" and len(content) > self._MIN_COMPRESS_LEN:
                result.append({**msg, "content": self._compress_text(content)})
            else:
                result.append(msg)
        return result

    async def compress_tool_result(self, tool_name: str, result) -> str:
        """Return ``str(result)``, compressed when it exceeds the length threshold."""
        content = str(result)
        if len(content) > self._MIN_COMPRESS_LEN:
            return self._compress_text(content)
        return content

    def is_available(self) -> bool:
        return True

    def _compress_text(self, original: str) -> str:
        """Truncate ``original``, cache it under its CCR hash, and tag it with that hash."""
        ccr_hash = self._store_ccr(original)
        self._compress_count += 1
        # The CCR marker carries the hash so callers can later call retrieve().
        return f"{original[:self._KEEP_CHARS]}...[compressed]\n[CCR:hash={ccr_hash}]"

    def _store_ccr(self, original: str) -> str:
        """Store ``original`` keyed by the first 16 hex chars of its SHA-256."""
        ccr_hash = hashlib.sha256(original.encode()).hexdigest()[:16]
        self._ccr_cache[ccr_hash] = original
        return ccr_hash

    def retrieve(self, ccr_hash=None, query=None) -> dict:
        """Look up original content by ``ccr_hash``; ``query`` is accepted but ignored."""
        if ccr_hash and ccr_hash in self._ccr_cache:
            return {"content": self._ccr_cache[ccr_hash], "ccr_hash": ccr_hash, "success": True}
        return {"error": "Not found", "success": False}


class TestGEOPipelineCompression:
    """GEO Pipeline compression integration tests."""

    @pytest.mark.asyncio
    async def test_pipeline_with_compression_enabled(self):
        """With compression enabled, the GEO Pipeline completes end-to-end."""
        gateway = make_mock_gateway()
        engine = ReActEngine(gateway, max_steps=5)
        compressor = MockHeadroomCompressor()

        mock_tool = _make_mock_tool(
            name="baidu_search",
            description="Search Baidu",
            input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
            execute_result={
                "results": [
                    {"title": f"Result {i}", "url": f"https://example.com/{i}"}
                    for i in range(20)
                ],
                "success": True,
            },
        )

        result = await engine.execute(
            messages=[{"role": "user", "content": "分析GEO优化策略"}],
            tools=[mock_tool],
            compressor=compressor,
        )

        assert result.status == "success" or result.output
        assert compressor._compress_count > 0

    @pytest.mark.asyncio
    async def test_tool_outputs_are_compressed(self):
        """Long tool outputs are compressed."""
        gateway = make_mock_gateway(tool_name="web_crawl")
        engine = ReActEngine(gateway, max_steps=5)
        compressor = MockHeadroomCompressor()

        mock_tool = _make_mock_tool(
            name="web_crawl",
            description="Crawl web page",
            input_schema={"type": "object", "properties": {"url": {"type": "string"}}},
            # Long content, well above the compression threshold.
            execute_result={"content": "A" * 5000, "success": True},
        )

        await engine.execute(
            messages=[{"role": "user", "content": "抓取网页"}],
            tools=[mock_tool],
            compressor=compressor,
        )

        assert compressor._compress_count > 0

    @pytest.mark.asyncio
    async def test_ccr_retrieve_works(self):
        """CCR retrieval returns the original, uncompressed data."""
        compressor = MockHeadroomCompressor()

        original = "这是一段很长的搜索结果" * 100
        compressed = await compressor.compress_tool_result("baidu_search", original)
        assert len(compressed) < len(original)

        match = CCR_HASH_PATTERN.search(compressed)
        assert match, f"No CCR hash found in compressed content: {compressed[:100]}"

        retrieved = compressor.retrieve(ccr_hash=match.group(1))
        assert retrieved["success"] is True
        assert retrieved["content"] == original

    @pytest.mark.asyncio
    async def test_compression_disabled_pipeline_works(self):
        """With compression.enabled=false the Pipeline behaves exactly as before."""
        gateway = make_mock_gateway()
        engine = ReActEngine(gateway, max_steps=5)

        mock_tool = _make_mock_tool(
            name="baidu_search",
            description="Search Baidu",
            input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
            execute_result={"results": [], "success": True},
        )

        result = await engine.execute(
            messages=[{"role": "user", "content": "搜索"}],
            tools=[mock_tool],
            compressor=None,
        )

        # The pipeline must still produce output without a compressor.
        assert result.output

    @pytest.mark.asyncio
    async def test_create_compressor_with_geo_config(self):
        """GEO-style config dicts produce the expected compressor objects."""
        # Disabled -> no compressor.
        assert create_compressor({"enabled": False}) is None

        # Summary mode.
        summary = create_compressor({"enabled": True, "provider": "summary", "max_tokens": 2000})
        assert isinstance(summary, ContextCompressor)

        # Headroom mode; may fall back when headroom-ai is not installed.
        headroom = create_compressor({"enabled": True, "provider": "headroom"})
        assert isinstance(headroom, (ContextCompressor, CompressionStrategy))