"""Comprehensive Capability Backtest.

Dimensions covered:
1. Preprocessing Accuracy — greeting/tool/skill/complex routing
2. Recall — @skill prefix recognition and fallback
3. Overfitting Detection — consistency across different phrasings of one intent
4. Execution Efficiency — latency of the DIRECT_CHAT/REACT paths
5. Tool Search Accuracy — BM25 relevance ranking
6. Event Model Integrity — SQ/EQ dual queues
7. Spec Management — CRUD plus the confirmation flow
8. Verification Loop — verify + retry

Design principles:
- No dependency on real LLM calls (use mocks or test component interfaces directly)
- Runs standalone (no E2E server, Redis, or PostgreSQL required)
- Marked with @pytest.mark.e2e_capability
- Produces a comprehensive capability report at the end (JSON + Chinese text)
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import time
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import pytest
|
||
|
||
from agentkit.chat.request_preprocessor import RequestPreprocessor
|
||
from agentkit.core.event_queue import EventQueue, Submission, SubmissionQueue
|
||
from agentkit.core.protocol import (
|
||
Event,
|
||
SessionEventType,
|
||
TaskEventType,
|
||
TurnEventType,
|
||
)
|
||
from agentkit.core.spec_manager import Spec, SpecManager, SpecStep
|
||
from agentkit.core.verification_loop import VerificationLoop
|
||
from agentkit.skills.base import Skill, SkillConfig
|
||
from agentkit.skills.registry import SkillRegistry
|
||
from agentkit.tools.base import Tool
|
||
from agentkit.tools.search import ToolSearchIndex
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 结果收集器(模块级,跨测试类共享)
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class _ResultCollector:
|
||
"""收集所有测试结果,用于生成综合报告。"""
|
||
|
||
def __init__(self) -> None:
|
||
self.results: dict[str, list[dict[str, Any]]] = {}
|
||
|
||
def record(
|
||
self,
|
||
dimension: str,
|
||
case_id: str,
|
||
passed: bool,
|
||
**extra: Any,
|
||
) -> None:
|
||
"""记录单条测试结果。"""
|
||
if dimension not in self.results:
|
||
self.results[dimension] = []
|
||
entry: dict[str, Any] = {"case_id": case_id, "passed": passed}
|
||
entry.update(extra)
|
||
self.results[dimension].append(entry)
|
||
|
||
def dimension_score(self, dimension: str) -> float:
|
||
"""计算某维度的得分(百分比)。"""
|
||
cases = self.results.get(dimension, [])
|
||
if not cases:
|
||
return 0.0
|
||
passed = sum(1 for c in cases if c["passed"])
|
||
return passed / len(cases) * 100
|
||
|
||
def total_score(self) -> float:
|
||
"""计算总体得分(所有维度的平均通过率)。"""
|
||
all_cases: list[dict[str, Any]] = []
|
||
for cases in self.results.values():
|
||
all_cases.extend(cases)
|
||
if not all_cases:
|
||
return 0.0
|
||
passed = sum(1 for c in all_cases if c["passed"])
|
||
return passed / len(all_cases) * 100
|
||
|
||
def clear(self) -> None:
|
||
"""清空收集器(用于报告测试中重新收集)。"""
|
||
self.results.clear()
|
||
|
||
|
||
_COLLECTOR = _ResultCollector()
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 测试辅助工具
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class _FakeTool(Tool):
    """Stub ``Tool`` for tests whose ``execute`` always reports success."""

    def __init__(
        self,
        name: str,
        description: str,
        input_schema: dict[str, Any] | None = None,
        tags: list[str] | None = None,
    ) -> None:
        """Forward the metadata to ``Tool``; missing or empty tags become ``[]``."""
        tag_list = tags if tags else []
        super().__init__(
            name=name,
            description=description,
            input_schema=input_schema,
            tags=tag_list,
        )

    async def execute(self, **kwargs: Any) -> dict[str, Any]:
        """Ignore every argument and return a fixed ``{"status": "ok"}`` payload."""
        return dict(status="ok")
|
||
|
||
|
||
def _build_test_tools() -> list[Tool]:
    """Create the fake tools used by the tests (io/file/web/shell/testing).

    Returns:
        Five ``_FakeTool`` instances in a fixed order: ``read_file``,
        ``write_file``, ``web_search``, ``shell_exec``, ``run_tests``.
    """

    def prop(kind: str, description: str) -> dict[str, Any]:
        # One entry of a schema's "properties" mapping.
        return {"type": kind, "description": description}

    def object_schema(
        properties: dict[str, Any],
        required: list[str] | None = None,
    ) -> dict[str, Any]:
        # The "required" key is only emitted when a list is supplied.
        schema: dict[str, Any] = {"type": "object", "properties": properties}
        if required is not None:
            schema["required"] = required
        return schema

    read_file = _FakeTool(
        name="read_file",
        description="Read the contents of a file from the filesystem.",
        input_schema=object_schema(
            {"path": prop("string", "file path to read")},
            ["path"],
        ),
        tags=["io", "file"],
    )
    write_file = _FakeTool(
        name="write_file",
        description="Write content to a file on the filesystem.",
        input_schema=object_schema(
            {
                "path": prop("string", "file path to write"),
                "content": prop("string", "content to write"),
            },
            ["path", "content"],
        ),
        tags=["io", "file"],
    )
    web_search = _FakeTool(
        name="web_search",
        description="Search the web for information using a search engine.",
        input_schema=object_schema(
            {"query": prop("string", "search query")},
            ["query"],
        ),
        tags=["web", "search"],
    )
    shell_exec = _FakeTool(
        name="shell_exec",
        description="Execute a shell command and return the output.",
        input_schema=object_schema(
            {"command": prop("string", "shell command to execute")},
            ["command"],
        ),
        tags=["shell", "system"],
    )
    # run_tests declares no "required" list.
    run_tests = _FakeTool(
        name="run_tests",
        description="Run project tests to verify code changes.",
        input_schema=object_schema(
            {"commands": prop("array", "test commands to run")},
        ),
        tags=["testing", "verification"],
    )
    return [read_file, write_file, web_search, shell_exec, run_tests]
|
||
|
||
|
||
def _build_mock_skill_registry() -> SkillRegistry:
    """Build a ``SkillRegistry`` holding the test skills (no real LLM needed).

    Two skills are registered: ``react_agent`` (with the web_search tool) and
    ``coder`` (with the read_file and write_file tools).
    """
    registry = SkillRegistry()
    # _build_test_tools() returns read_file, write_file, web_search first.
    read_file, write_file, web_search, *_unused = _build_test_tools()

    # react_agent skill — uses the web_search tool.
    registry.register(
        Skill(
            SkillConfig(
                name="react_agent",
                agent_type="react_agent",
                version="1.0.0",
                description="ReAct agent skill for tool-augmented reasoning",
                execution_mode="react",
                prompt={
                    "identity": "You are a ReAct agent.",
                    "instructions": "Use tools to answer questions step by step.",
                },
            ),
            tools=[web_search],
        )
    )

    # coder skill — uses the read_file and write_file tools.
    registry.register(
        Skill(
            SkillConfig(
                name="coder",
                agent_type="coder",
                version="1.0.0",
                description="Code generation and review skill",
                execution_mode="react",
                prompt={
                    "identity": "You are a coding assistant.",
                    "instructions": "Help with code generation, review, and refactoring.",
                },
            ),
            tools=[read_file, write_file],
        )
    )

    return registry
|
||
|
||
|
||
def _build_preprocessor() -> RequestPreprocessor:
    """Build a ``RequestPreprocessor`` wired to the mock skill registry."""
    settings: dict[str, Any] = {
        "skill_registry": _build_mock_skill_registry(),
        "default_tools": _build_test_tools(),
        "default_system_prompt": "You are a helpful assistant.",
        "default_model": "test-model",
        "default_agent_name": "default",
    }
    return RequestPreprocessor(**settings)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 测试数据定义
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
# 1. Preprocessing-accuracy cases (at least 15).
#    Each row is (case id, user input, expected execution mode).
PREPROCESSING_CASES: list[dict[str, Any]] = [
    {"id": case_id, "input": text, "expected_mode": mode}
    for case_id, text, mode in (
        # Greeting/Chitchat → DIRECT_CHAT
        ("greeting_cn", "你好", "direct_chat"),
        ("greeting_en", "hello", "direct_chat"),
        ("greeting_hi", "hi", "direct_chat"),
        ("chitchat_thanks", "谢谢", "direct_chat"),
        ("chitchat_ok", "好的", "direct_chat"),
        ("identity_who", "你是谁", "direct_chat"),
        ("identity_name", "你叫什么", "direct_chat"),
        # Tool-requiring queries → REACT
        ("tool_ip", "查下ip", "react"),
        ("tool_search", "搜索golang教程", "react"),
        ("tool_shell", "执行ls命令", "react"),
        ("tool_file", "读一下配置文件", "react"),
        ("tool_monitor", "检查服务状态", "react"),
        # Complex queries → REACT
        ("complex_analysis", "帮我分析一下这个数据并生成报告", "react"),
        ("complex_code", "重构这个函数使其更高效", "react"),
        ("complex_multi", "搜索最新的AI论文并总结关键发现", "react"),
        # @skill prefix → SKILL_REACT
        ("skill_prefix_react", "@skill:react_agent 查看当前ip", "skill_react"),
        ("skill_prefix_coder", "@skill:coder 写一个函数", "skill_react"),
    )
]
|
||
|
||
# 2. Recall cases (at least 8).
#    Each row is (case id, user input, expected matched flag,
#    expected skill name, expected execution mode).
RECALL_CASES: list[dict[str, Any]] = [
    {
        "id": case_id,
        "input": text,
        "expected_matched": matched,
        "expected_skill": skill,
        "expected_mode": mode,
    }
    for case_id, text, matched, skill, mode in (
        ("recall_valid_react", "@skill:react_agent 查看ip", True, "react_agent", "skill_react"),
        ("recall_valid_coder", "@skill:coder 写代码", True, "coder", "skill_react"),
        ("recall_invalid_skill", "@skill:nonexistent 做点什么", False, None, "react"),
        ("recall_no_prefix_react", "查下ip地址", False, None, "react"),
        ("recall_no_prefix_greeting", "你好", False, None, "direct_chat"),
        ("recall_no_prefix_complex", "分析数据并生成报告", False, None, "react"),
        ("recall_skill_only_prefix", "@skill:react_agent", True, "react_agent", "skill_react"),
        (
            "recall_skill_with_long_content",
            "@skill:coder 请帮我重构这个函数,使其时间复杂度从 O(n²) 降到 O(n)",
            True,
            "coder",
            "skill_react",
        ),
    )
]
|
||
|
||
# 3. Overfitting-detection cases (at least 5 groups; each has the original
#    phrasing plus 3 paraphrases).
#    Each row is (case id, original input, paraphrases, expected execution mode).
OVERFITTING_CASES: list[dict[str, Any]] = [
    {
        "id": case_id,
        "original": original,
        "paraphrases": paraphrases,
        "expected_mode": mode,
    }
    for case_id, original, paraphrases, mode in (
        ("overfit_ip_check", "查看当前ip", ["查下ip", "获取ip地址", "看下ip"], "react"),
        (
            "overfit_search",
            "搜索golang教程",
            ["搜一下golang教程", "找下golang学习资料", "帮我搜golang入门"],
            "react",
        ),
        ("overfit_greeting", "你好", ["hello", "hi", "嗨"], "direct_chat"),
        (
            "overfit_file_read",
            "读一下配置文件",
            ["看一下配置文件", "帮我读配置", "查看配置文件内容"],
            "react",
        ),
        ("overfit_identity", "你是谁", ["你叫什么", "自我介绍", "你是什么"], "direct_chat"),
    )
]
|
||
|
||
# 4. Execution-efficiency cases (at least 5).
#    Each row is (case id, user input, expected execution mode,
#    time limit in milliseconds).
EFFICIENCY_CASES: list[dict[str, Any]] = [
    {
        "id": case_id,
        "input": text,
        "expected_mode": mode,
        "max_time_ms": limit_ms,
    }
    for case_id, text, mode, limit_ms in (
        ("efficiency_greeting", "你好", "direct_chat", 2000),
        ("efficiency_chitchat", "谢谢", "direct_chat", 2000),
        ("efficiency_identity", "你是谁", "direct_chat", 2000),
        ("efficiency_react_tool", "查下ip", "react", 5000),
        ("efficiency_react_complex", "帮我分析一下这个数据并生成报告", "react", 5000),
    )
]
|
||
|
||
# 5. Tool-search accuracy cases (at least 8).
#    Every case carries exactly one expectation key: expected_top1,
#    expected_contains, or expected_empty.
TOOL_SEARCH_CASES: list[dict[str, Any]] = [
    dict(id="tool_search_read", query="read file", expected_top1="read_file"),
    dict(id="tool_search_write", query="write file", expected_top1="write_file"),
    dict(id="tool_search_web", query="web search", expected_top1="web_search"),
    dict(id="tool_search_shell", query="shell command execute", expected_top1="shell_exec"),
    dict(id="tool_search_tests", query="run tests verify", expected_top1="run_tests"),
    dict(
        id="tool_search_file_multiple",
        query="file",
        expected_contains=["read_file", "write_file"],
    ),
    dict(id="tool_search_no_match", query="xyzzy_nonexistent_xyz", expected_empty=True),
    dict(id="tool_search_empty_query", query="", expected_empty=True),
]
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 1. 预处理准确度
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestPreprocessingAccuracy:
    """Preprocessing accuracy: check the routing decision of RequestPreprocessor.

    Scenarios covered:
    - Greeting/Chitchat → DIRECT_CHAT (zero-cost fast path)
    - Tool-requiring queries → REACT (the LLM decides on tool use)
    - @skill prefix → SKILL_REACT (explicit skill selection)
    - Complex queries → REACT (default agent loop)
    """

    @pytest.mark.parametrize(
        "case",
        PREPROCESSING_CASES,
        ids=[entry["id"] for entry in PREPROCESSING_CASES],
    )
    def test_preprocessing_routing(self, case: dict[str, Any]) -> None:
        """Check that each input is routed to its expected execution mode."""
        text = case["input"]
        outcome = asyncio.run(_build_preprocessor().preprocess(content=text))

        got = outcome.execution_mode.value
        wanted = case["expected_mode"]
        routed_ok = got == wanted

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="preprocessing_accuracy",
            case_id=case["id"],
            passed=routed_ok,
            input=text,
            expected=wanted,
            actual=got,
            match_method=outcome.match_method,
        )

        assert routed_ok, (
            f"'{text}': expected {wanted}, got {got} "
            f"(method={outcome.match_method})"
        )
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 2. 召回率 — 技能匹配
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestSkillRecall:
    """Skill recall: check @skill prefix recognition and the fallback path.

    Scenarios covered:
    - Valid @skill prefix → the skill is matched
    - Invalid @skill prefix → fallback to REACT
    - No prefix → default REACT or DIRECT_CHAT
    """

    @pytest.mark.parametrize(
        "case",
        RECALL_CASES,
        ids=[entry["id"] for entry in RECALL_CASES],
    )
    def test_skill_recall(self, case: dict[str, Any]) -> None:
        """Check the matched flag, skill name and execution mode of one case."""
        text = case["input"]
        outcome = asyncio.run(_build_preprocessor().preprocess(content=text))

        # (label used in the failure message, expected value, actual value),
        # checked in this order.
        comparisons = (
            ("matched", case["expected_matched"], outcome.matched),
            ("skill", case["expected_skill"], outcome.skill_name),
            ("mode", case["expected_mode"], outcome.execution_mode.value),
        )
        (_, want_matched, got_matched) = comparisons[0]
        (_, want_skill, got_skill) = comparisons[1]
        (_, want_mode, got_mode) = comparisons[2]

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="skill_recall",
            case_id=case["id"],
            passed=all(got == want for _, want, got in comparisons),
            input=text,
            expected_matched=want_matched,
            actual_matched=got_matched,
            expected_skill=want_skill,
            actual_skill=got_skill,
            expected_mode=want_mode,
            actual_mode=got_mode,
        )

        for label, want, got in comparisons:
            assert got == want, f"'{text}': {label} expected {want}, got {got}"
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 3. 过拟合检测
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestOverfittingDetection:
    """Overfitting detection: different phrasings of one intent must route alike.

    Core idea: if the router gives "查看当前ip" and "查下ip" different execution
    modes, it is overfitted to specific phrasings and generalizes poorly.
    """

    @pytest.mark.parametrize(
        "case",
        OVERFITTING_CASES,
        ids=[entry["id"] for entry in OVERFITTING_CASES],
    )
    def test_paraphrase_consistency(self, case: dict[str, Any]) -> None:
        """Check that the original input and every paraphrase hit the expected mode."""
        preprocessor = _build_preprocessor()
        expected_mode = case["expected_mode"]

        def mode_of(text: str) -> str:
            # Route one input and return the value of its execution mode.
            return asyncio.run(preprocessor.preprocess(content=text)).execution_mode.value

        # The original phrasing is routed first, then each paraphrase in order.
        original_mode = mode_of(case["original"])
        paraphrase_modes: list[str] = [mode_of(text) for text in case["paraphrases"]]

        all_consistent = all(
            mode == expected_mode for mode in [original_mode, *paraphrase_modes]
        )

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="overfitting_detection",
            case_id=case["id"],
            passed=all_consistent,
            original=case["original"],
            original_mode=original_mode,
            paraphrases=case["paraphrases"],
            paraphrase_modes=paraphrase_modes,
            expected_mode=expected_mode,
        )

        assert all_consistent, (
            f"Overfitting detected for '{case['id']}': "
            f"original='{case['original']}' → {original_mode}, "
            f"paraphrases={case['paraphrases']} → {paraphrase_modes}, "
            f"expected={expected_mode}"
        )
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 4. 执行效率
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestExecutionEfficiency:
    """Execution efficiency: measure the response time of the preprocessing stage.

    Constraints:
    - DIRECT_CHAT path: < 2s (zero-cost fast path)
    - REACT path start-up: < 5s (preprocessing stage, LLM call excluded)
    """

    @pytest.mark.parametrize(
        "case",
        EFFICIENCY_CASES,
        ids=[entry["id"] for entry in EFFICIENCY_CASES],
    )
    def test_preprocessing_latency(self, case: dict[str, Any]) -> None:
        """Check that preprocessing stays under the case's time limit."""
        # Built before the timer starts, so only the preprocess call is timed.
        preprocessor = _build_preprocessor()

        started_at = time.perf_counter()
        outcome = asyncio.run(preprocessor.preprocess(content=case["input"]))
        elapsed_ms = (time.perf_counter() - started_at) * 1000

        actual_mode = outcome.execution_mode.value
        max_time_ms = case["max_time_ms"]
        within_limit = elapsed_ms < max_time_ms
        mode_ok = actual_mode == case["expected_mode"]

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="execution_efficiency",
            case_id=case["id"],
            passed=within_limit and mode_ok,
            input=case["input"],
            elapsed_ms=round(elapsed_ms, 2),
            max_time_ms=max_time_ms,
            actual_mode=actual_mode,
            expected_mode=case["expected_mode"],
        )

        assert within_limit, (
            f"'{case['input']}': elapsed {elapsed_ms:.1f}ms > limit {max_time_ms}ms"
        )
        assert mode_ok, (
            f"'{case['input']}': mode expected {case['expected_mode']}, got {actual_mode}"
        )
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 5. 工具搜索准确度
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestToolSearchAccuracy:
    """Tool-search accuracy: check the relevance ranking of the BM25 search.

    Scenarios covered:
    - Exact match on tool name/description
    - Fuzzy keyword match
    - No match returns an empty result
    - Empty query returns an empty result
    """

    @staticmethod
    def _evaluate(
        case: dict[str, Any],
        result_names: list[str],
    ) -> tuple[bool, dict[str, Any], str]:
        """Judge one search result against the expectation declared by ``case``.

        Args:
            case: A test case carrying ``expected_empty``, ``expected_top1`` or
                ``expected_contains`` (checked in that order of precedence).
            result_names: Names of the returned tools, best match first.

        Returns:
            ``(passed, expectation, failure_message)`` where ``expectation`` is
            the expectation field to store in the report and
            ``failure_message`` explains a failed verdict.
        """
        query = case["query"]

        if case.get("expected_empty"):
            return (
                not result_names,
                {"expected_empty": True},
                f"Query '{query}': expected empty, got {result_names}",
            )

        if "expected_top1" in case:
            top1 = case["expected_top1"]
            expectation = {"expected_top1": top1}
            if not result_names:
                return False, expectation, f"Query '{query}': no results"
            return (
                result_names[0] == top1,
                expectation,
                f"Query '{query}': expected top1={top1}, got {result_names[0]}",
            )

        if "expected_contains" in case:
            wanted = case["expected_contains"]
            missing = [name for name in wanted if name not in result_names]
            message = (
                f"Query '{query}': expected '{missing[0]}' in results, got {result_names}"
                if missing
                else ""
            )
            return not missing, {"expected_contains": wanted}, message

        # A case without any expectation used to be recorded as failed while
        # the test itself passed; make that mismatch an explicit failure.
        return (
            False,
            {},
            f"Case '{case['id']}' declares no expectation "
            "(expected_empty / expected_top1 / expected_contains)",
        )

    @pytest.mark.parametrize(
        "case",
        TOOL_SEARCH_CASES,
        ids=[c["id"] for c in TOOL_SEARCH_CASES],
    )
    def test_tool_search(self, case: dict[str, Any]) -> None:
        """Check that the tool search returns what the case expects."""
        index = ToolSearchIndex(_build_test_tools())
        results = index.search(case["query"], top_k=5)
        result_names = [r.name for r in results]

        passed, expectation, failure = self._evaluate(case, result_names)

        # Record before asserting so a failing case still reaches the report.
        _COLLECTOR.record(
            dimension="tool_search_accuracy",
            case_id=case["id"],
            passed=passed,
            query=case["query"],
            results=result_names,
            **expectation,
        )

        # The verdict recorded above and the pytest outcome now always agree.
        assert passed, failure
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 6. 事件模型完整性
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestEventModelIntegrity:
    """Event-model integrity: basic behaviour of the SQ/EQ queue pair.

    Scenarios covered:
    - SQ submit and consume
    - SQ task cancellation
    - EQ emit and subscribe
    - EQ broadcast to several subscribers
    - EQ buffer replay
    - Event type classification
    """

    _DIMENSION = "event_model_integrity"
    # Seconds a consumer/subscriber may take before the case counts as hung.
    _WAIT_TIMEOUT = 1.0

    @classmethod
    async def _await_or_record(cls, awaitable: Any, case_id: str) -> None:
        """Await ``awaitable`` with the class timeout.

        On timeout a failed result is recorded for ``case_id`` before the
        exception propagates, so a hung queue shows up in the report instead
        of silently dropping out of the score.

        Raises:
            asyncio.TimeoutError: If ``awaitable`` does not finish in time.
        """
        try:
            await asyncio.wait_for(awaitable, timeout=cls._WAIT_TIMEOUT)
        except asyncio.TimeoutError:
            _COLLECTOR.record(
                dimension=cls._DIMENSION,
                case_id=case_id,
                passed=False,
                error="timeout",
            )
            raise

    async def test_sq_submit_and_drain(self) -> None:
        """SQ accepts user input and hands it to the consumer."""
        sq = SubmissionQueue()
        task_id = await sq.submit("hello", "session-1")

        received: list[Submission] = []

        async def consumer() -> None:
            async for sub in sq.drain():
                received.append(sub)
                if len(received) >= 1:
                    break

        consumer_task = asyncio.create_task(consumer())
        await self._await_or_record(consumer_task, "sq_submit_and_drain")

        # The task-id check is part of the recorded verdict, so the report
        # cannot show a pass for a case that pytest fails.
        passed = (
            len(received) == 1
            and received[0].content == "hello"
            and received[0].task_id == task_id
        )
        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="sq_submit_and_drain",
            passed=passed,
        )
        assert passed, f"unexpected submissions drained: {received}"

    async def test_sq_cancel_task(self) -> None:
        """After a task is cancelled, drain skips its submission."""
        sq = SubmissionQueue()
        task_id_1 = await sq.submit("first", "session-1")
        await sq.submit("second", "session-1")
        await sq.cancel(task_id_1)

        received: list[str] = []

        async def consumer() -> None:
            async for sub in sq.drain():
                received.append(sub.content)
                if len(received) >= 1:
                    break

        consumer_task = asyncio.create_task(consumer())
        await self._await_or_record(consumer_task, "sq_cancel_task")

        passed = received == ["second"]
        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="sq_cancel_task",
            passed=passed,
        )
        assert passed, f"expected ['second'], got {received}"

    async def test_eq_emit_and_subscribe(self) -> None:
        """EQ delivers an emitted event to a subscriber."""
        eq = EventQueue()
        event = Event.create(
            event_type=TurnEventType.TOKEN,
            task_id="task-1",
            session_id="session-1",
            data={"text": "hello"},
        )

        received: list[Event] = []

        async def subscriber() -> None:
            async for evt in eq.subscribe():
                received.append(evt)
                break

        sub_task = asyncio.create_task(subscriber())
        # Yield so the subscriber task gets to run before the event is emitted.
        await asyncio.sleep(0.05)
        await eq.emit(event)
        await self._await_or_record(sub_task, "eq_emit_and_subscribe")

        passed = (
            len(received) == 1
            and received[0].event_type == TurnEventType.TOKEN
            and received[0].data == {"text": "hello"}
        )
        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="eq_emit_and_subscribe",
            passed=passed,
        )
        assert passed

    async def test_eq_broadcast_to_multiple_subscribers(self) -> None:
        """Every EQ subscriber receives each emitted event (broadcast)."""
        eq = EventQueue()
        received_a: list[Event] = []
        received_b: list[Event] = []

        async def subscriber(sink: list[Event]) -> None:
            async for evt in eq.subscribe():
                sink.append(evt)
                if len(sink) >= 2:
                    break

        task_a = asyncio.create_task(subscriber(received_a))
        task_b = asyncio.create_task(subscriber(received_b))
        # Yield so both subscriber tasks get to run before events are emitted.
        await asyncio.sleep(0.05)

        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))

        # Waiting on both through one gather means a timeout cancels both
        # subscribers; awaiting them one after the other left the second task
        # pending whenever the first one timed out.
        await self._await_or_record(asyncio.gather(task_a, task_b), "eq_broadcast")

        passed = len(received_a) == 2 and len(received_b) == 2
        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="eq_broadcast",
            passed=passed,
        )
        assert passed

    async def test_eq_buffer_replay(self) -> None:
        """Events emitted before subscribing are replayed to a new subscriber."""
        eq = EventQueue(buffer_size=100)

        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))

        received: list[Event] = []

        async def subscriber() -> None:
            async for evt in eq.subscribe():
                received.append(evt)
                if len(received) >= 2:
                    break

        sub_task = asyncio.create_task(subscriber())
        await self._await_or_record(sub_task, "eq_buffer_replay")

        passed = (
            len(received) == 2
            and received[0].data == {"seq": 1}
            and received[1].data == {"seq": 2}
        )
        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="eq_buffer_replay",
            passed=passed,
        )
        assert passed

    def test_event_type_classification(self) -> None:
        """Event type values carry the session./task./turn. prefix of their group."""
        session_events = [
            SessionEventType.SESSION_STARTED,
            SessionEventType.SESSION_ENDED,
        ]
        task_events = [
            TaskEventType.TASK_CREATED,
            TaskEventType.TASK_STARTED,
            TaskEventType.TASK_COMPLETED,
            TaskEventType.TASK_FAILED,
        ]
        turn_events = [
            TurnEventType.TURN_STARTED,
            TurnEventType.THINKING,
            TurnEventType.TOOL_CALL,
            TurnEventType.TOKEN,
            TurnEventType.FINAL_ANSWER,
        ]

        all_correct = (
            all(e.startswith("session.") for e in session_events)
            and all(e.startswith("task.") for e in task_events)
            and all(e.startswith("turn.") for e in turn_events)
        )

        _COLLECTOR.record(
            dimension=self._DIMENSION,
            case_id="event_type_classification",
            passed=all_correct,
        )
        assert all_correct
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 7. Spec 管理功能
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestSpecManagement:
    """Spec management: check the create/read/update/confirm flow of a Spec.

    Scenarios covered:
    - Create a Spec and persist it to YAML
    - Read a Spec (cache + disk)
    - Update Spec fields
    - Confirm a Spec (status + timestamp + steps)
    - Delete a Spec
    """

    def test_spec_create_and_get(self, tmp_path: Path) -> None:
        """Create a Spec, read it back and check that its fields are intact."""
        manager = SpecManager(specs_dir=str(tmp_path / "specs"))
        first_step = SpecStep(step_id="s1", name="Step 1", description="First")
        second_step = SpecStep(
            step_id="s2",
            name="Step 2",
            description="Second",
            dependencies=["s1"],
        )
        written_to = manager.create(
            Spec(
                spec_id="test-spec-1",
                goal="Test goal",
                steps=[first_step, second_step],
            )
        )
        fetched = manager.get("test-spec-1")

        ok = written_to.exists() and fetched is not None
        if ok:
            # Field checks only run once a Spec was actually returned.
            ok = (
                fetched.spec_id == "test-spec-1"
                and fetched.goal == "Test goal"
                and len(fetched.steps) == 2
                and fetched.steps[1].dependencies == ["s1"]
            )

        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_create_and_get",
            passed=ok,
        )
        assert ok

    def test_spec_update(self, tmp_path: Path) -> None:
        """Update a Spec field and check that a later read sees the change."""
        manager = SpecManager(specs_dir=str(tmp_path / "specs"))
        manager.create(Spec(spec_id="test-spec-2", goal="Original"))

        changed = manager.update("test-spec-2", goal="Updated goal")
        fetched = manager.get("test-spec-2")

        ok = changed is not None and changed.goal == "Updated goal"
        if ok:
            ok = fetched is not None and fetched.goal == "Updated goal"

        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_update",
            passed=ok,
        )
        assert ok

    def test_spec_confirm(self, tmp_path: Path) -> None:
        """Confirming a Spec changes its status and the status of its steps."""
        manager = SpecManager(specs_dir=str(tmp_path / "specs"))
        only_step = SpecStep(step_id="s1", name="Step 1", description="First")
        manager.create(
            Spec(spec_id="test-spec-3", goal="Confirm test", steps=[only_step])
        )

        confirmed = manager.confirm("test-spec-3")

        ok = confirmed is not None
        if ok:
            ok = (
                confirmed.status == "confirmed"
                and confirmed.confirmed_at is not None
                and all(step.status == "confirmed" for step in confirmed.steps)
            )

        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_confirm",
            passed=ok,
        )
        assert ok

    def test_spec_list_and_filter(self, tmp_path: Path) -> None:
        """List Specs and filter them by status."""
        manager = SpecManager(specs_dir=str(tmp_path / "specs"))
        manager.create(Spec(spec_id="draft-1", goal="Draft"))
        manager.create(Spec(spec_id="confirmed-1", goal="Confirmed"))
        manager.confirm("confirmed-1")

        everything = manager.list_specs()
        drafts = manager.list_specs(status="draft")
        confirmed = manager.list_specs(status="confirmed")

        counts_ok = (
            len(everything) == 2 and len(drafts) == 1 and len(confirmed) == 1
        )
        # Indexing is only reached when exactly one confirmed Spec came back.
        ok = counts_ok and confirmed[0].spec_id == "confirmed-1"

        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_list_and_filter",
            passed=ok,
        )
        assert ok

    def test_spec_delete(self, tmp_path: Path) -> None:
        """Delete a Spec and check that it can no longer be read."""
        manager = SpecManager(specs_dir=str(tmp_path / "specs"))
        manager.create(Spec(spec_id="delete-me", goal="To be deleted"))

        deleted = manager.delete("delete-me")
        leftover = manager.get("delete-me")

        ok = deleted is True and leftover is None
        _COLLECTOR.record(
            dimension="spec_management",
            case_id="spec_delete",
            passed=ok,
        )
        assert ok
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 8. 验证循环
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestVerificationLoop:
    """Exercise the verify and retry behaviour of ``VerificationLoop``.

    Scenarios covered:
    - a succeeding command yields ``passed=True``
    - a failing command yields ``passed=False``
    - a command that exceeds the timeout yields ``passed=False``
    - retrying without and with a ``fix_callback``

    Each case records its outcome in the module-level ``_COLLECTOR`` under
    the ``verification_loop`` dimension and then asserts it.
    """

    async def test_verify_success(self) -> None:
        """A succeeding command reports ``passed=True`` and its output contains "ok"."""
        verifier = VerificationLoop(commands=["echo ok"], timeout=10.0)
        outcome = await verifier.verify()

        passed = outcome.passed is True and "ok" in outcome.test_output
        _COLLECTOR.record(
            dimension="verification_loop",
            case_id="verify_success",
            passed=passed,
        )
        assert passed

    async def test_verify_failure(self) -> None:
        """A failing command reports ``passed=False`` with at least one error."""
        verifier = VerificationLoop(commands=["false"], timeout=10.0)
        outcome = await verifier.verify()

        passed = outcome.passed is False and len(outcome.errors) > 0
        _COLLECTOR.record(
            dimension="verification_loop",
            case_id="verify_failure",
            passed=passed,
        )
        assert passed

    async def test_verify_timeout(self) -> None:
        """A command outliving the timeout reports ``passed=False`` and a "timed out" error."""
        verifier = VerificationLoop(commands=["sleep 10"], timeout=0.5)
        outcome = await verifier.verify()

        passed = outcome.passed is False and any(
            "timed out" in message for message in outcome.errors
        )
        _COLLECTOR.record(
            dimension="verification_loop",
            case_id="verify_timeout",
            passed=passed,
        )
        assert passed

    async def test_verify_and_retry_no_callback(self) -> None:
        """Without a ``fix_callback``, ``max_retries=2`` is expected to give 3 attempts."""
        verifier = VerificationLoop(commands=["false"], max_retries=2, timeout=5.0)
        outcome = await verifier.verify_and_retry()

        passed = outcome.passed is False and outcome.attempts == 3
        _COLLECTOR.record(
            dimension="verification_loop",
            case_id="verify_and_retry_no_callback",
            passed=passed,
        )
        assert passed

    async def test_verify_and_retry_with_callback(self) -> None:
        """With ``max_retries=1`` the ``fix_callback`` is expected to be invoked exactly once."""
        # One entry per callback invocation; only the number of entries is checked.
        invocations: list[tuple[list[str], str]] = []

        async def fix_cb(errors: list[str], test_output: str) -> None:
            invocations.append((errors, test_output))

        verifier = VerificationLoop(commands=["false"], max_retries=1, timeout=5.0)
        outcome = await verifier.verify_and_retry(fix_callback=fix_cb)

        passed = outcome.passed is False and len(invocations) == 1
        _COLLECTOR.record(
            dimension="verification_loop",
            case_id="verify_and_retry_with_callback",
            passed=passed,
        )
        assert passed
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 9. 综合报告生成
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
def _run_all_checks_for_report() -> _ResultCollector:
    """Run the checks of every dimension and return a freshly filled collector.

    The report test calls this so that it is self-contained and does not
    depend on which other tests ran before it, or in what order.

    The asynchronous sections each run on a single event loop instead of one
    ``asyncio.run`` per case, and the synchronous sections run outside any
    loop. This function must be called from synchronous code, because
    ``asyncio.run`` cannot be used while another loop is running.

    Returns:
        A new ``_ResultCollector`` holding the recorded cases.
    """
    collector = _ResultCollector()
    preprocessor = _build_preprocessor()
    tools = _build_test_tools()
    search_index = ToolSearchIndex(tools)

    asyncio.run(_report_preprocessor_checks(collector, preprocessor))
    _report_tool_search_checks(collector, search_index)
    asyncio.run(_report_event_queue_checks(collector))
    _report_event_type_check(collector)
    _report_spec_checks(collector)
    asyncio.run(_report_verification_checks(collector))

    return collector


async def _finishes_within(awaitable: Any, timeout: float) -> bool:
    """Await *awaitable* and tell whether it completed within *timeout* seconds."""
    try:
        await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for already cancelled the awaitable; the caller records a
        # failed case instead of letting the timeout abort the whole report.
        return False
    return True


async def _report_preprocessor_checks(collector: _ResultCollector, preprocessor: Any) -> None:
    """Record dimensions 1-4, which all call ``preprocessor.preprocess``."""
    # --- 1. Preprocessing accuracy ---
    for case in PREPROCESSING_CASES:
        result = await preprocessor.preprocess(content=case["input"])
        actual = result.execution_mode.value
        collector.record(
            dimension="preprocessing_accuracy",
            case_id=case["id"],
            passed=actual == case["expected_mode"],
            input=case["input"],
            expected=case["expected_mode"],
            actual=actual,
        )

    # --- 2. Skill recall ---
    for case in RECALL_CASES:
        result = await preprocessor.preprocess(content=case["input"])
        passed = (
            result.matched == case["expected_matched"]
            and result.skill_name == case["expected_skill"]
            and result.execution_mode.value == case["expected_mode"]
        )
        collector.record(
            dimension="skill_recall",
            case_id=case["id"],
            passed=passed,
        )

    # --- 3. Overfitting detection ---
    for case in OVERFITTING_CASES:
        # The original phrasing and every paraphrase must route to the same mode.
        modes = []
        for text in (case["original"], *case["paraphrases"]):
            routed = await preprocessor.preprocess(content=text)
            modes.append(routed.execution_mode.value)
        collector.record(
            dimension="overfitting_detection",
            case_id=case["id"],
            passed=all(mode == case["expected_mode"] for mode in modes),
        )

    # --- 4. Execution efficiency ---
    for case in EFFICIENCY_CASES:
        # Time only the awaited call, so that event-loop creation and
        # teardown are not counted against the case's time budget.
        started = time.perf_counter()
        result = await preprocessor.preprocess(content=case["input"])
        elapsed_ms = (time.perf_counter() - started) * 1000
        passed = (
            elapsed_ms < case["max_time_ms"]
            and result.execution_mode.value == case["expected_mode"]
        )
        collector.record(
            dimension="execution_efficiency",
            case_id=case["id"],
            passed=passed,
            elapsed_ms=round(elapsed_ms, 2),
        )


def _report_tool_search_checks(collector: _ResultCollector, search_index: Any) -> None:
    """Record dimension 5 by querying the tool search index with ``top_k=5``."""
    for case in TOOL_SEARCH_CASES:
        results = search_index.search(case["query"], top_k=5)
        names = [hit.name for hit in results]
        if case.get("expected_empty"):
            passed = len(results) == 0
        elif "expected_top1" in case:
            passed = len(results) > 0 and results[0].name == case["expected_top1"]
        elif "expected_contains" in case:
            passed = all(name in names for name in case["expected_contains"])
        else:
            # A case that states no expectation cannot pass.
            passed = False
        collector.record(
            dimension="tool_search_accuracy",
            case_id=case["id"],
            passed=passed,
        )


async def _report_event_queue_checks(collector: _ResultCollector) -> None:
    """Record the SQ and EQ round-trip cases of dimension 6."""
    # SQ: submit one item, then drain exactly one item.
    sq = SubmissionQueue()
    await sq.submit("test", "s1")
    sq_received: list[Submission] = []

    async def sq_consumer() -> None:
        async for sub in sq.drain():
            sq_received.append(sub)
            break

    sq_finished = await _finishes_within(sq_consumer(), timeout=1.0)
    collector.record(
        dimension="event_model_integrity",
        case_id="sq_submit_and_drain",
        passed=sq_finished and len(sq_received) == 1,
    )

    # EQ: start a subscriber, emit one event, expect the subscriber to get it.
    eq = EventQueue()
    event = Event.create(TurnEventType.TOKEN, "t1", "s1", {"text": "hi"})
    eq_received: list[Event] = []

    async def eq_sub() -> None:
        async for evt in eq.subscribe():
            eq_received.append(evt)
            break

    eq_task = asyncio.create_task(eq_sub())
    # Give the subscriber task a chance to start before the event is emitted.
    await asyncio.sleep(0.05)
    await eq.emit(event)
    eq_finished = await _finishes_within(eq_task, timeout=1.0)
    collector.record(
        dimension="event_model_integrity",
        case_id="eq_emit_and_subscribe",
        passed=eq_finished and len(eq_received) == 1,
    )


def _report_event_type_check(collector: _ResultCollector) -> None:
    """Record whether the event type values carry their category prefix."""
    type_ok = (
        SessionEventType.SESSION_STARTED.startswith("session.")
        and TaskEventType.TASK_STARTED.startswith("task.")
        and TurnEventType.TOKEN.startswith("turn.")
    )
    collector.record(
        dimension="event_model_integrity",
        case_id="event_type_classification",
        passed=type_ok,
    )


def _report_spec_checks(collector: _ResultCollector) -> None:
    """Record dimension 7 using a spec manager rooted in a temporary directory."""
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        mgr = SpecManager(specs_dir=tmpdir)
        spec = Spec(
            spec_id="report-spec",
            goal="Report test",
            steps=[SpecStep(step_id="s1", name="S1", description="Step 1")],
        )
        mgr.create(spec)
        loaded = mgr.get("report-spec")
        collector.record(
            dimension="spec_management",
            case_id="spec_create_and_get",
            passed=loaded is not None and loaded.goal == "Report test",
        )

        confirmed = mgr.confirm("report-spec")
        collector.record(
            dimension="spec_management",
            case_id="spec_confirm",
            passed=confirmed is not None and confirmed.status == "confirmed",
        )


async def _report_verification_checks(collector: _ResultCollector) -> None:
    """Record dimension 8 with one succeeding and one failing command."""
    loop_ok = VerificationLoop(commands=["echo ok"], timeout=10.0)
    result_ok = await loop_ok.verify()
    collector.record(
        dimension="verification_loop",
        case_id="verify_success",
        passed=result_ok.passed is True,
    )

    loop_fail = VerificationLoop(commands=["false"], timeout=5.0)
    result_fail = await loop_fail.verify()
    collector.record(
        dimension="verification_loop",
        case_id="verify_failure",
        passed=result_fail.passed is False,
    )
|
||
|
||
|
||
def _generate_json_report(collector: _ResultCollector) -> dict[str, Any]:
    """Build the JSON-serialisable comprehensive report from collected results.

    Args:
        collector: Source of the recorded cases (``results``), the
            per-dimension score (``dimension_score``) and the overall score
            (``total_score``).

    Returns:
        A dict with the report type, a UTC ISO-8601 timestamp, the overall
        score rounded to one decimal, total and passed case counts over all
        recorded dimensions, per-dimension scores and details for the eight
        known dimensions, and a list of improvement suggestions.
    """
    dimensions = (
        "preprocessing_accuracy",
        "skill_recall",
        "overfitting_detection",
        "execution_efficiency",
        "tool_search_accuracy",
        "event_model_integrity",
        "spec_management",
        "verification_loop",
    )

    dimension_scores: dict[str, float] = {}
    dimension_details: dict[str, Any] = {}
    suggestions: list[str] = []
    for dim in dimensions:
        cases = collector.results.get(dim, [])
        passed_count = sum(1 for case in cases if case["passed"])
        score = round(collector.dimension_score(dim), 1)
        dimension_scores[dim] = score
        dimension_details[dim] = {
            "total": len(cases),
            "passed": passed_count,
            "score": score,
            # The collector's own list is referenced here, not copied.
            "cases": cases,
        }
        # The rounded score alone can hide failures (2999/3000 rounds to
        # 100.0), so the raw counts are consulted as well.
        if score < 100 or passed_count < len(cases):
            suggestions.append(f"[{dim}] 得分 {score:.1f}%,存在失败用例,需检查相关组件")

    total_score = collector.total_score()

    if not suggestions:
        suggestions.append("所有维度均达到 100%,架构状态良好")

    all_cases = [case for cases in collector.results.values() for case in cases]
    return {
        "report_type": "comprehensive_capability_backtest",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_score": round(total_score, 1),
        "total_cases": len(all_cases),
        "total_passed": sum(1 for case in all_cases if case["passed"]),
        "dimension_scores": dimension_scores,
        "dimension_details": dimension_details,
        "suggestions": suggestions,
    }
|
||
|
||
|
||
def _generate_text_report(json_report: dict[str, Any]) -> str:
    """Render the JSON report as a Chinese plain-text report.

    The text has a header (timestamp, overall score, case counts), one score
    line per dimension, the pass/fail marker of every recorded case, and the
    improvement suggestions.

    Args:
        json_report: A report dict with the keys ``generated_at``,
            ``total_score``, ``total_cases``, ``total_passed``,
            ``dimension_scores``, ``dimension_details`` and ``suggestions``.

    Returns:
        The report as a single newline-joined string.
    """
    sep = "=" * 70
    rule = "-" * 70
    dim_names: dict[str, str] = {
        "preprocessing_accuracy": "预处理准确度",
        "skill_recall": "技能召回率",
        "overfitting_detection": "过拟合检测",
        "execution_efficiency": "执行效率",
        "tool_search_accuracy": "工具搜索准确度",
        "event_model_integrity": "事件模型完整性",
        "spec_management": "Spec 管理功能",
        "verification_loop": "验证循环",
    }
    details_by_dim = json_report["dimension_details"]
    failed_total = json_report["total_cases"] - json_report["total_passed"]

    lines: list[str] = [
        sep,
        "Fischer AgentKit 综合能力回测报告",
        sep,
        f"生成时间: {json_report['generated_at']}",
        f"总体评分: {json_report['total_score']:.1f}%",
        (
            f"用例总数: {json_report['total_cases']} "
            f"通过: {json_report['total_passed']} "
            f"失败: {failed_total}"
        ),
        "",
    ]

    # Score per dimension
    lines += [rule, "各维度得分", rule]
    for dim, score in json_report["dimension_scores"].items():
        name = dim_names.get(dim, dim)  # unknown dimensions keep their raw key
        detail = details_by_dim[dim]
        # A score that only rounds to 100 must not earn a check mark, so the
        # raw counts have to agree as well.
        all_passed = score == 100 and detail["passed"] == detail["total"]
        status = "✓" if all_passed else "✗"
        lines.append(f" {status} {name}: {score:.1f}% ({detail['passed']}/{detail['total']})")
    lines.append("")

    # Result of every case
    lines += [rule, "详细用例结果", rule]
    for dim, details in details_by_dim.items():
        name = dim_names.get(dim, dim)
        lines.append(f"\n[{name}]")
        for case in details["cases"]:
            status = "✓" if case["passed"] else "✗"
            lines.append(f" {status} {case['case_id']}")
    lines.append("")

    # Improvement suggestions
    lines += [rule, "改进建议", rule]
    lines.extend(f" • {suggestion}" for suggestion in json_report["suggestions"])
    lines.append("")
    lines.append(sep)

    return "\n".join(lines)
|
||
|
||
|
||
@pytest.mark.e2e_capability
class TestComprehensiveReport:
    """Generate the comprehensive capability report.

    Outputs:
    - JSON report: test-results/e2e/comprehensive_report.json
    - text report: test-results/e2e/comprehensive_report.txt
    """

    def test_generate_comprehensive_report(self, tmp_path: Path) -> None:
        """Run the checks of every dimension and write both report files.

        NOTE: ``tmp_path`` is requested but not used; the reports are written
        to ``test-results/e2e`` relative to the current working directory.
        """
        # Self-contained run: does not depend on the order of other tests.
        collector = _run_all_checks_for_report()

        # Take over any dimension that only the shared collector knows about
        # (present when other tests have already run).
        for dim, cases in _COLLECTOR.results.items():
            collector.results.setdefault(dim, cases)

        json_report = _generate_json_report(collector)
        text_report = _generate_text_report(json_report)

        # Make sure the output directory exists before writing into it.
        report_dir = Path("test-results/e2e")
        report_dir.mkdir(parents=True, exist_ok=True)

        json_path = report_dir / "comprehensive_report.json"
        serialized = json.dumps(json_report, ensure_ascii=False, indent=2)
        json_path.write_text(serialized, encoding="utf-8")

        text_path = report_dir / "comprehensive_report.txt"
        text_path.write_text(text_report, encoding="utf-8")

        # Echo the report and the file locations to the console.
        print(f"\n{text_report}")
        print(f"\nJSON 报告: {json_path}")
        print(f"文本报告: {text_path}")

        # Both files must have been produced.
        assert json_path.exists(), "JSON report file not generated"
        assert text_path.exists(), "Text report file not generated"

        # The report must be populated and cover all eight dimensions.
        assert json_report["total_cases"] > 0, "No test cases in report"
        assert len(json_report["dimension_scores"]) == 8, "Expected 8 dimensions in report"

        # Individual cases may fail, but the overall score must reach 80%.
        total_score = json_report["total_score"]
        print(f"\n总体评分: {total_score:.1f}%")
        assert total_score >= 80.0, f"Total score {total_score:.1f}% is below 80% threshold"
|