"""Comprehensive Capability Backtest — 综合能力回测脚本 覆盖维度: 1. 预处理准确度 (Preprocessing Accuracy) — greeting/tool/skill/complex 路由 2. 召回率 (Recall) — @skill 前缀识别与 fallback 3. 过拟合检测 (Overfitting Detection) — 同意图不同表达的一致性 4. 执行效率 (Execution Efficiency) — DIRECT_CHAT/REACT 路径耗时 5. 工具搜索准确度 (Tool Search Accuracy) — BM25 相关性排序 6. 事件模型完整性 (Event Model Integrity) — SQ/EQ 双队列 7. Spec 管理功能 (Spec Management) — CRUD + 确认流程 8. 验证循环 (Verification Loop) — verify + retry 设计原则: - 不依赖真实 LLM 调用(使用 Mock 或直接测试组件接口) - 可独立运行(不依赖 E2E 服务器、Redis、PostgreSQL) - 标记为 @pytest.mark.e2e_capability - 最后生成综合能力报告(JSON + 中文文本) """ from __future__ import annotations import asyncio import json import time from datetime import datetime, timezone from pathlib import Path from typing import Any import pytest from agentkit.chat.request_preprocessor import RequestPreprocessor from agentkit.core.event_queue import EventQueue, Submission, SubmissionQueue from agentkit.core.protocol import ( Event, SessionEventType, TaskEventType, TurnEventType, ) from agentkit.core.spec_manager import Spec, SpecManager, SpecStep from agentkit.core.verification_loop import VerificationLoop from agentkit.skills.base import Skill, SkillConfig from agentkit.skills.registry import SkillRegistry from agentkit.tools.base import Tool from agentkit.tools.search import ToolSearchIndex # ═══════════════════════════════════════════════════════════════════════════ # 结果收集器(模块级,跨测试类共享) # ═══════════════════════════════════════════════════════════════════════════ class _ResultCollector: """收集所有测试结果,用于生成综合报告。""" def __init__(self) -> None: self.results: dict[str, list[dict[str, Any]]] = {} def record( self, dimension: str, case_id: str, passed: bool, **extra: Any, ) -> None: """记录单条测试结果。""" if dimension not in self.results: self.results[dimension] = [] entry: dict[str, Any] = {"case_id": case_id, "passed": passed} entry.update(extra) self.results[dimension].append(entry) def dimension_score(self, dimension: str) -> float: """计算某维度的得分(百分比)。""" cases = 
self.results.get(dimension, []) if not cases: return 0.0 passed = sum(1 for c in cases if c["passed"]) return passed / len(cases) * 100 def total_score(self) -> float: """计算总体得分(所有维度的平均通过率)。""" all_cases: list[dict[str, Any]] = [] for cases in self.results.values(): all_cases.extend(cases) if not all_cases: return 0.0 passed = sum(1 for c in all_cases if c["passed"]) return passed / len(all_cases) * 100 def clear(self) -> None: """清空收集器(用于报告测试中重新收集)。""" self.results.clear() _COLLECTOR = _ResultCollector() # ═══════════════════════════════════════════════════════════════════════════ # 测试辅助工具 # ═══════════════════════════════════════════════════════════════════════════ class _FakeTool(Tool): """测试用的 Fake Tool。""" def __init__( self, name: str, description: str, input_schema: dict[str, Any] | None = None, tags: list[str] | None = None, ) -> None: super().__init__( name=name, description=description, input_schema=input_schema, tags=tags or [], ) async def execute(self, **kwargs: Any) -> dict[str, Any]: return {"status": "ok"} def _build_test_tools() -> list[Tool]: """创建一组测试工具(覆盖 io/file/web/shell/testing 场景)。""" return [ _FakeTool( name="read_file", description="Read the contents of a file from the filesystem.", input_schema={ "type": "object", "properties": { "path": {"type": "string", "description": "file path to read"}, }, "required": ["path"], }, tags=["io", "file"], ), _FakeTool( name="write_file", description="Write content to a file on the filesystem.", input_schema={ "type": "object", "properties": { "path": {"type": "string", "description": "file path to write"}, "content": {"type": "string", "description": "content to write"}, }, "required": ["path", "content"], }, tags=["io", "file"], ), _FakeTool( name="web_search", description="Search the web for information using a search engine.", input_schema={ "type": "object", "properties": { "query": {"type": "string", "description": "search query"}, }, "required": ["query"], }, tags=["web", "search"], ), _FakeTool( 
name="shell_exec", description="Execute a shell command and return the output.", input_schema={ "type": "object", "properties": { "command": { "type": "string", "description": "shell command to execute", }, }, "required": ["command"], }, tags=["shell", "system"], ), _FakeTool( name="run_tests", description="Run project tests to verify code changes.", input_schema={ "type": "object", "properties": { "commands": { "type": "array", "description": "test commands to run", }, }, }, tags=["testing", "verification"], ), ] def _build_mock_skill_registry() -> SkillRegistry: """构建包含测试 Skill 的 SkillRegistry(不依赖真实 LLM)。""" registry = SkillRegistry() tools = _build_test_tools() # react_agent skill — 使用 web_search 工具 react_config = SkillConfig( name="react_agent", agent_type="react_agent", version="1.0.0", description="ReAct agent skill for tool-augmented reasoning", execution_mode="react", prompt={ "identity": "You are a ReAct agent.", "instructions": "Use tools to answer questions step by step.", }, ) registry.register(Skill(react_config, tools=[tools[2]])) # web_search # coder skill — 使用 read_file + write_file 工具 coder_config = SkillConfig( name="coder", agent_type="coder", version="1.0.0", description="Code generation and review skill", execution_mode="react", prompt={ "identity": "You are a coding assistant.", "instructions": "Help with code generation, review, and refactoring.", }, ) registry.register( Skill(coder_config, tools=[tools[0], tools[1]]) # read_file, write_file ) return registry def _build_preprocessor() -> RequestPreprocessor: """构建带 mock skill_registry 的 RequestPreprocessor。""" return RequestPreprocessor( skill_registry=_build_mock_skill_registry(), default_tools=_build_test_tools(), default_system_prompt="You are a helpful assistant.", default_model="test-model", default_agent_name="default", ) # ═══════════════════════════════════════════════════════════════════════════ # 测试数据定义 # ═══════════════════════════════════════════════════════════════════════════ # 
# 1. Preprocessing accuracy cases (>= 15).
# Each case maps a raw user input to the execution mode the preprocessor is
# expected to choose for it.
PREPROCESSING_CASES: list[dict[str, Any]] = [
    # Greeting/chitchat -> DIRECT_CHAT
    {"id": "greeting_cn", "input": "你好", "expected_mode": "direct_chat"},
    {"id": "greeting_en", "input": "hello", "expected_mode": "direct_chat"},
    {"id": "greeting_hi", "input": "hi", "expected_mode": "direct_chat"},
    {"id": "chitchat_thanks", "input": "谢谢", "expected_mode": "direct_chat"},
    {"id": "chitchat_ok", "input": "好的", "expected_mode": "direct_chat"},
    {"id": "identity_who", "input": "你是谁", "expected_mode": "direct_chat"},
    {"id": "identity_name", "input": "你叫什么", "expected_mode": "direct_chat"},
    # Tool-requiring queries -> REACT
    {"id": "tool_ip", "input": "查下ip", "expected_mode": "react"},
    {"id": "tool_search", "input": "搜索golang教程", "expected_mode": "react"},
    {"id": "tool_shell", "input": "执行ls命令", "expected_mode": "react"},
    {"id": "tool_file", "input": "读一下配置文件", "expected_mode": "react"},
    {"id": "tool_monitor", "input": "检查服务状态", "expected_mode": "react"},
    # Complex queries -> REACT
    {"id": "complex_analysis", "input": "帮我分析一下这个数据并生成报告", "expected_mode": "react"},
    {"id": "complex_code", "input": "重构这个函数使其更高效", "expected_mode": "react"},
    {"id": "complex_multi", "input": "搜索最新的AI论文并总结关键发现", "expected_mode": "react"},
    # @skill prefix -> SKILL_REACT
    {
        "id": "skill_prefix_react",
        "input": "@skill:react_agent 查看当前ip",
        "expected_mode": "skill_react",
    },
    {
        "id": "skill_prefix_coder",
        "input": "@skill:coder 写一个函数",
        "expected_mode": "skill_react",
    },
]
# 2. Skill-recall cases (>= 8).
# Each case states whether a skill should be matched, which one, and the
# resulting execution mode.
RECALL_CASES: list[dict[str, Any]] = [
    {
        "id": "recall_valid_react",
        "input": "@skill:react_agent 查看ip",
        "expected_matched": True,
        "expected_skill": "react_agent",
        "expected_mode": "skill_react",
    },
    {
        "id": "recall_valid_coder",
        "input": "@skill:coder 写代码",
        "expected_matched": True,
        "expected_skill": "coder",
        "expected_mode": "skill_react",
    },
    {
        "id": "recall_invalid_skill",
        "input": "@skill:nonexistent 做点什么",
        "expected_matched": False,
        "expected_skill": None,
        "expected_mode": "react",
    },
    {
        "id": "recall_no_prefix_react",
        "input": "查下ip地址",
        "expected_matched": False,
        "expected_skill": None,
        "expected_mode": "react",
    },
    {
        "id": "recall_no_prefix_greeting",
        "input": "你好",
        "expected_matched": False,
        "expected_skill": None,
        "expected_mode": "direct_chat",
    },
    {
        "id": "recall_no_prefix_complex",
        "input": "分析数据并生成报告",
        "expected_matched": False,
        "expected_skill": None,
        "expected_mode": "react",
    },
    {
        "id": "recall_skill_only_prefix",
        "input": "@skill:react_agent",
        "expected_matched": True,
        "expected_skill": "react_agent",
        "expected_mode": "skill_react",
    },
    {
        "id": "recall_skill_with_long_content",
        "input": "@skill:coder 请帮我重构这个函数,使其时间复杂度从 O(n²) 降到 O(n)",
        "expected_matched": True,
        "expected_skill": "coder",
        "expected_mode": "skill_react",
    },
]

# 3. Overfitting-detection cases (>= 5 groups; each group is one original
# phrasing plus 3 paraphrases that must all route to the same mode).
OVERFITTING_CASES: list[dict[str, Any]] = [
    {
        "id": "overfit_ip_check",
        "original": "查看当前ip",
        "paraphrases": ["查下ip", "获取ip地址", "看下ip"],
        "expected_mode": "react",
    },
    {
        "id": "overfit_search",
        "original": "搜索golang教程",
        "paraphrases": ["搜一下golang教程", "找下golang学习资料", "帮我搜golang入门"],
        "expected_mode": "react",
    },
    {
        "id": "overfit_greeting",
        "original": "你好",
        "paraphrases": ["hello", "hi", "嗨"],
        "expected_mode": "direct_chat",
    },
    {
        "id": "overfit_file_read",
        "original": "读一下配置文件",
        "paraphrases": ["看一下配置文件", "帮我读配置", "查看配置文件内容"],
        "expected_mode": "react",
    },
    {
        "id": "overfit_identity",
        "original": "你是谁",
        "paraphrases": ["你叫什么", "自我介绍", "你是什么"],
        "expected_mode": "direct_chat",
    },
]
# 4. Execution-efficiency cases (>= 5).
# "max_time_ms" is the latency budget, in milliseconds, for the preprocessing
# call on that input.
EFFICIENCY_CASES: list[dict[str, Any]] = [
    {
        "id": "efficiency_greeting",
        "input": "你好",
        "expected_mode": "direct_chat",
        "max_time_ms": 2000,
    },
    {
        "id": "efficiency_chitchat",
        "input": "谢谢",
        "expected_mode": "direct_chat",
        "max_time_ms": 2000,
    },
    {
        "id": "efficiency_identity",
        "input": "你是谁",
        "expected_mode": "direct_chat",
        "max_time_ms": 2000,
    },
    {
        "id": "efficiency_react_tool",
        "input": "查下ip",
        "expected_mode": "react",
        "max_time_ms": 5000,
    },
    {
        "id": "efficiency_react_complex",
        "input": "帮我分析一下这个数据并生成报告",
        "expected_mode": "react",
        "max_time_ms": 5000,
    },
]

# 5. Tool-search accuracy cases (>= 8).
# A case carries exactly one expectation key: "expected_top1" (name of the
# first hit), "expected_contains" (names that must all appear) or
# "expected_empty" (no hits at all).
TOOL_SEARCH_CASES: list[dict[str, Any]] = [
    {
        "id": "tool_search_read",
        "query": "read file",
        "expected_top1": "read_file",
    },
    {
        "id": "tool_search_write",
        "query": "write file",
        "expected_top1": "write_file",
    },
    {
        "id": "tool_search_web",
        "query": "web search",
        "expected_top1": "web_search",
    },
    {
        "id": "tool_search_shell",
        "query": "shell command execute",
        "expected_top1": "shell_exec",
    },
    {
        "id": "tool_search_tests",
        "query": "run tests verify",
        "expected_top1": "run_tests",
    },
    {
        "id": "tool_search_file_multiple",
        "query": "file",
        "expected_contains": ["read_file", "write_file"],
    },
    {
        "id": "tool_search_no_match",
        "query": "xyzzy_nonexistent_xyz",
        "expected_empty": True,
    },
    {
        "id": "tool_search_empty_query",
        "query": "",
        "expected_empty": True,
    },
]
# ═══════════════════════════════════════════════════════════════════════════
# 1. Preprocessing accuracy
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestPreprocessingAccuracy:
    """Check the routing decisions made by ``RequestPreprocessor``.

    Scenarios:
    - Greeting/chitchat -> DIRECT_CHAT (zero-cost fast path)
    - Tool-requiring queries -> REACT (the LLM decides on tool use)
    - ``@skill`` prefix -> SKILL_REACT (explicit skill selection)
    - Complex queries -> REACT (default agent loop)
    """

    @pytest.mark.parametrize(
        "case",
        PREPROCESSING_CASES,
        ids=[entry["id"] for entry in PREPROCESSING_CASES],
    )
    def test_preprocessing_routing(self, case: dict[str, Any]) -> None:
        """Each input must be routed to its expected execution mode."""
        routed = asyncio.run(_build_preprocessor().preprocess(content=case["input"]))
        got = routed.execution_mode.value
        wanted = case["expected_mode"]

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="preprocessing_accuracy",
            case_id=case["id"],
            passed=got == wanted,
            input=case["input"],
            expected=wanted,
            actual=got,
            match_method=routed.match_method,
        )

        assert got == wanted, (
            f"'{case['input']}': expected {wanted}, got {got} "
            f"(method={routed.match_method})"
        )
# ═══════════════════════════════════════════════════════════════════════════
# 2. Recall -- skill matching
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestSkillRecall:
    """Check ``@skill`` prefix recognition and the fallback behaviour.

    Scenarios:
    - Valid ``@skill`` prefix -> the named skill is matched
    - Invalid ``@skill`` prefix -> falls back to REACT
    - No prefix -> REACT or DIRECT_CHAT by default
    """

    @pytest.mark.parametrize(
        "case",
        RECALL_CASES,
        ids=[entry["id"] for entry in RECALL_CASES],
    )
    def test_skill_recall(self, case: dict[str, Any]) -> None:
        """Matched flag, skill name and mode must all equal the expectation."""
        routed = asyncio.run(_build_preprocessor().preprocess(content=case["input"]))

        got_matched = routed.matched
        got_skill = routed.skill_name
        got_mode = routed.execution_mode.value

        want_matched = case["expected_matched"]
        want_skill = case["expected_skill"]
        want_mode = case["expected_mode"]

        all_equal = (got_matched, got_skill, got_mode) == (
            want_matched,
            want_skill,
            want_mode,
        )

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="skill_recall",
            case_id=case["id"],
            passed=all_equal,
            input=case["input"],
            expected_matched=want_matched,
            actual_matched=got_matched,
            expected_skill=want_skill,
            actual_skill=got_skill,
            expected_mode=want_mode,
            actual_mode=got_mode,
        )

        # Separate asserts so the failure message names the field that differs.
        assert got_matched == want_matched, (
            f"'{case['input']}': matched expected {want_matched}, got {got_matched}"
        )
        assert got_skill == want_skill, (
            f"'{case['input']}': skill expected {want_skill}, got {got_skill}"
        )
        assert got_mode == want_mode, (
            f"'{case['input']}': mode expected {want_mode}, got {got_mode}"
        )
# ═══════════════════════════════════════════════════════════════════════════
# 3. Overfitting detection
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestOverfittingDetection:
    """Check that paraphrases of one intent all get the same execution mode.

    If the router gives "查看当前ip" and "查下ip" different modes, it has
    overfitted to a particular wording and does not generalise.
    """

    @pytest.mark.parametrize(
        "case",
        OVERFITTING_CASES,
        ids=[entry["id"] for entry in OVERFITTING_CASES],
    )
    def test_paraphrase_consistency(self, case: dict[str, Any]) -> None:
        """The original and every paraphrase must route to ``expected_mode``."""
        preprocessor = _build_preprocessor()
        wanted = case["expected_mode"]

        def mode_of(text: str) -> str:
            # One preprocessing call per phrasing, on the shared preprocessor.
            return asyncio.run(preprocessor.preprocess(content=text)).execution_mode.value

        original_mode = mode_of(case["original"])
        paraphrase_modes: list[str] = [mode_of(text) for text in case["paraphrases"]]

        observed = [original_mode, *paraphrase_modes]
        consistent = not any(mode != wanted for mode in observed)

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="overfitting_detection",
            case_id=case["id"],
            passed=consistent,
            original=case["original"],
            original_mode=original_mode,
            paraphrases=case["paraphrases"],
            paraphrase_modes=paraphrase_modes,
            expected_mode=wanted,
        )

        assert consistent, (
            f"Overfitting detected for '{case['id']}': "
            f"original='{case['original']}' → {original_mode}, "
            f"paraphrases={case['paraphrases']} → {paraphrase_modes}, "
            f"expected={wanted}"
        )
# ═══════════════════════════════════════════════════════════════════════════
# 4. Execution efficiency
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestExecutionEfficiency:
    """Measure how long the preprocessing stage takes.

    Budgets (taken from the case table):
    - DIRECT_CHAT path: < 2 s
    - REACT path start-up: < 5 s (preprocessing only, no LLM call)
    """

    @pytest.mark.parametrize(
        "case",
        EFFICIENCY_CASES,
        ids=[entry["id"] for entry in EFFICIENCY_CASES],
    )
    def test_preprocessing_latency(self, case: dict[str, Any]) -> None:
        """Preprocessing must finish within the case's budget and pick its mode."""
        # Construction happens before the clock starts; only the call is timed.
        preprocessor = _build_preprocessor()

        started = time.perf_counter()
        routed = asyncio.run(preprocessor.preprocess(content=case["input"]))
        elapsed_ms = (time.perf_counter() - started) * 1000

        got_mode = routed.execution_mode.value
        budget_ms = case["max_time_ms"]
        within_budget = elapsed_ms < budget_ms
        mode_matches = got_mode == case["expected_mode"]

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="execution_efficiency",
            case_id=case["id"],
            passed=within_budget and mode_matches,
            input=case["input"],
            elapsed_ms=round(elapsed_ms, 2),
            max_time_ms=budget_ms,
            actual_mode=got_mode,
            expected_mode=case["expected_mode"],
        )

        assert within_budget, (
            f"'{case['input']}': elapsed {elapsed_ms:.1f}ms > limit {budget_ms}ms"
        )
        assert mode_matches, (
            f"'{case['input']}': mode expected {case['expected_mode']}, got {got_mode}"
        )
# ═══════════════════════════════════════════════════════════════════════════
# 5. Tool search accuracy
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestToolSearchAccuracy:
    """Check the ranking returned by ``ToolSearchIndex.search``.

    Scenarios:
    - Exact match on tool name/description
    - Fuzzy keyword match
    - No match returns an empty result
    - Empty query returns an empty result
    """

    @pytest.mark.parametrize(
        "case",
        TOOL_SEARCH_CASES,
        ids=[c["id"] for c in TOOL_SEARCH_CASES],
    )
    def test_tool_search(self, case: dict[str, Any]) -> None:
        """Search with the case's query and compare against its expectation.

        A case carries one of ``expected_empty``, ``expected_top1`` or
        ``expected_contains``. The expectation is evaluated once, so the
        outcome stored in the collector and the outcome asserted are always
        the same value. A case with none of these keys fails explicitly
        instead of being recorded as failed while the test passes.
        """
        query = case["query"]
        index = ToolSearchIndex(_build_test_tools())
        results = index.search(query, top_k=5)
        result_names = [r.name for r in results]

        detail: dict[str, Any] = {"query": query, "results": result_names}
        failure = ""

        if case.get("expected_empty"):
            detail["expected_empty"] = True
            passed = len(results) == 0
            failure = f"Query '{query}': expected empty, got {result_names}"
        elif "expected_top1" in case:
            expected_top1 = case["expected_top1"]
            detail["expected_top1"] = expected_top1
            if len(results) == 0:
                passed = False
                failure = f"Query '{query}': no results"
            else:
                passed = results[0].name == expected_top1
                failure = (
                    f"Query '{query}': expected top1={expected_top1}, "
                    f"got {results[0].name}"
                )
        elif "expected_contains" in case:
            expected = case["expected_contains"]
            detail["expected_contains"] = expected
            missing = [name for name in expected if name not in result_names]
            passed = not missing
            if missing:
                # Report the first missing name.
                failure = (
                    f"Query '{query}': expected '{missing[0]}' in results, "
                    f"got {result_names}"
                )
        else:
            # Malformed case: nothing to check, so it must not pass silently.
            passed = False
            failure = (
                f"Case '{case['id']}': no expectation key "
                "(expected_empty / expected_top1 / expected_contains)"
            )

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="tool_search_accuracy",
            case_id=case["id"],
            passed=passed,
            **detail,
        )

        assert passed, failure
# ═══════════════════════════════════════════════════════════════════════════
# 6. Event model integrity
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestEventModelIntegrity:
    """Basic behaviour of the SQ/EQ queue pair.

    Scenarios:
    - SQ submit and consume
    - SQ task cancellation
    - EQ emit and subscribe
    - EQ broadcast to several subscribers
    - EQ buffer replay
    - Event type classification
    """

    # NOTE(review): the coroutine tests below carry no asyncio marker;
    # presumably an asyncio pytest plugin collects them -- confirm against
    # the project's pytest configuration.

    async def test_sq_submit_and_drain(self) -> None:
        """A submitted item is delivered by ``drain`` with its content and task id."""
        sq = SubmissionQueue()
        task_id = await sq.submit("hello", "session-1")

        received: list[Submission] = []

        async def consumer() -> None:
            # Stop after the first submission so the task can finish.
            async for sub in sq.drain():
                received.append(sub)
                if len(received) >= 1:
                    break

        consumer_task = asyncio.create_task(consumer())
        # The timeout turns a hung drain() into a test failure.
        await asyncio.wait_for(consumer_task, timeout=1.0)

        passed = len(received) == 1 and received[0].content == "hello"
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="sq_submit_and_drain",
            passed=passed,
        )
        assert passed
        assert received[0].task_id == task_id

    async def test_sq_cancel_task(self) -> None:
        """After cancelling the first task, ``drain`` yields only the second."""
        sq = SubmissionQueue()
        task_id_1 = await sq.submit("first", "session-1")
        await sq.submit("second", "session-1")

        await sq.cancel(task_id_1)

        received: list[str] = []

        async def consumer() -> None:
            # Stop after the first delivered submission.
            async for sub in sq.drain():
                received.append(sub.content)
                if len(received) >= 1:
                    break

        consumer_task = asyncio.create_task(consumer())
        await asyncio.wait_for(consumer_task, timeout=1.0)

        passed = received == ["second"]
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="sq_cancel_task",
            passed=passed,
        )
        assert passed

    async def test_eq_emit_and_subscribe(self) -> None:
        """An emitted event reaches a subscriber with its type and data intact."""
        eq = EventQueue()
        event = Event.create(
            event_type=TurnEventType.TOKEN,
            task_id="task-1",
            session_id="session-1",
            data={"text": "hello"},
        )

        received: list[Event] = []

        async def subscriber() -> None:
            async for evt in eq.subscribe():
                received.append(evt)
                break

        sub_task = asyncio.create_task(subscriber())
        # Yield so the subscriber task can start before the event is emitted
        # (presumably needed for it to be registered -- confirm).
        await asyncio.sleep(0.05)
        await eq.emit(event)
        await asyncio.wait_for(sub_task, timeout=1.0)

        passed = (
            len(received) == 1
            and received[0].event_type == TurnEventType.TOKEN
            and received[0].data == {"text": "hello"}
        )
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="eq_emit_and_subscribe",
            passed=passed,
        )
        assert passed

    async def test_eq_broadcast_to_multiple_subscribers(self) -> None:
        """Two concurrent subscribers each receive both emitted events."""
        eq = EventQueue()
        received_a: list[Event] = []
        received_b: list[Event] = []

        async def subscriber_a() -> None:
            async for evt in eq.subscribe():
                received_a.append(evt)
                if len(received_a) >= 2:
                    break

        async def subscriber_b() -> None:
            async for evt in eq.subscribe():
                received_b.append(evt)
                if len(received_b) >= 2:
                    break

        task_a = asyncio.create_task(subscriber_a())
        task_b = asyncio.create_task(subscriber_b())
        # Let both subscriber tasks start before emitting.
        await asyncio.sleep(0.05)

        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))

        await asyncio.wait_for(task_a, timeout=1.0)
        await asyncio.wait_for(task_b, timeout=1.0)

        passed = len(received_a) == 2 and len(received_b) == 2
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="eq_broadcast",
            passed=passed,
        )
        assert passed

    async def test_eq_buffer_replay(self) -> None:
        """Events emitted before subscribing are replayed, in order, to a new subscriber."""
        eq = EventQueue(buffer_size=100)

        # Emit first, subscribe afterwards: delivery has to come from the buffer.
        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))

        received: list[Event] = []

        async def subscriber() -> None:
            async for evt in eq.subscribe():
                received.append(evt)
                if len(received) >= 2:
                    break

        sub_task = asyncio.create_task(subscriber())
        await asyncio.wait_for(sub_task, timeout=1.0)

        passed = (
            len(received) == 2
            and received[0].data == {"seq": 1}
            and received[1].data == {"seq": 2}
        )
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="eq_buffer_replay",
            passed=passed,
        )
        assert passed

    def test_event_type_classification(self) -> None:
        """Event type values carry the prefix of their family (session/task/turn)."""
        session_events = [
            SessionEventType.SESSION_STARTED,
            SessionEventType.SESSION_ENDED,
        ]
        task_events = [
            TaskEventType.TASK_CREATED,
            TaskEventType.TASK_STARTED,
            TaskEventType.TASK_COMPLETED,
            TaskEventType.TASK_FAILED,
        ]
        turn_events = [
            TurnEventType.TURN_STARTED,
            TurnEventType.THINKING,
            TurnEventType.TOOL_CALL,
            TurnEventType.TOKEN,
            TurnEventType.FINAL_ANSWER,
        ]

        # startswith() is called on the members directly, so they are
        # presumably str-based enums -- confirm in agentkit.core.protocol.
        all_correct = (
            all(e.startswith("session.") for e in session_events)
            and all(e.startswith("task.") for e in task_events)
            and all(e.startswith("turn.") for e in turn_events)
        )
        _COLLECTOR.record(
            dimension="event_model_integrity",
            case_id="event_type_classification",
            passed=all_correct,
        )
        assert all_correct


# ═══════════════════════════════════════════════════════════════════════════
# 7. Spec management
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_capability
class TestSpecManagement:
    """Create / read / update / confirm / delete flow of ``SpecManager``.

    Scenarios:
    - Create a Spec and persist it
    - Read a Spec back
    - Update a Spec field
    - Confirm a Spec (status + timestamp + steps)
    - List Specs with a status filter
    - Delete a Spec
    """

    def test_spec_create_and_get(self, tmp_path: Path) -> None:
        """A created Spec can be read back with all of its fields."""
        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
        spec = Spec(
            spec_id="test-spec-1",
            goal="Test goal",
            steps=[
                SpecStep(step_id="s1", name="Step 1", description="First"),
                SpecStep(
                    step_id="s2",
                    name="Step 2",
                    description="Second",
                    dependencies=["s1"],
                ),
            ],
        )
        # create() returns a path; its existence is part of the check below.
        path = mgr.create(spec)
        loaded = mgr.get("test-spec-1")

        passed = (
            path.exists()
            and loaded is not None
            and loaded.spec_id == "test-spec-1"
            and loaded.goal == "Test goal"
            and len(loaded.steps) == 2
            and loaded.steps[1].dependencies == ["s1"]
        )
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_create_and_get",
            passed=passed,
        )
        assert passed

    def test_spec_update(self, tmp_path: Path) -> None:
        """An updated field is visible on the returned Spec and on a later ``get``."""
        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
        spec = Spec(spec_id="test-spec-2", goal="Original")
        mgr.create(spec)

        updated = mgr.update("test-spec-2", goal="Updated goal")
        reloaded = mgr.get("test-spec-2")

        passed = (
            updated is not None
            and updated.goal == "Updated goal"
            and reloaded is not None
            and reloaded.goal == "Updated goal"
        )
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_update",
            passed=passed,
        )
        assert passed

    def test_spec_confirm(self, tmp_path: Path) -> None:
        """Confirming sets the Spec status, a timestamp and every step's status."""
        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
        spec = Spec(
            spec_id="test-spec-3",
            goal="Confirm test",
            steps=[
                SpecStep(step_id="s1", name="Step 1", description="First"),
            ],
        )
        mgr.create(spec)

        confirmed = mgr.confirm("test-spec-3")

        passed = (
            confirmed is not None
            and confirmed.status == "confirmed"
            and confirmed.confirmed_at is not None
            and all(s.status == "confirmed" for s in confirmed.steps)
        )
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_confirm",
            passed=passed,
        )
        assert passed

    def test_spec_list_and_filter(self, tmp_path: Path) -> None:
        """``list_specs`` returns everything, or only the Specs with a given status."""
        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
        mgr.create(Spec(spec_id="draft-1", goal="Draft"))
        mgr.create(Spec(spec_id="confirmed-1", goal="Confirmed"))
        mgr.confirm("confirmed-1")

        all_specs = mgr.list_specs()
        draft_specs = mgr.list_specs(status="draft")
        confirmed_specs = mgr.list_specs(status="confirmed")

        passed = (
            len(all_specs) == 2
            and len(draft_specs) == 1
            and len(confirmed_specs) == 1
            and confirmed_specs[0].spec_id == "confirmed-1"
        )
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_list_and_filter",
            passed=passed,
        )
        assert passed

    def test_spec_delete(self, tmp_path: Path) -> None:
        """``delete`` returns True and a later ``get`` returns None."""
        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
        mgr.create(Spec(spec_id="delete-me", goal="To be deleted"))

        result = mgr.delete("delete-me")
        loaded = mgr.get("delete-me")

        passed = result is True and loaded is None
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_delete",
            passed=passed,
        )
        assert passed
验证循环 # ═══════════════════════════════════════════════════════════════════════════ @pytest.mark.e2e_capability class TestVerificationLoop: """验证循环测试:验证 VerificationLoop 的 verify 和 retry 机制。 覆盖场景: - 成功命令返回 passed=True - 失败命令返回 passed=False - 超时命令返回 passed=False - 重试机制(无 fix_callback / 有 fix_callback) """ async def test_verify_success(self) -> None: """成功命令返回 passed=True。""" loop = VerificationLoop(commands=["echo ok"], timeout=10.0) result = await loop.verify() passed = result.passed is True and "ok" in result.test_output _COLLECTOR.record( dimension="verification_loop", case_id="verify_success", passed=passed, ) assert passed async def test_verify_failure(self) -> None: """失败命令返回 passed=False。""" loop = VerificationLoop(commands=["false"], timeout=10.0) result = await loop.verify() passed = result.passed is False and len(result.errors) > 0 _COLLECTOR.record( dimension="verification_loop", case_id="verify_failure", passed=passed, ) assert passed async def test_verify_timeout(self) -> None: """超时命令返回 passed=False。""" loop = VerificationLoop(commands=["sleep 10"], timeout=0.5) result = await loop.verify() passed = result.passed is False and any("timed out" in e for e in result.errors) _COLLECTOR.record( dimension="verification_loop", case_id="verify_timeout", passed=passed, ) assert passed async def test_verify_and_retry_no_callback(self) -> None: """无 fix_callback 时重试指定次数。""" loop = VerificationLoop(commands=["false"], max_retries=2, timeout=5.0) result = await loop.verify_and_retry() passed = result.passed is False and result.attempts == 3 _COLLECTOR.record( dimension="verification_loop", case_id="verify_and_retry_no_callback", passed=passed, ) assert passed async def test_verify_and_retry_with_callback(self) -> None: """fix_callback 被调用并接收 errors 和 test_output。""" call_count = 0 async def fix_cb(errors: list[str], test_output: str) -> None: nonlocal call_count call_count += 1 loop = VerificationLoop(commands=["false"], max_retries=1, timeout=5.0) result = await 
loop.verify_and_retry(fix_callback=fix_cb) passed = result.passed is False and call_count == 1 _COLLECTOR.record( dimension="verification_loop", case_id="verify_and_retry_with_callback", passed=passed, ) assert passed # ═══════════════════════════════════════════════════════════════════════════ # 9. 综合报告生成 # ═══════════════════════════════════════════════════════════════════════════ def _run_all_checks_for_report() -> _ResultCollector: """运行所有维度的检查,返回填充好的收集器(用于报告生成)。 这确保报告测试自包含,不依赖其他测试的执行顺序。 """ collector = _ResultCollector() preprocessor = _build_preprocessor() tools = _build_test_tools() search_index = ToolSearchIndex(tools) # --- 1. 预处理准确度 --- for case in PREPROCESSING_CASES: result = asyncio.run(preprocessor.preprocess(content=case["input"])) actual = result.execution_mode.value passed = actual == case["expected_mode"] collector.record( dimension="preprocessing_accuracy", case_id=case["id"], passed=passed, input=case["input"], expected=case["expected_mode"], actual=actual, ) # --- 2. 召回率 --- for case in RECALL_CASES: result = asyncio.run(preprocessor.preprocess(content=case["input"])) passed = ( result.matched == case["expected_matched"] and result.skill_name == case["expected_skill"] and result.execution_mode.value == case["expected_mode"] ) collector.record( dimension="skill_recall", case_id=case["id"], passed=passed, ) # --- 3. 过拟合检测 --- for case in OVERFITTING_CASES: original_result = asyncio.run(preprocessor.preprocess(content=case["original"])) modes = [original_result.execution_mode.value] for para in case["paraphrases"]: r = asyncio.run(preprocessor.preprocess(content=para)) modes.append(r.execution_mode.value) passed = all(m == case["expected_mode"] for m in modes) collector.record( dimension="overfitting_detection", case_id=case["id"], passed=passed, ) # --- 4. 
执行效率 --- for case in EFFICIENCY_CASES: start = time.perf_counter() result = asyncio.run(preprocessor.preprocess(content=case["input"])) elapsed_ms = (time.perf_counter() - start) * 1000 passed = ( elapsed_ms < case["max_time_ms"] and result.execution_mode.value == case["expected_mode"] ) collector.record( dimension="execution_efficiency", case_id=case["id"], passed=passed, elapsed_ms=round(elapsed_ms, 2), ) # --- 5. 工具搜索准确度 --- for case in TOOL_SEARCH_CASES: results = search_index.search(case["query"], top_k=5) names = [r.name for r in results] if case.get("expected_empty"): passed = len(results) == 0 elif "expected_top1" in case: passed = len(results) > 0 and results[0].name == case["expected_top1"] elif "expected_contains" in case: passed = all(n in names for n in case["expected_contains"]) else: passed = False collector.record( dimension="tool_search_accuracy", case_id=case["id"], passed=passed, ) # --- 6. 事件模型完整性 --- async def _run_event_checks() -> None: # SQ submit + drain sq = SubmissionQueue() await sq.submit("test", "s1") sq_received: list[Submission] = [] async def sq_consumer() -> None: async for sub in sq.drain(): sq_received.append(sub) break sq_task = asyncio.create_task(sq_consumer()) await asyncio.wait_for(sq_task, timeout=1.0) collector.record( dimension="event_model_integrity", case_id="sq_submit_and_drain", passed=len(sq_received) == 1, ) # EQ emit + subscribe eq = EventQueue() event = Event.create(TurnEventType.TOKEN, "t1", "s1", {"text": "hi"}) eq_received: list[Event] = [] async def eq_sub() -> None: async for evt in eq.subscribe(): eq_received.append(evt) break eq_task = asyncio.create_task(eq_sub()) await asyncio.sleep(0.05) await eq.emit(event) await asyncio.wait_for(eq_task, timeout=1.0) collector.record( dimension="event_model_integrity", case_id="eq_emit_and_subscribe", passed=len(eq_received) == 1, ) asyncio.run(_run_event_checks()) # 事件类型分类 type_ok = ( SessionEventType.SESSION_STARTED.startswith("session.") and 
TaskEventType.TASK_STARTED.startswith("task.") and TurnEventType.TOKEN.startswith("turn.") ) collector.record( dimension="event_model_integrity", case_id="event_type_classification", passed=type_ok, ) # --- 7. Spec 管理 --- import tempfile with tempfile.TemporaryDirectory() as tmpdir: mgr = SpecManager(specs_dir=tmpdir) spec = Spec( spec_id="report-spec", goal="Report test", steps=[SpecStep(step_id="s1", name="S1", description="Step 1")], ) mgr.create(spec) loaded = mgr.get("report-spec") collector.record( dimension="spec_management", case_id="spec_create_and_get", passed=loaded is not None and loaded.goal == "Report test", ) confirmed = mgr.confirm("report-spec") collector.record( dimension="spec_management", case_id="spec_confirm", passed=confirmed is not None and confirmed.status == "confirmed", ) # --- 8. 验证循环 --- async def _run_verification_checks() -> None: loop_ok = VerificationLoop(commands=["echo ok"], timeout=10.0) result_ok = await loop_ok.verify() collector.record( dimension="verification_loop", case_id="verify_success", passed=result_ok.passed is True, ) loop_fail = VerificationLoop(commands=["false"], timeout=5.0) result_fail = await loop_fail.verify() collector.record( dimension="verification_loop", case_id="verify_failure", passed=result_fail.passed is False, ) asyncio.run(_run_verification_checks()) return collector def _generate_json_report(collector: _ResultCollector) -> dict[str, Any]: """生成 JSON 格式的综合报告。""" dimensions = [ "preprocessing_accuracy", "skill_recall", "overfitting_detection", "execution_efficiency", "tool_search_accuracy", "event_model_integrity", "spec_management", "verification_loop", ] dimension_scores: dict[str, float] = {} dimension_details: dict[str, Any] = {} for dim in dimensions: score = collector.dimension_score(dim) dimension_scores[dim] = round(score, 1) dimension_details[dim] = { "total": len(collector.results.get(dim, [])), "passed": sum(1 for c in collector.results.get(dim, []) if c["passed"]), "score": round(score, 1), 
"cases": collector.results.get(dim, []), } total_score = collector.total_score() # 改进建议 suggestions: list[str] = [] for dim, score in dimension_scores.items(): if score < 100: suggestions.append(f"[{dim}] 得分 {score:.1f}%,存在失败用例,需检查相关组件") if not suggestions: suggestions.append("所有维度均达到 100%,架构状态良好") return { "report_type": "comprehensive_capability_backtest", "generated_at": datetime.now(timezone.utc).isoformat(), "total_score": round(total_score, 1), "total_cases": sum(len(cases) for cases in collector.results.values()), "total_passed": sum( 1 for cases in collector.results.values() for c in cases if c["passed"] ), "dimension_scores": dimension_scores, "dimension_details": dimension_details, "suggestions": suggestions, } def _generate_text_report(json_report: dict[str, Any]) -> str: """生成中文文本格式的综合报告。""" lines: list[str] = [] sep = "=" * 70 lines.append(sep) lines.append("Fischer AgentKit 综合能力回测报告") lines.append(sep) lines.append(f"生成时间: {json_report['generated_at']}") lines.append(f"总体评分: {json_report['total_score']:.1f}%") lines.append( f"用例总数: {json_report['total_cases']} " f"通过: {json_report['total_passed']} " f"失败: {json_report['total_cases'] - json_report['total_passed']}" ) lines.append("") # 各维度得分 lines.append("-" * 70) lines.append("各维度得分") lines.append("-" * 70) dim_names: dict[str, str] = { "preprocessing_accuracy": "预处理准确度", "skill_recall": "技能召回率", "overfitting_detection": "过拟合检测", "execution_efficiency": "执行效率", "tool_search_accuracy": "工具搜索准确度", "event_model_integrity": "事件模型完整性", "spec_management": "Spec 管理功能", "verification_loop": "验证循环", } for dim, score in json_report["dimension_scores"].items(): name = dim_names.get(dim, dim) detail = json_report["dimension_details"][dim] status = "OK" if score == 100 else "FAIL" lines.append(f" {status} {name}: {score:.1f}% ({detail['passed']}/{detail['total']})") lines.append("") # 详细用例结果 lines.append("-" * 70) lines.append("详细用例结果") lines.append("-" * 70) for dim, details in 
json_report["dimension_details"].items(): name = dim_names.get(dim, dim) lines.append(f"\n[{name}]") for case in details["cases"]: status = "OK" if case["passed"] else "FAIL" lines.append(f" {status} {case['case_id']}") lines.append("") # 改进建议 lines.append("-" * 70) lines.append("改进建议") lines.append("-" * 70) for suggestion in json_report["suggestions"]: lines.append(f" • {suggestion}") lines.append("") lines.append(sep) return "\n".join(lines) @pytest.mark.e2e_capability class TestComprehensiveReport: """综合报告生成测试:在所有测试完成后生成综合能力报告。 输出: - JSON 报告: test-results/e2e/comprehensive_report.json - 文本报告: test-results/e2e/comprehensive_report.txt """ def test_generate_comprehensive_report(self, tmp_path: Path) -> None: """运行所有维度的检查并生成综合报告。""" # 自包含运行所有检查(不依赖其他测试的执行顺序) collector = _run_all_checks_for_report() # 合并已有收集器结果(如果其他测试已运行) for dim, cases in _COLLECTOR.results.items(): if dim not in collector.results: collector.results[dim] = cases # 生成报告 json_report = _generate_json_report(collector) text_report = _generate_text_report(json_report) # 确保输出目录存在 output_dir = Path("test-results/e2e") output_dir.mkdir(parents=True, exist_ok=True) # 保存 JSON 报告 json_path = output_dir / "comprehensive_report.json" json_path.write_text( json.dumps(json_report, ensure_ascii=False, indent=2), encoding="utf-8", ) # 保存文本报告 text_path = output_dir / "comprehensive_report.txt" text_path.write_text(text_report, encoding="utf-8") # 打印报告到控制台 print(f"\n{text_report}") print(f"\nJSON 报告: {json_path}") print(f"文本报告: {text_path}") # 验证报告文件已生成 assert json_path.exists(), "JSON report file not generated" assert text_path.exists(), "Text report file not generated" # 验证报告内容完整 assert json_report["total_cases"] > 0, "No test cases in report" assert len(json_report["dimension_scores"]) == 8, "Expected 8 dimensions in report" # 验证总体通过率不低于阈值(允许部分用例失败,但总体应 > 80%) total_score = json_report["total_score"] print(f"\n总体评分: {total_score:.1f}%") assert total_score >= 80.0, f"Total score {total_score:.1f}% is below 80% 
threshold" # ═══════════════════════════════════════════════════════════════════════════ # 10. 标准 Benchmark 框架集成 # ═══════════════════════════════════════════════════════════════════════════ @pytest.mark.e2e_capability class TestStandardBenchmarkIntegration: """测试标准 Benchmark 框架集成。""" def test_benchmark_task_creation(self) -> None: """测试 BenchmarkTask 可以正确创建。""" from agentkit.cli.benchmark import BenchmarkTask task = BenchmarkTask( task_id="test-001", dimension="preprocessing", category="greeting", difficulty="easy", input="你好", expected="direct_chat", tags=["regex", "chinese"], description="测试用例", paraphrases=[], ) assert task.task_id == "test-001" assert task.dimension == "preprocessing" def test_metric_set_prf(self) -> None: """测试 MetricSet P/R/F1 计算。""" from agentkit.cli.benchmark import MetricSet m = MetricSet( accuracy=0.9, precision=0.95, recall=0.85, f1=0.90, latency_p50_ms=1.0, latency_p95_ms=2.0, latency_p99_ms=3.0, consistency=1.0, total=100, passed=90, failed=10, ) assert m.f1 == 0.90 assert m.precision == 0.95 def test_benchmark_runs_successfully(self) -> None: """测试 benchmark 函数可以成功运行(fast 模式)。""" from agentkit.cli.benchmark import BenchmarkDimension, benchmark # 使用 fast 模式,不生成报告,不输出到终端 # 只验证不抛异常 try: benchmark( dimension=BenchmarkDimension.ALL, report=False, fast=True, verbose=False, runs=1, output_dir="test-results/benchmark", format="json", ) except SystemExit: pass # benchmark 可能通过 typer.Exit 退出 def test_report_generation(self, tmp_path: Path) -> None: """测试报告文件可以正确生成。""" import os from agentkit.cli.benchmark import BenchmarkDimension, benchmark out_dir = str(tmp_path / "benchmark") try: benchmark( dimension=BenchmarkDimension.ALL, report=True, fast=True, verbose=False, runs=1, output_dir=out_dir, format="markdown", ) except SystemExit: pass # 验证报告文件生成 json_path = os.path.join(out_dir, "benchmark_report.json") md_path = os.path.join(out_dir, "benchmark_report.md") assert os.path.exists(json_path), f"JSON report not found: {json_path}" assert 
os.path.exists(md_path), f"Markdown report not found: {md_path}"