fischer-agentkit/tests/integration/test_geo_compression.py

197 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GEO Pipeline 压缩集成测试
验证 GEO Pipeline 在 Headroom 压缩下的端到端工作。
"""
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.core.compressor import CompressionStrategy, ContextCompressor, create_compressor
from agentkit.core.react import ReActEngine, ReActResult
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse, TokenUsage, ToolCall
from agentkit.tools.registry import ToolRegistry
def make_mock_gateway(tool_name: str = "baidu_search") -> MagicMock:
    """Build a mocked ``LLMGateway`` whose ``chat`` replays two scripted replies.

    The first reply carries no text and requests a single call to
    ``tool_name``; the second reply is a plain-text final answer.

    Args:
        tool_name: Name placed on the scripted tool call.

    Returns:
        A ``MagicMock`` specced on ``LLMGateway`` with ``chat`` replaced by
        an ``AsyncMock`` that yields the two replies in order.
    """
    scripted_replies = [
        # Turn 1: the model asks for a tool invocation.
        LLMResponse(
            content="",
            model="test",
            usage=TokenUsage(prompt_tokens=100, completion_tokens=50),
            tool_calls=[
                ToolCall(id="tc_1", name=tool_name, arguments={"query": "GEO优化"}),
            ],
        ),
        # Turn 2: the model produces its final answer.
        LLMResponse(
            content="GEO优化建议1. 添加Schema.org标记 2. 优化页面标题",
            model="test",
            usage=TokenUsage(prompt_tokens=80, completion_tokens=40),
        ),
    ]

    mock_gateway = MagicMock(spec=LLMGateway)
    mock_gateway.chat = AsyncMock(side_effect=scripted_replies)
    return mock_gateway
class MockHeadroomCompressor:
    """Test double for HeadroomCompressor that needs no headroom-ai install.

    "Compression" keeps the first 50 characters of the text, appends the
    literal ``...[compressed]`` and an HTML-comment marker carrying the
    SHA-256 hex digest of the full text. The full text is kept in an
    in-memory dict keyed by that digest so it can be fetched via
    ``retrieve``. Only texts longer than 100 characters are compressed.
    """

    def __init__(self, config=None):
        self._config = config or {}
        self._ccr_cache = {}
        # Number of texts that were actually compressed so far.
        self._compress_count = 0

    def _shrink(self, text):
        """Store ``text``, bump the counter and return its compressed form."""
        head = text[:50] + "...[compressed]"
        digest = self._store_ccr(text)
        marker = f"\n<!-- CCR:hash={digest} -->"
        self._compress_count += 1
        return head + marker

    async def compress(self, messages):
        """Return a new list in which long ``tool`` messages are compressed.

        Messages whose role is not ``"tool"``, or whose stringified content
        is at most 100 characters, are passed through as the same objects.
        Compressed messages are shallow copies with ``content`` replaced.
        """
        rewritten = []
        for message in messages:
            replacement = message
            if message.get("role") == "tool":
                text = str(message.get("content", ""))
                if len(text) > 100:
                    replacement = {**message, "content": self._shrink(text)}
            rewritten.append(replacement)
        return rewritten

    async def compress_tool_result(self, tool_name, result):
        """Stringify ``result`` and compress it when longer than 100 chars.

        ``tool_name`` is accepted but not used by this mock.
        """
        text = str(result)
        return self._shrink(text) if len(text) > 100 else text

    def is_available(self):
        """Always report the mock as available."""
        return True

    def _store_ccr(self, original):
        """Cache ``original`` under its SHA-256 hex digest; return the digest."""
        import hashlib

        digest = hashlib.sha256(original.encode()).hexdigest()
        self._ccr_cache[digest] = original
        return digest

    def retrieve(self, ccr_hash=None, query=None):
        """Look up previously stored text by digest.

        ``query`` is accepted but not used by this mock. Returns a dict with
        ``success`` set to ``True`` plus ``content`` and ``ccr_hash`` on a
        hit, or ``success`` set to ``False`` plus ``error`` on a miss.
        """
        if not ccr_hash or ccr_hash not in self._ccr_cache:
            return {"error": "Not found", "success": False}
        return {
            "content": self._ccr_cache[ccr_hash],
            "ccr_hash": ccr_hash,
            "success": True,
        }
class TestGEOPipelineCompression:
    """Integration tests for the GEO Pipeline running with compression."""

    @staticmethod
    def _build_tool(name, description, argument, payload):
        """Return a ``Tool``-specced mock whose ``safe_execute`` yields ``payload``.

        ``argument`` is the single string property advertised in the mock's
        ``input_schema``.
        """
        from agentkit.tools.base import Tool

        tool = MagicMock(spec=Tool)
        tool.name = name
        tool.description = description
        tool.input_schema = {
            "type": "object",
            "properties": {argument: {"type": "string"}},
        }
        tool.safe_execute = AsyncMock(return_value=payload)
        return tool

    @pytest.mark.asyncio
    async def test_pipeline_with_compression_enabled(self):
        """The pipeline completes end to end when a compressor is supplied."""
        compressor = MockHeadroomCompressor()
        engine = ReActEngine(make_mock_gateway(), max_steps=5)
        search_hits = [
            {"title": f"Result {i}", "url": f"https://example.com/{i}"}
            for i in range(20)
        ]
        tool = self._build_tool(
            "baidu_search",
            "Search Baidu",
            "query",
            {"results": search_hits, "success": True},
        )

        outcome = await engine.execute(
            messages=[{"role": "user", "content": "分析GEO优化策略"}],
            tools=[tool],
            compressor=compressor,
        )

        assert outcome.status == "success" or outcome.output
        assert compressor._compress_count > 0

    @pytest.mark.asyncio
    async def test_tool_outputs_are_compressed(self):
        """A long tool output triggers at least one compression."""
        compressor = MockHeadroomCompressor()
        engine = ReActEngine(make_mock_gateway(tool_name="web_crawl"), max_steps=5)
        # 5000 characters is far above the mock's compression threshold.
        tool = self._build_tool(
            "web_crawl",
            "Crawl web page",
            "url",
            {"content": "A" * 5000, "success": True},
        )

        await engine.execute(
            messages=[{"role": "user", "content": "抓取网页"}],
            tools=[tool],
            compressor=compressor,
        )

        assert compressor._compress_count > 0

    @pytest.mark.asyncio
    async def test_ccr_retrieve_works(self):
        """The original text can be fetched back through its CCR hash."""
        import re

        compressor = MockHeadroomCompressor()
        original = "这是一段很长的搜索结果" * 100
        compressed = await compressor.compress_tool_result("baidu_search", original)

        # The hash is embedded in the marker appended to the compressed text.
        match = re.search(r"CCR:hash=([a-f0-9]+)", compressed)
        assert match, f"No CCR hash found in compressed content: {compressed[:100]}"

        retrieved = compressor.retrieve(ccr_hash=match.group(1))
        assert retrieved["success"] is True
        assert retrieved["content"] == original

    @pytest.mark.asyncio
    async def test_compression_disabled_pipeline_works(self):
        """The pipeline still produces output when no compressor is passed."""
        engine = ReActEngine(make_mock_gateway(), max_steps=5)
        tool = self._build_tool(
            "baidu_search",
            "Search Baidu",
            "query",
            {"results": [], "success": True},
        )

        outcome = await engine.execute(
            messages=[{"role": "user", "content": "搜索"}],
            tools=[tool],
            compressor=None,
        )

        assert outcome.output

    @pytest.mark.asyncio
    async def test_create_compressor_with_geo_config(self):
        """``create_compressor`` honours the GEO compression settings."""
        # Disabled configuration yields no compressor at all.
        assert create_compressor({"enabled": False}) is None

        # The "summary" provider yields a ContextCompressor.
        summary = create_compressor(
            {"enabled": True, "provider": "summary", "max_tokens": 2000}
        )
        assert isinstance(summary, ContextCompressor)

        # The "headroom" provider may fall back when headroom-ai is not
        # installed (per the original author's note), so either type passes.
        headroom = create_compressor({"enabled": True, "provider": "headroom"})
        assert isinstance(headroom, (ContextCompressor, CompressionStrategy))