diff --git a/src/agentkit/cli/benchmark.py b/src/agentkit/cli/benchmark.py
new file mode 100644
index 0000000..45e7dd7
--- /dev/null
+++ b/src/agentkit/cli/benchmark.py
@@ -0,0 +1,1369 @@
+"""Benchmark CLI command — run capability backtests and generate reports.
+
+Tests core AgentKit components directly (no pytest subprocess, no real LLM):
+- preprocessing: RequestPreprocessor routing accuracy
+- overfitting: routing consistency across paraphrases
+- efficiency: component execution timing
+- tool_search: ToolSearchIndex BM25 relevance
+- event_model: SubmissionQueue / EventQueue lifecycle
+- spec_management: SpecManager CRUD operations
+- verification: VerificationLoop execute/retry behavior
+
+Usage:
+ agentkit benchmark # run all dimensions
+ agentkit benchmark --dimension preprocessing
+ agentkit benchmark --report # JSON + TXT report
+ agentkit benchmark --report --format html # + HTML report
+ agentkit benchmark --output-dir ./my-results
+ agentkit benchmark --fast # core cases only
+ agentkit benchmark --verbose # detailed output
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+from dataclasses import asdict, dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Any
+
+import typer
+from rich.console import Console
+from rich.panel import Panel
+from rich.progress import (
+ BarColumn,
+ Progress,
+ SpinnerColumn,
+ TaskProgressColumn,
+ TextColumn,
+)
+from rich.table import Table
+
+console = Console()
+
+_DEFAULT_OUTPUT_DIR = "test-results/benchmark"
+
+
+class BenchmarkDimension(str, Enum):
+    """Names of the benchmark dimensions selectable from the CLI."""
+
+    # One member per component family exercised by the benchmark.
+    PREPROCESSING = "preprocessing"
+    OVERFITTING = "overfitting"
+    EFFICIENCY = "efficiency"
+    TOOL_SEARCH = "tool_search"
+    EVENT_MODEL = "event_model"
+    SPEC_MANAGEMENT = "spec_management"
+    VERIFICATION = "verification"
+    # Pseudo-dimension; it is the default of the ``--dimension`` option.
+    ALL = "all"
+
+
+# ---------------------------------------------------------------------------
+# Result data structures
+# ---------------------------------------------------------------------------
+
+
+@dataclass
+class TestCaseResult:
+    """Outcome of one benchmark case, as stored in a DimensionResult."""
+
+    case_id: str  # identifier of the case within its dimension
+    passed: bool  # whether the observed value met the expectation
+    expected: str  # human-readable expected value
+    actual: str  # human-readable observed value
+    duration_ms: float  # elapsed time recorded by the runner, in milliseconds
+    detail: str = ""  # optional free-form context shown for failures
+
+
+@dataclass
+class DimensionResult:
+    """Pass/fail tally plus the per-case records of a single dimension."""
+
+    dimension: str
+    total: int = 0
+    passed: int = 0
+    failed: int = 0
+    details: list[TestCaseResult] = field(default_factory=list)
+
+    @property
+    def score(self) -> float:
+        """Fraction of recorded cases that passed; ``0.0`` when none were recorded."""
+        if self.total > 0:
+            return self.passed / self.total
+        return 0.0
+
+    def add(self, case: TestCaseResult) -> None:
+        """Bump the counters that match ``case`` and store the case itself."""
+        self.total += 1
+        if not case.passed:
+            self.failed += 1
+        else:
+            self.passed += 1
+        self.details.append(case)
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return a JSON-friendly summary; the dimension name is not part of it."""
+        serialized_cases = [asdict(entry) for entry in self.details]
+        return {
+            "score": round(self.score, 4),
+            "total": self.total,
+            "passed": self.passed,
+            "failed": self.failed,
+            "details": serialized_cases,
+        }
+
+
+# ---------------------------------------------------------------------------
+# Helpers — mock objects
+# ---------------------------------------------------------------------------
+
+
+def _make_mock_skill_registry():
+    """Return a SkillRegistry holding one ReAct skill and one direct-chat skill."""
+    from agentkit.skills.base import Skill, SkillConfig
+    from agentkit.skills.registry import SkillRegistry
+
+    # (skill name, agent type == execution mode, description, identity prompt)
+    blueprints = (
+        ("react_agent", "react", "General ReAct agent", "You are a helpful assistant."),
+        ("chat_only", "direct", "Direct chat agent", "You are a chat bot."),
+    )
+
+    registry = SkillRegistry()
+    for skill_name, mode, summary, identity in blueprints:
+        config = SkillConfig(
+            name=skill_name,
+            agent_type=mode,
+            description=summary,
+            execution_mode=mode,
+            prompt={"identity": identity},
+        )
+        registry.register(Skill(config))
+    return registry
+
+
+def _make_mock_tools():
+    """Return five fake Tool instances used by the tool-search and efficiency runs."""
+    from agentkit.tools.base import Tool
+
+    class _FakeTool(Tool):
+        """Tool stub whose ``execute`` always reports success."""
+
+        def __init__(
+            self,
+            name: str,
+            description: str,
+            input_schema: dict[str, Any] | None = None,
+            tags: list[str] | None = None,
+        ):
+            super().__init__(
+                name=name,
+                description=description,
+                input_schema=input_schema,
+                tags=tags or [],
+            )
+
+        async def execute(self, **kwargs) -> dict:
+            return {"status": "ok"}
+
+    def _schema(required: list[str], **string_props: str) -> dict[str, Any]:
+        # Every mock parameter is a described string, so only names and
+        # descriptions vary between tools.
+        return {
+            "type": "object",
+            "properties": {
+                prop: {"type": "string", "description": text}
+                for prop, text in string_props.items()
+            },
+            "required": required,
+        }
+
+    # (name, description, input schema, tags)
+    blueprints = [
+        (
+            "read_file",
+            "Read the contents of a file from the filesystem.",
+            _schema(["path"], path="file path to read"),
+            ["io", "file"],
+        ),
+        (
+            "write_file",
+            "Write content to a file on the filesystem.",
+            _schema(["path", "content"], path="file path to write", content="content to write"),
+            ["io", "file"],
+        ),
+        (
+            "web_search",
+            "Search the web for information using a search engine.",
+            _schema(["query"], query="search query"),
+            ["web", "search"],
+        ),
+        (
+            "shell_exec",
+            "Execute a shell command and return the output.",
+            _schema(["command"], command="shell command"),
+            ["system", "shell"],
+        ),
+        (
+            "http_request",
+            "Send an HTTP request to a URL and return the response.",
+            _schema(["url"], url="target URL", method="HTTP method"),
+            ["web", "http"],
+        ),
+    ]
+
+    return [
+        _FakeTool(name=tool_name, description=summary, input_schema=schema, tags=labels)
+        for tool_name, summary, schema, labels in blueprints
+    ]
+
+
+# ---------------------------------------------------------------------------
+# Dimension test runners
+# ---------------------------------------------------------------------------
+
+
+async def _run_preprocessing(fast: bool, verbose: bool) -> DimensionResult:
+    """Check which execution mode RequestPreprocessor picks for fixed inputs.
+
+    Args:
+        fast: Restrict the run to a five-case core subset.
+        verbose: Print one line per case to the console.
+
+    Returns:
+        A DimensionResult named ``"preprocessing"`` with one entry per case.
+    """
+    from agentkit.chat.request_preprocessor import RequestPreprocessor
+
+    registry = _make_mock_skill_registry()
+    preprocessor = RequestPreprocessor(skill_registry=registry)
+
+    # (case id, user input, expected execution mode)
+    table: list[tuple[str, str, str]] = [
+        ("greeting_cn", "你好", "direct_chat"),
+        ("greeting_en", "hello", "direct_chat"),
+        ("chitchat_thanks", "谢谢", "direct_chat"),
+        ("identity_who", "你是谁", "direct_chat"),
+        ("colloquial_ip_1", "查下ip", "react"),
+        ("colloquial_ip_2", "查看当前ip", "react"),
+        ("tool_search", "搜索golang教程", "react"),
+        ("tool_shell", "执行ls命令", "react"),
+        ("translation", "翻译hello为中文", "react"),
+        ("knowledge", "什么是机器学习", "react"),
+        ("skill_prefix_react", "@skill:react_agent 查看ip", "skill_react"),
+        ("skill_prefix_direct", "@skill:chat_only 你好", "skill_react"),
+        ("skill_not_found", "@skill:nonexistent 做点什么", "react"),
+        ("complex_analysis", "帮我分析一下这个数据并生成报告", "react"),
+        ("empty_fallback", "随便聊聊", "react"),
+    ]
+
+    if fast:
+        # Core subset: a greeting, tool-style queries and the skill prefix.
+        core = {
+            "greeting_cn",
+            "colloquial_ip_1",
+            "tool_search",
+            "skill_prefix_react",
+            "skill_not_found",
+        }
+        table = [row for row in table if row[0] in core]
+
+    outcome = DimensionResult(dimension="preprocessing")
+
+    for case_id, text, want in table:
+        began = time.perf_counter()
+        routing = await preprocessor.preprocess(content=text)
+        elapsed_ms = (time.perf_counter() - began) * 1000
+
+        got = routing.execution_mode.value
+        ok = got == want
+
+        record = TestCaseResult(
+            case_id=case_id,
+            passed=ok,
+            expected=want,
+            actual=got,
+            duration_ms=round(elapsed_ms, 2),
+            detail=f"input={text!r} method={routing.match_method}",
+        )
+        outcome.add(record)
+
+        if not verbose:
+            continue
+        if ok:
+            console.print(f" [green]✓[/green] {case_id}: {got} ({elapsed_ms:.1f}ms)")
+        else:
+            console.print(
+                f" [red]✗[/red] {case_id}: expected={want} "
+                f"actual={got} ({routing.match_method})"
+            )
+
+    return outcome
+
+
+async def _run_overfitting(fast: bool, verbose: bool) -> DimensionResult:
+    """Check that paraphrases of one intent route to the same execution mode.
+
+    A group passes only when every paraphrase yields the same mode and that
+    mode equals the group's expected one.
+
+    Args:
+        fast: Run only the first two paraphrase groups.
+        verbose: Print one line per group to the console.
+
+    Returns:
+        A DimensionResult named ``"overfitting"`` with one entry per group.
+    """
+    from agentkit.chat.request_preprocessor import RequestPreprocessor
+
+    registry = _make_mock_skill_registry()
+    preprocessor = RequestPreprocessor(skill_registry=registry)
+
+    paraphrase_groups: list[dict[str, Any]] = [
+        {
+            "id": "ip_check_variants",
+            "paraphrases": ["查下ip", "查看当前ip", "获取ip地址", "看下ip", "帮我查一下ip"],
+            "expected": "react",
+        },
+        {
+            "id": "search_variants",
+            "paraphrases": ["搜索golang教程", "搜一下golang教程", "找下golang学习资料"],
+            "expected": "react",
+        },
+        {
+            "id": "greeting_variants",
+            "paraphrases": ["你好", "hello", "hi", "嗨", "哈喽"],
+            "expected": "direct_chat",
+        },
+    ]
+
+    if fast:
+        paraphrase_groups = paraphrase_groups[:2]
+
+    result = DimensionResult(dimension="overfitting")
+
+    for group in paraphrase_groups:
+        # Time the whole group so the report carries a real duration, as the
+        # other dimensions do, instead of a constant 0.0.
+        start = time.perf_counter()
+        modes: list[str] = []
+        for text in group["paraphrases"]:
+            routing = await preprocessor.preprocess(content=text)
+            modes.append(routing.execution_mode.value)
+        elapsed_ms = (time.perf_counter() - start) * 1000
+
+        # All paraphrases must agree, and agree on the expected mode.
+        consistent = len(set(modes)) == 1
+        expected_mode = group["expected"]
+        correct = bool(modes) and consistent and modes[0] == expected_mode
+
+        result.add(
+            TestCaseResult(
+                case_id=group["id"],
+                passed=correct,
+                expected=expected_mode,
+                actual=",".join(modes),
+                duration_ms=round(elapsed_ms, 2),
+                detail=f"paraphrases={len(group['paraphrases'])} consistent={consistent}",
+            )
+        )
+
+        if verbose:
+            status = "[green]✓[/green]" if correct else "[red]✗[/red]"
+            console.print(f" {status} {group['id']}: modes={modes}")
+
+    return result
+
+
+async def _run_efficiency(fast: bool, verbose: bool) -> DimensionResult:
+    """Compare average per-call latency of core components against thresholds.
+
+    Args:
+        fast: Run only ``preprocess_greeting`` and ``tool_search_query``.
+        verbose: Print one line per measurement to the console.
+
+    Returns:
+        A DimensionResult named ``"efficiency"``; a case passes when its
+        average time per call does not exceed its threshold.
+    """
+    from agentkit.chat.request_preprocessor import RequestPreprocessor
+    from agentkit.tools.search import ToolSearchIndex
+
+    registry = _make_mock_skill_registry()
+    preprocessor = RequestPreprocessor(skill_registry=registry)
+    tools = _make_mock_tools()
+    search_index = ToolSearchIndex(tools)
+
+    # (case id, awaitable factory or None for the sync search path,
+    #  max average ms per call, iterations)
+    plan: list[tuple[str, Any, float, int]] = [
+        ("preprocess_greeting", lambda: preprocessor.preprocess(content="你好"), 50.0, 100),
+        ("preprocess_react", lambda: preprocessor.preprocess(content="查下ip"), 50.0, 100),
+        (
+            "preprocess_skill_prefix",
+            lambda: preprocessor.preprocess(content="@skill:react_agent test"),
+            50.0,
+            100,
+        ),
+        ("tool_search_query", None, 10.0, 200),
+        ("tool_search_empty", None, 5.0, 200),
+    ]
+
+    if fast:
+        core = {"preprocess_greeting", "tool_search_query"}
+        plan = [entry for entry in plan if entry[0] in core]
+
+    outcome = DimensionResult(dimension="efficiency")
+
+    for case_id, make_awaitable, max_ms, iterations in plan:
+        began = time.perf_counter()
+        if make_awaitable is None:
+            # Synchronous search path: the case id selects the query text.
+            query = "read file" if "query" in case_id else ""
+            for _ in range(iterations):
+                search_index.search(query, top_k=5)
+        else:
+            for _ in range(iterations):
+                await make_awaitable()
+        total_ms = (time.perf_counter() - began) * 1000
+        avg_ms = total_ms / iterations
+
+        within_budget = avg_ms <= max_ms
+        outcome.add(
+            TestCaseResult(
+                case_id=case_id,
+                passed=within_budget,
+                expected=f"<= {max_ms}ms/call",
+                actual=f"{avg_ms:.3f}ms/call",
+                duration_ms=round(total_ms, 2),
+                detail=f"iterations={iterations}",
+            )
+        )
+
+        if verbose:
+            marker = "[green]✓[/green]" if within_budget else "[red]✗[/red]"
+            console.print(
+                f" {marker} {case_id}: {avg_ms:.3f}ms/call "
+                f"(threshold {max_ms}ms)"
+            )
+
+    return outcome
+
+
+async def _run_tool_search(fast: bool, verbose: bool) -> DimensionResult:
+    """Check which mock tool ToolSearchIndex ranks first for fixed queries.
+
+    An expected top of ``"__none__"`` means the search must return nothing.
+
+    Args:
+        fast: Run only a four-case core subset.
+        verbose: Print one line per case to the console.
+
+    Returns:
+        A DimensionResult named ``"tool_search"`` with one entry per case.
+    """
+    from agentkit.tools.search import ToolSearchIndex
+
+    index = ToolSearchIndex(_make_mock_tools())
+
+    # (case id, query, expected top result, top_k)
+    table: list[tuple[str, str, str, int]] = [
+        ("read_file_query", "read file", "read_file", 5),
+        ("write_file_query", "write file content", "write_file", 5),
+        ("web_search_query", "search web information", "web_search", 5),
+        ("shell_exec_query", "execute shell command", "shell_exec", 5),
+        ("http_request_query", "send http request url", "http_request", 5),
+        ("file_tag_query", "io file", "read_file", 5),
+        ("empty_query", "", "__none__", 5),
+        ("no_match_query", "zzzznonexistent", "__none__", 5),
+        ("top_k_limit", "file", "read_file", 1),
+        ("multi_token_query", "search query engine", "web_search", 5),
+    ]
+
+    if fast:
+        core = {"read_file_query", "web_search_query", "empty_query", "top_k_limit"}
+        table = [row for row in table if row[0] in core]
+
+    outcome = DimensionResult(dimension="tool_search")
+
+    for case_id, query, want_top, top_k in table:
+        began = time.perf_counter()
+        found = index.search(query, top_k=top_k)
+        elapsed_ms = (time.perf_counter() - began) * 1000
+
+        if want_top != "__none__":
+            actual = found[0].name if found else "__empty__"
+            ok = actual == want_top
+        else:
+            ok = len(found) == 0
+            actual = "[]" if ok else found[0].name
+
+        outcome.add(
+            TestCaseResult(
+                case_id=case_id,
+                passed=ok,
+                expected=want_top,
+                actual=actual,
+                duration_ms=round(elapsed_ms, 2),
+                detail=f"query={query!r} top_k={top_k} results={len(found)}",
+            )
+        )
+
+        if verbose:
+            marker = "[green]✓[/green]" if ok else "[red]✗[/red]"
+            console.print(f" {marker} {case_id}: top={actual} ({elapsed_ms:.2f}ms)")
+
+    return outcome
+
+
+async def _run_event_model(fast: bool, verbose: bool) -> DimensionResult:
+    """Exercise the SubmissionQueue and EventQueue lifecycle.
+
+    Cases: ``sq_submit_drain``, ``sq_cancel``, ``sq_close_blocks``,
+    ``eq_emit_subscribe_replay``, ``eq_close_sentinel`` and
+    ``eq_subscriber_count``.
+
+    Args:
+        fast: Run only the three core cases (submit/drain, replay, close
+            sentinel). The other cases are skipped rather than executed and
+            then discarded.
+        verbose: Print one line per executed case to the console.
+
+    Returns:
+        A DimensionResult named ``"event_model"`` with one entry per
+        executed case.
+    """
+    from agentkit.core.event_queue import EventQueue, SubmissionQueue
+    from agentkit.core.protocol import Event
+
+    result = DimensionResult(dimension="event_model")
+
+    def _elapsed_ms(started: float) -> float:
+        # Milliseconds since ``started``, rounded as stored in the report.
+        return round((time.perf_counter() - started) * 1000, 2)
+
+    def _record(case_id: str, passed: Any, expected: str, actual: str, duration_ms: float) -> None:
+        # Store the case and, in verbose mode, echo its status line.
+        ok = bool(passed)
+        result.add(
+            TestCaseResult(
+                case_id=case_id,
+                passed=ok,
+                expected=expected,
+                actual=actual,
+                duration_ms=duration_ms,
+            )
+        )
+        if verbose:
+            console.print(f" {'[green]✓[/green]' if ok else '[red]✗[/red]'} {case_id}")
+
+    # --- SubmissionQueue tests ---
+    sq = SubmissionQueue()
+
+    # Test 1: submit and drain
+    start = time.perf_counter()
+    task_id = await sq.submit("hello", "session-1")
+    drained: list[str] = []
+    async for submission in sq.drain():
+        drained.append(submission.content)
+        break  # only drain one to avoid blocking
+    duration = _elapsed_ms(start)
+    _record(
+        "sq_submit_drain",
+        task_id != "" and drained == ["hello"],
+        "task_id + drained=['hello']",
+        f"task_id={task_id[:8]}... drained={drained}",
+        duration,
+    )
+
+    if not fast:
+        # Test 2: cancel
+        start = time.perf_counter()
+        cancel_id = await sq.submit("to-cancel", "session-2")
+        cancelled = await sq.cancel(cancel_id)
+        duration = _elapsed_ms(start)
+        _record(
+            "sq_cancel",
+            cancelled and sq._submissions[cancel_id].cancelled,
+            "cancelled=True",
+            f"cancelled={cancelled}",
+            duration,
+        )
+
+        # Test 3: close blocks new submissions
+        start = time.perf_counter()
+        sq2 = SubmissionQueue()
+        sq2.close()
+        raised = False
+        try:
+            await sq2.submit("after-close", "session-3")
+        except RuntimeError:
+            raised = True
+        duration = _elapsed_ms(start)
+        _record(
+            "sq_close_blocks",
+            raised and sq2.is_closed,
+            "RuntimeError on submit after close",
+            f"raised={raised} closed={sq2.is_closed}",
+            duration,
+        )
+
+    # --- EventQueue tests ---
+    eq = EventQueue(buffer_size=10)
+
+    # Test 4: emit and subscribe with replay
+    start = time.perf_counter()
+    test_event = Event(
+        event_type="test_event",
+        task_id="task-1",
+        session_id="session-1",
+        data={"msg": "hello"},
+        timestamp=datetime.now(timezone.utc).isoformat(),
+    )
+    await eq.emit(test_event)
+
+    received: list[Event] = []
+    # Subscribe after the emit and collect a single (replayed) event.
+    async for event in eq.subscribe():
+        received.append(event)
+        break
+    duration = _elapsed_ms(start)
+    _record(
+        "eq_emit_subscribe_replay",
+        len(received) == 1 and received[0].event_type == "test_event",
+        "1 event replayed",
+        f"{len(received)} events",
+        duration,
+    )
+
+    # Test 5: close ends the subscription
+    start = time.perf_counter()
+    eq2 = EventQueue()
+
+    async def _consume_all() -> list[Event]:
+        collected: list[Event] = []
+        async for ev in eq2.subscribe():
+            collected.append(ev)
+        return collected
+
+    # Start consumer, emit, then close
+    consumer_task = asyncio.create_task(_consume_all())
+    await asyncio.sleep(0.01)  # let subscriber register
+    await eq2.emit(test_event)
+    await asyncio.sleep(0.01)
+    eq2.close()
+    try:
+        events = await asyncio.wait_for(consumer_task, timeout=2.0)
+        timed_out = False
+    except asyncio.TimeoutError:
+        # A subscriber that never exits is a failed case, not a reason to
+        # abort the whole benchmark; wait_for has already cancelled the task.
+        events = []
+        timed_out = True
+    duration = _elapsed_ms(start)
+    _record(
+        "eq_close_sentinel",
+        not timed_out and len(events) >= 1 and eq2.is_closed,
+        "subscriber exits on close",
+        (
+            "subscriber did not exit within 2.0s"
+            if timed_out
+            else f"{len(events)} events, closed={eq2.is_closed}"
+        ),
+        duration,
+    )
+
+    if not fast:
+        # Test 6: subscriber count
+        start = time.perf_counter()
+        eq3 = EventQueue()
+        initial_count = eq3.subscriber_count
+        duration = _elapsed_ms(start)
+        _record(
+            "eq_subscriber_count",
+            initial_count == 0,
+            "0 subscribers initially",
+            f"{initial_count} subscribers",
+            duration,
+        )
+
+    return result
+
+
+async def _run_spec_management(fast: bool, verbose: bool, tmp_dir: Path) -> DimensionResult:
+    """Run SpecManager through create/get/update/confirm/list/delete.
+
+    The cases share state (listing creates ``spec-002`` and deletion removes
+    it), so all of them always execute; ``fast`` only trims what is reported.
+
+    Args:
+        fast: Keep only the create, get, confirm and delete cases in the result.
+        verbose: Print one line per executed case to the console.
+        tmp_dir: Directory under which a ``specs`` folder is used for storage.
+
+    Returns:
+        A DimensionResult named ``"spec_management"``.
+    """
+    from agentkit.core.spec_manager import Spec, SpecManager, SpecStep
+
+    manager = SpecManager(specs_dir=str(tmp_dir / "specs"))
+    outcome = DimensionResult(dimension="spec_management")
+
+    def _log(case_id: str, ok: Any, expected: str, actual: str, elapsed_ms: float) -> None:
+        # Store the case and, in verbose mode, echo its status line.
+        outcome.add(
+            TestCaseResult(
+                case_id=case_id,
+                passed=ok,
+                expected=expected,
+                actual=actual,
+                duration_ms=round(elapsed_ms, 2),
+            )
+        )
+        if verbose:
+            console.print(f" {'[green]✓[/green]' if ok else '[red]✗[/red]'} {case_id}")
+
+    # Case 1: create
+    began = time.perf_counter()
+    first_spec = Spec(
+        spec_id="spec-001",
+        goal="Test goal",
+        steps=[
+            SpecStep(step_id="s1", name="step1", description="first step"),
+            SpecStep(step_id="s2", name="step2", description="second step", dependencies=["s1"]),
+        ],
+    )
+    path = manager.create(first_spec)
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log("spec_create", path.exists(), "file exists on disk", f"exists={path.exists()}", elapsed_ms)
+
+    # Case 2: get
+    began = time.perf_counter()
+    loaded = manager.get("spec-001")
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log(
+        "spec_get",
+        loaded is not None and loaded.spec_id == "spec-001" and len(loaded.steps) == 2,
+        "spec with 2 steps",
+        f"steps={len(loaded.steps) if loaded else 0}",
+        elapsed_ms,
+    )
+
+    # Case 3: update
+    began = time.perf_counter()
+    updated = manager.update("spec-001", goal="Updated goal")
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log(
+        "spec_update",
+        updated is not None and updated.goal == "Updated goal",
+        "goal='Updated goal'",
+        f"goal={updated.goal if updated else None}",
+        elapsed_ms,
+    )
+
+    # Case 4: confirm
+    began = time.perf_counter()
+    confirmed = manager.confirm("spec-001")
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    fully_confirmed = (
+        confirmed is not None
+        and confirmed.status == "confirmed"
+        and confirmed.confirmed_at is not None
+        and all(step.status == "confirmed" for step in confirmed.steps)
+    )
+    _log(
+        "spec_confirm",
+        fully_confirmed,
+        "status=confirmed, all steps confirmed",
+        f"status={confirmed.status if confirmed else None}",
+        elapsed_ms,
+    )
+
+    # Case 5: list (a second spec is created inside the timed section)
+    began = time.perf_counter()
+    manager.create(Spec(spec_id="spec-002", goal="Second goal"))
+    specs = manager.list_specs()
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log("spec_list", len(specs) == 2, "2 specs", f"{len(specs)} specs", elapsed_ms)
+
+    # Case 6: delete
+    began = time.perf_counter()
+    deleted = manager.delete("spec-002")
+    remaining = manager.list_specs()
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log(
+        "spec_delete",
+        deleted and len(remaining) == 1,
+        "deleted, 1 remaining",
+        f"deleted={deleted}, remaining={len(remaining)}",
+        elapsed_ms,
+    )
+
+    # Case 7: get nonexistent
+    began = time.perf_counter()
+    missing = manager.get("nonexistent")
+    elapsed_ms = (time.perf_counter() - began) * 1000
+    _log("spec_get_missing", missing is None, "None", f"{missing}", elapsed_ms)
+
+    if fast:
+        # Report only the core cases and recompute the counters to match.
+        core = {"spec_create", "spec_get", "spec_confirm", "spec_delete"}
+        kept = [entry for entry in outcome.details if entry.case_id in core]
+        outcome.details = kept
+        outcome.total = len(kept)
+        outcome.passed = sum(1 for entry in kept if entry.passed)
+        outcome.failed = outcome.total - outcome.passed
+
+    return outcome
+
+
+async def _run_verification(fast: bool, verbose: bool, tmp_dir: Path) -> DimensionResult:
+    """Exercise VerificationLoop with passing, failing, retried and slow commands.
+
+    Each case builds its own VerificationLoop, so the cases are independent.
+
+    Args:
+        fast: Run only ``verify_pass``, ``verify_fail`` and ``verify_retry``.
+            The timeout and multi-command cases are skipped rather than
+            executed and then discarded.
+        verbose: Print one line per executed case to the console.
+        tmp_dir: Working directory handed to every VerificationLoop.
+
+    Returns:
+        A DimensionResult named ``"verification"`` with one entry per
+        executed case.
+    """
+    from agentkit.core.verification_loop import VerificationLoop
+
+    # NOTE(review): "true", "false" and "sleep 10" presumably resolve to the
+    # POSIX utilities — confirm how these cases behave on Windows.
+    result = DimensionResult(dimension="verification")
+    work_dir = str(tmp_dir)
+
+    def _elapsed_ms(started: float) -> float:
+        # Milliseconds since ``started``, rounded as stored in the report.
+        return round((time.perf_counter() - started) * 1000, 2)
+
+    def _record(case_id: str, passed: Any, expected: str, actual: str, duration_ms: float) -> None:
+        # Store the case and, in verbose mode, echo its status line.
+        ok = bool(passed)
+        result.add(
+            TestCaseResult(
+                case_id=case_id,
+                passed=ok,
+                expected=expected,
+                actual=actual,
+                duration_ms=duration_ms,
+            )
+        )
+        if verbose:
+            console.print(f" {'[green]✓[/green]' if ok else '[red]✗[/red]'} {case_id}")
+
+    # Test 1: passing command
+    start = time.perf_counter()
+    loop_pass = VerificationLoop(
+        commands=["true"],
+        max_retries=0,
+        working_dir=work_dir,
+        timeout=5.0,
+    )
+    res = await loop_pass.verify()
+    duration = _elapsed_ms(start)
+    _record(
+        "verify_pass",
+        res.passed and res.attempts == 1,
+        "passed=True, attempts=1",
+        f"passed={res.passed}, attempts={res.attempts}",
+        duration,
+    )
+
+    # Test 2: failing command
+    start = time.perf_counter()
+    loop_fail = VerificationLoop(
+        commands=["false"],
+        max_retries=0,
+        working_dir=work_dir,
+        timeout=5.0,
+    )
+    res = await loop_fail.verify()
+    duration = _elapsed_ms(start)
+    _record(
+        "verify_fail",
+        not res.passed and len(res.errors) > 0,
+        "passed=False, has errors",
+        f"passed={res.passed}, errors={len(res.errors)}",
+        duration,
+    )
+
+    # Test 3: retry with fix callback
+    start = time.perf_counter()
+    call_count = 0
+
+    async def _fix_callback(errors: list[str], output: str) -> None:
+        nonlocal call_count
+        call_count += 1
+
+    # The command always fails, so the case expects 3 attempts in total with
+    # the fix callback invoked twice.
+    loop_retry = VerificationLoop(
+        commands=["false"],
+        max_retries=2,
+        working_dir=work_dir,
+        timeout=5.0,
+    )
+    res = await loop_retry.verify_and_retry(fix_callback=_fix_callback)
+    duration = _elapsed_ms(start)
+    _record(
+        "verify_retry",
+        not res.passed and res.attempts == 3 and call_count == 2,
+        "attempts=3, fix_callback called 2x",
+        f"attempts={res.attempts}, callbacks={call_count}",
+        duration,
+    )
+
+    if fast:
+        # The remaining cases are not core; skipping them also avoids the
+        # 0.5 s wait of the timeout case.
+        return result
+
+    # Test 4: timeout
+    start = time.perf_counter()
+    loop_timeout = VerificationLoop(
+        commands=["sleep 10"],
+        max_retries=0,
+        working_dir=work_dir,
+        timeout=0.5,
+    )
+    res = await loop_timeout.verify()
+    duration = _elapsed_ms(start)
+    _record(
+        "verify_timeout",
+        not res.passed and any("timed out" in e.lower() for e in res.errors),
+        "timeout error",
+        f"passed={res.passed}, errors={len(res.errors)}",
+        duration,
+    )
+
+    # Test 5: multiple commands (one passes, one fails)
+    start = time.perf_counter()
+    loop_multi = VerificationLoop(
+        commands=["true", "false"],
+        max_retries=0,
+        working_dir=work_dir,
+        timeout=5.0,
+    )
+    res = await loop_multi.verify()
+    duration = _elapsed_ms(start)
+    _record(
+        "verify_multi_command",
+        not res.passed and "false" in res.test_output,
+        "overall fail, output has both commands",
+        f"passed={res.passed}",
+        duration,
+    )
+
+    return result
+
+
+# ---------------------------------------------------------------------------
+# Report generation
+# ---------------------------------------------------------------------------
+
+
+def _generate_json_report(
+    report_data: dict[str, Any],
+    output_path: Path,
+) -> None:
+    """Write ``report_data`` to ``output_path`` as indented UTF-8 JSON.
+
+    Missing parent directories are created; non-ASCII text is written as-is.
+    """
+    payload = json.dumps(report_data, indent=2, ensure_ascii=False)
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    output_path.write_text(payload, encoding="utf-8")
+
+
+def _generate_txt_report(
+    report_data: dict[str, Any],
+    output_path: Path,
+) -> None:
+    """Write a plain-text benchmark report to ``output_path``.
+
+    The report has a header, a per-dimension table with an OVERALL row, and
+    either a list of failed cases or an all-passed notice. Missing parent
+    directories are created.
+    """
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    heavy_rule = "=" * 70
+    light_rule = "-" * 70
+    dimensions = report_data["dimensions"]
+
+    # Header block
+    out: list[str] = [
+        heavy_rule,
+        "AgentKit Benchmark Report",
+        heavy_rule,
+        f"Timestamp: {report_data['timestamp']}",
+        f"Version: {report_data['version']}",
+        f"Overall Score: {report_data['overall_score']:.1%}",
+        f"Summary: {report_data['summary']}",
+        "",
+        light_rule,
+        f"{'Dimension':<20} {'Total':>6} {'Pass':>6} {'Fail':>6} {'Score':>8}",
+        light_rule,
+    ]
+
+    # One table row per dimension
+    for dim_name, dim_data in dimensions.items():
+        out.append(
+            f"{dim_name:<20} {dim_data['total']:>6} {dim_data['passed']:>6} "
+            f"{dim_data['failed']:>6} {dim_data['score']:>7.1%}"
+        )
+
+    # OVERALL row, recomputed from the per-dimension counters
+    total_all = sum(d["total"] for d in dimensions.values())
+    pass_all = sum(d["passed"] for d in dimensions.values())
+    fail_all = sum(d["failed"] for d in dimensions.values())
+    overall = pass_all / total_all if total_all > 0 else 0.0
+    out.extend(
+        [
+            light_rule,
+            f"{'OVERALL':<20} {total_all:>6} {pass_all:>6} {fail_all:>6} {overall:>7.1%}",
+            heavy_rule,
+            "",
+        ]
+    )
+
+    # Failed cases; the section heading is emitted once, before the first one
+    heading_written = False
+    for dim_name, dim_data in dimensions.items():
+        for case in dim_data["details"]:
+            if case["passed"]:
+                continue
+            if not heading_written:
+                out.extend(["Failed Cases:", light_rule])
+                heading_written = True
+            out.append(f" [{dim_name}] {case['case_id']}")
+            out.append(f" expected: {case['expected']}")
+            out.append(f" actual: {case['actual']}")
+            if case.get("detail"):
+                out.append(f" detail: {case['detail']}")
+            out.append("")
+
+    if not heading_written:
+        out.extend(["All tests passed — no failures to report.", ""])
+
+    output_path.write_text("\n".join(out), encoding="utf-8")
+
+
+def _generate_html_report(
+    report_data: dict[str, Any],
+    output_path: Path,
+) -> None:
+    """Write a standalone HTML benchmark report to ``output_path``.
+
+    The page has a per-dimension results table with an OVERALL row, followed
+    by either a "Failed Cases" section or an all-passed notice. Score cells
+    carry the class ``score-good`` (>= 90%), ``score-warn`` (>= 70%) or
+    ``score-bad``. Missing parent directories are created.
+
+    Dimension names, case ids and expected/actual values are HTML-escaped,
+    because they can contain arbitrary text such as ``<`` or ``&``.
+    """
+    from html import escape
+
+    def _score_class(score: float) -> str:
+        # CSS class used to colour a score cell.
+        if score >= 0.9:
+            return "score-good"
+        if score >= 0.7:
+            return "score-warn"
+        return "score-bad"
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+
+    rows_html: list[str] = []
+    total_all = 0
+    pass_all = 0
+    fail_all = 0
+
+    for dim_name, dim_data in report_data["dimensions"].items():
+        total = dim_data["total"]
+        passed = dim_data["passed"]
+        failed = dim_data["failed"]
+        score = dim_data["score"]
+        total_all += total
+        pass_all += passed
+        fail_all += failed
+
+        rows_html.append(
+            "<tr>"
+            f"<td>{escape(str(dim_name))}</td>"
+            f"<td>{total}</td>"
+            f"<td>{passed}</td>"
+            f"<td>{failed}</td>"
+            f'<td class="{_score_class(score)}">{score:.1%}</td>'
+            "</tr>\n"
+        )
+
+    overall = pass_all / total_all if total_all > 0 else 0.0
+    rows_html.append(
+        '<tr class="overall">'
+        "<td>OVERALL</td>"
+        f"<td>{total_all}</td>"
+        f"<td>{pass_all}</td>"
+        f"<td>{fail_all}</td>"
+        f'<td class="{_score_class(overall)}">{overall:.1%}</td>'
+        "</tr>\n"
+    )
+
+    # Failure details
+    failure_html: list[str] = []
+    for dim_name, dim_data in report_data["dimensions"].items():
+        for case in dim_data["details"]:
+            if case["passed"]:
+                continue
+            failure_html.append(
+                '<div class="failure">'
+                f"<strong>[{escape(str(dim_name))}]</strong> "
+                f"<code>{escape(str(case['case_id']))}</code>"
+                f"<div>expected: {escape(str(case['expected']))}</div>"
+                f"<div>actual: {escape(str(case['actual']))}</div>"
+                "</div>\n"
+            )
+
+    if failure_html:
+        failures_section = "<h2>Failed Cases</h2>\n" + "".join(failure_html)
+    else:
+        failures_section = "<p>All tests passed.</p>\n"
+
+    # Kept outside the f-string below so the CSS braces need no doubling.
+    stylesheet = (
+        "body { font-family: sans-serif; margin: 2em; }\n"
+        "table { border-collapse: collapse; }\n"
+        "th, td { border: 1px solid #ccc; padding: 4px 10px; text-align: right; }\n"
+        "th:first-child, td:first-child { text-align: left; }\n"
+        "tr.overall { font-weight: bold; }\n"
+        ".score-good { color: #1a7f37; }\n"
+        ".score-warn { color: #9a6700; }\n"
+        ".score-bad { color: #cf222e; }\n"
+        ".failure { margin-bottom: 1em; }\n"
+    )
+    table_rows = "".join(rows_html)
+
+    document = f"""<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta charset="utf-8">
+<title>AgentKit Benchmark Report</title>
+<style>
+{stylesheet}</style>
+</head>
+<body>
+<h1>AgentKit Benchmark Report</h1>
+<h2>Dimension Results</h2>
+<table>
+<thead>
+<tr><th>Dimension</th><th>Total</th><th>Pass</th><th>Fail</th><th>Score</th></tr>
+</thead>
+<tbody>
+{table_rows}</tbody>
+</table>
+{failures_section}</body>
+</html>
+"""
+
+    output_path.write_text(document, encoding="utf-8")
+
+
+# ---------------------------------------------------------------------------
+# Main command
+# ---------------------------------------------------------------------------
+
+
+def _get_version() -> str:
+ try:
+ from importlib.metadata import version as get_version
+
+ return get_version("fischer-agentkit")
+ except Exception:
+ return "0.1.0 (dev)"
+
+
+def _build_summary_table(results: dict[str, DimensionResult]) -> Table:
+ table = Table(title="AgentKit Benchmark Results", show_lines=True)
+ table.add_column("Dimension", style="cyan", no_wrap=True)
+ table.add_column("Total", justify="right", style="white")
+ table.add_column("Pass", justify="right", style="green")
+ table.add_column("Fail", justify="right", style="red")
+ table.add_column("Score", justify="right", style="magenta")
+
+ total_all = 0
+ pass_all = 0
+ fail_all = 0
+
+ for dim_name, dim_result in results.items():
+ table.add_row(
+ dim_name,
+ str(dim_result.total),
+ str(dim_result.passed),
+ str(dim_result.failed),
+ f"{dim_result.score:.1%}",
+ )
+ total_all += dim_result.total
+ pass_all += dim_result.passed
+ fail_all += dim_result.failed
+
+ overall = pass_all / total_all if total_all > 0 else 0.0
+ table.add_row(
+ "[bold]OVERALL[/bold]",
+ f"[bold]{total_all}[/bold]",
+ f"[bold green]{pass_all}[/bold green]",
+ f"[bold red]{fail_all}[/bold red]",
+ f"[bold magenta]{overall:.1%}[/bold magenta]",
+ )
+
+ return table
+
+
+def benchmark(
+ dimension: BenchmarkDimension = typer.Option(
+ BenchmarkDimension.ALL,
+ "--dimension",
+ "-d",
+ help="Benchmark dimension to run (default: all)",
+ ),
+ report: bool = typer.Option(False, "--report", help="Generate JSON + TXT report files"),
+ format: str = typer.Option(
+ "json",
+ "--format",
+ "-f",
+ help="Report format: json, txt, or html (use with --report)",
+ ),
+ output_dir: str = typer.Option(
+ _DEFAULT_OUTPUT_DIR,
+ "--output-dir",
+ "-o",
+ help="Directory for report output files",
+ ),
+ fast: bool = typer.Option(False, "--fast", help="Run only core test cases"),
+ verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed output"),
+):
+ """Run AgentKit capability benchmarks and generate reports.
+
+ Tests core components directly (no LLM, no pytest subprocess):
+ preprocessing, overfitting, efficiency, tool_search, event_model,
+ spec_management, verification.
+ """
+ import tempfile
+
+ # Normalize dimension to enum (Typer may pass string)
+ if isinstance(dimension, str):
+ dimension = BenchmarkDimension(dimension)
+
+ console.print()
+ console.print(
+ Panel.fit(
+ "[bold cyan]AgentKit Benchmark[/bold cyan]\n"
+ f"Dimension: [yellow]{dimension.value}[/yellow] "
+ f"Fast: [yellow]{fast}[/yellow] "
+ f"Verbose: [yellow]{verbose}[/yellow]",
+ border_style="cyan",
+ )
+ )
+ console.print()
+
+ # Determine which dimensions to run
+ if dimension == BenchmarkDimension.ALL:
+ dims_to_run = [
+ BenchmarkDimension.PREPROCESSING,
+ BenchmarkDimension.OVERFITTING,
+ BenchmarkDimension.EFFICIENCY,
+ BenchmarkDimension.TOOL_SEARCH,
+ BenchmarkDimension.EVENT_MODEL,
+ BenchmarkDimension.SPEC_MANAGEMENT,
+ BenchmarkDimension.VERIFICATION,
+ ]
+ else:
+ dims_to_run = [dimension]
+
+ # Map dimension enum to runner functions
+ runner_map: dict[BenchmarkDimension, Any] = {
+ BenchmarkDimension.PREPROCESSING: _run_preprocessing,
+ BenchmarkDimension.OVERFITTING: _run_overfitting,
+ BenchmarkDimension.EFFICIENCY: _run_efficiency,
+ BenchmarkDimension.TOOL_SEARCH: _run_tool_search,
+ BenchmarkDimension.EVENT_MODEL: _run_event_model,
+ BenchmarkDimension.SPEC_MANAGEMENT: _run_spec_management,
+ BenchmarkDimension.VERIFICATION: _run_verification,
+ }
+
+ results: dict[str, DimensionResult] = {}
+
+ with tempfile.TemporaryDirectory(prefix="agentkit-benchmark-") as tmp:
+ tmp_path = Path(tmp)
+
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ BarColumn(),
+ TaskProgressColumn(),
+ console=console,
+ ) as progress:
+ for dim in dims_to_run:
+ task = progress.add_task(
+ f"Running {dim.value}...", total=None
+ )
+ runner = runner_map[dim]
+
+ # spec_management and verification need tmp_path
+ if dim in (BenchmarkDimension.SPEC_MANAGEMENT, BenchmarkDimension.VERIFICATION):
+ dim_result = asyncio.run(runner(fast, verbose, tmp_path))
+ else:
+ dim_result = asyncio.run(runner(fast, verbose))
+
+ results[dim.value] = dim_result
+ progress.update(task, completed=True, total=1)
+
+ # Display summary table
+ console.print()
+ table = _build_summary_table(results)
+ console.print(table)
+ console.print()
+
+ # Compute overall
+ total_all = sum(r.total for r in results.values())
+ pass_all = sum(r.passed for r in results.values())
+ fail_all = sum(r.failed for r in results.values())
+ overall_score = pass_all / total_all if total_all > 0 else 0.0
+
+ if fail_all == 0:
+ summary = f"All {pass_all} tests passed across {len(results)} dimensions."
+ console.print(f"[bold green]✓ {summary}[/bold green]")
+ else:
+ summary = (
+ f"{pass_all}/{total_all} tests passed ({fail_all} failed) "
+ f"across {len(results)} dimensions."
+ )
+ console.print(f"[bold yellow]⚠ {summary}[/bold yellow]")
+
+ console.print()
+
+ # Generate reports
+ if report:
+ out_path = Path(output_dir)
+ out_path.mkdir(parents=True, exist_ok=True)
+
+ timestamp = datetime.now(timezone.utc).isoformat()
+ version = _get_version()
+
+ report_data: dict[str, Any] = {
+ "timestamp": timestamp,
+ "version": version,
+ "dimensions": {name: r.to_dict() for name, r in results.items()},
+ "overall_score": round(overall_score, 4),
+ "summary": summary,
+ }
+
+ # Always generate JSON
+ json_path = out_path / "benchmark_report.json"
+ _generate_json_report(report_data, json_path)
+ console.print(f"[green]JSON report:[/green] {json_path}")
+
+ # Always generate TXT
+ txt_path = out_path / "benchmark_report.txt"
+ _generate_txt_report(report_data, txt_path)
+ console.print(f"[green]TXT report:[/green] {txt_path}")
+
+ # Generate HTML if requested
+ if format.lower() == "html":
+ html_path = out_path / "benchmark_report.html"
+ _generate_html_report(report_data, html_path)
+ console.print(f"[green]HTML report:[/green] {html_path}")
+
+ console.print()
+
+ # Exit with non-zero code if any tests failed
+ if fail_all > 0:
+ raise typer.Exit(code=1)
diff --git a/src/agentkit/cli/main.py b/src/agentkit/cli/main.py
index 483e6a9..60ddadb 100644
--- a/src/agentkit/cli/main.py
+++ b/src/agentkit/cli/main.py
@@ -35,6 +35,10 @@ from agentkit.cli.chat import chat # noqa: E402
app.command(name="chat")(chat)
+from agentkit.cli.benchmark import benchmark # noqa: E402
+
+app.command(name="benchmark")(benchmark)
+
@app.command()
def gui(
diff --git a/test-results/benchmark/benchmark_report.html b/test-results/benchmark/benchmark_report.html
new file mode 100644
index 0000000..161f0b3
--- /dev/null
+++ b/test-results/benchmark/benchmark_report.html
@@ -0,0 +1,44 @@
+
+
+
+
+AgentKit Benchmark Report
+
+
+
+AgentKit Benchmark Report
+
+Dimension Results
+
+| Dimension | Total | Pass | Fail | Score |
+
+| preprocessing | 15 | 14 | 1 | 93.3% |
| overfitting | 3 | 3 | 0 | 100.0% |
| efficiency | 5 | 5 | 0 | 100.0% |
| tool_search | 10 | 10 | 0 | 100.0% |
| event_model | 6 | 6 | 0 | 100.0% |
| spec_management | 7 | 7 | 0 | 100.0% |
| verification | 5 | 5 | 0 | 100.0% |
| OVERALL | 51 | 50 | 1 | 98.0% |
+
+
+Failed Cases
[preprocessing] skill_prefix_directexpected: skill_react
actual: direct_chat
+
+
\ No newline at end of file
diff --git a/test-results/benchmark/benchmark_report.json b/test-results/benchmark/benchmark_report.json
new file mode 100644
index 0000000..c63b01b
--- /dev/null
+++ b/test-results/benchmark/benchmark_report.json
@@ -0,0 +1,472 @@
+{
+ "timestamp": "2026-06-17T03:26:25.072956+00:00",
+ "version": "0.1.0",
+ "dimensions": {
+ "preprocessing": {
+ "score": 0.9333,
+ "total": 15,
+ "passed": 14,
+ "failed": 1,
+ "details": [
+ {
+ "case_id": "greeting_cn",
+ "passed": true,
+ "expected": "direct_chat",
+ "actual": "direct_chat",
+ "duration_ms": 0.03,
+ "detail": "input='你好' method=regex_direct"
+ },
+ {
+ "case_id": "greeting_en",
+ "passed": true,
+ "expected": "direct_chat",
+ "actual": "direct_chat",
+ "duration_ms": 0.02,
+ "detail": "input='hello' method=regex_direct"
+ },
+ {
+ "case_id": "chitchat_thanks",
+ "passed": true,
+ "expected": "direct_chat",
+ "actual": "direct_chat",
+ "duration_ms": 0.01,
+ "detail": "input='谢谢' method=regex_direct"
+ },
+ {
+ "case_id": "identity_who",
+ "passed": true,
+ "expected": "direct_chat",
+ "actual": "direct_chat",
+ "duration_ms": 0.02,
+ "detail": "input='你是谁' method=regex_direct"
+ },
+ {
+ "case_id": "colloquial_ip_1",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.02,
+ "detail": "input='查下ip' method=default_react"
+ },
+ {
+ "case_id": "colloquial_ip_2",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='查看当前ip' method=default_react"
+ },
+ {
+ "case_id": "tool_search",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='搜索golang教程' method=default_react"
+ },
+ {
+ "case_id": "tool_shell",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='执行ls命令' method=default_react"
+ },
+ {
+ "case_id": "translation",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='翻译hello为中文' method=default_react"
+ },
+ {
+ "case_id": "knowledge",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='什么是机器学习' method=default_react"
+ },
+ {
+ "case_id": "skill_prefix_react",
+ "passed": true,
+ "expected": "skill_react",
+ "actual": "skill_react",
+ "duration_ms": 0.03,
+ "detail": "input='@skill:react_agent 查看ip' method=skill_prefix"
+ },
+ {
+ "case_id": "skill_prefix_direct",
+ "passed": false,
+ "expected": "skill_react",
+ "actual": "direct_chat",
+ "duration_ms": 0.02,
+ "detail": "input='@skill:chat_only 你好' method=skill_prefix"
+ },
+ {
+ "case_id": "skill_not_found",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.13,
+ "detail": "input='@skill:nonexistent 做点什么' method=skill_not_found_fallback"
+ },
+ {
+ "case_id": "complex_analysis",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='帮我分析一下这个数据并生成报告' method=default_react"
+ },
+ {
+ "case_id": "empty_fallback",
+ "passed": true,
+ "expected": "react",
+ "actual": "react",
+ "duration_ms": 0.01,
+ "detail": "input='随便聊聊' method=default_react"
+ }
+ ]
+ },
+ "overfitting": {
+ "score": 1.0,
+ "total": 3,
+ "passed": 3,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "ip_check_variants",
+ "passed": true,
+ "expected": "react",
+ "actual": "react,react,react,react,react",
+ "duration_ms": 0.0,
+ "detail": "paraphrases=5 consistent=True"
+ },
+ {
+ "case_id": "search_variants",
+ "passed": true,
+ "expected": "react",
+ "actual": "react,react,react",
+ "duration_ms": 0.0,
+ "detail": "paraphrases=3 consistent=True"
+ },
+ {
+ "case_id": "greeting_variants",
+ "passed": true,
+ "expected": "direct_chat",
+ "actual": "direct_chat,direct_chat,direct_chat,direct_chat,direct_chat",
+ "duration_ms": 0.0,
+ "detail": "paraphrases=5 consistent=True"
+ }
+ ]
+ },
+ "efficiency": {
+ "score": 1.0,
+ "total": 5,
+ "passed": 5,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "preprocess_greeting",
+ "passed": true,
+ "expected": "<= 50.0ms/call",
+ "actual": "0.004ms/call",
+ "duration_ms": 0.44,
+ "detail": "iterations=100"
+ },
+ {
+ "case_id": "preprocess_react",
+ "passed": true,
+ "expected": "<= 50.0ms/call",
+ "actual": "0.004ms/call",
+ "duration_ms": 0.38,
+ "detail": "iterations=100"
+ },
+ {
+ "case_id": "preprocess_skill_prefix",
+ "passed": true,
+ "expected": "<= 50.0ms/call",
+ "actual": "0.005ms/call",
+ "duration_ms": 0.51,
+ "detail": "iterations=100"
+ },
+ {
+ "case_id": "tool_search_query",
+ "passed": true,
+ "expected": "<= 10.0ms/call",
+ "actual": "0.008ms/call",
+ "duration_ms": 1.69,
+ "detail": "iterations=200"
+ },
+ {
+ "case_id": "tool_search_empty",
+ "passed": true,
+ "expected": "<= 5.0ms/call",
+ "actual": "0.000ms/call",
+ "duration_ms": 0.08,
+ "detail": "iterations=200"
+ }
+ ]
+ },
+ "tool_search": {
+ "score": 1.0,
+ "total": 10,
+ "passed": 10,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "read_file_query",
+ "passed": true,
+ "expected": "read_file",
+ "actual": "read_file",
+ "duration_ms": 0.02,
+ "detail": "query='read file' top_k=5 results=2"
+ },
+ {
+ "case_id": "write_file_query",
+ "passed": true,
+ "expected": "write_file",
+ "actual": "write_file",
+ "duration_ms": 0.02,
+ "detail": "query='write file content' top_k=5 results=2"
+ },
+ {
+ "case_id": "web_search_query",
+ "passed": true,
+ "expected": "web_search",
+ "actual": "web_search",
+ "duration_ms": 0.02,
+ "detail": "query='search web information' top_k=5 results=2"
+ },
+ {
+ "case_id": "shell_exec_query",
+ "passed": true,
+ "expected": "shell_exec",
+ "actual": "shell_exec",
+ "duration_ms": 0.02,
+ "detail": "query='execute shell command' top_k=5 results=1"
+ },
+ {
+ "case_id": "http_request_query",
+ "passed": true,
+ "expected": "http_request",
+ "actual": "http_request",
+ "duration_ms": 0.03,
+ "detail": "query='send http request url' top_k=5 results=1"
+ },
+ {
+ "case_id": "file_tag_query",
+ "passed": true,
+ "expected": "read_file",
+ "actual": "read_file",
+ "duration_ms": 0.02,
+ "detail": "query='io file' top_k=5 results=2"
+ },
+ {
+ "case_id": "empty_query",
+ "passed": true,
+ "expected": "__none__",
+ "actual": "[]",
+ "duration_ms": 0.0,
+ "detail": "query='' top_k=5 results=0"
+ },
+ {
+ "case_id": "no_match_query",
+ "passed": true,
+ "expected": "__none__",
+ "actual": "[]",
+ "duration_ms": 0.01,
+ "detail": "query='zzzznonexistent' top_k=5 results=0"
+ },
+ {
+ "case_id": "top_k_limit",
+ "passed": true,
+ "expected": "read_file",
+ "actual": "read_file",
+ "duration_ms": 0.02,
+ "detail": "query='file' top_k=1 results=1"
+ },
+ {
+ "case_id": "multi_token_query",
+ "passed": true,
+ "expected": "web_search",
+ "actual": "web_search",
+ "duration_ms": 0.03,
+ "detail": "query='search query engine' top_k=5 results=1"
+ }
+ ]
+ },
+ "event_model": {
+ "score": 1.0,
+ "total": 6,
+ "passed": 6,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "sq_submit_drain",
+ "passed": true,
+ "expected": "task_id + drained=['hello']",
+ "actual": "task_id=571839fb... drained=['hello']",
+ "duration_ms": 0.1,
+ "detail": ""
+ },
+ {
+ "case_id": "sq_cancel",
+ "passed": true,
+ "expected": "cancelled=True",
+ "actual": "cancelled=True",
+ "duration_ms": 0.04,
+ "detail": ""
+ },
+ {
+ "case_id": "sq_close_blocks",
+ "passed": true,
+ "expected": "RuntimeError on submit after close",
+ "actual": "raised=True closed=True",
+ "duration_ms": 0.02,
+ "detail": ""
+ },
+ {
+ "case_id": "eq_emit_subscribe_replay",
+ "passed": true,
+ "expected": "1 event replayed",
+ "actual": "1 events",
+ "duration_ms": 0.07,
+ "detail": ""
+ },
+ {
+ "case_id": "eq_close_sentinel",
+ "passed": true,
+ "expected": "subscriber exits on close",
+ "actual": "1 events, closed=True",
+ "duration_ms": 21.59,
+ "detail": ""
+ },
+ {
+ "case_id": "eq_subscriber_count",
+ "passed": true,
+ "expected": "0 subscribers initially",
+ "actual": "0 subscribers",
+ "duration_ms": 0.01,
+ "detail": ""
+ }
+ ]
+ },
+ "spec_management": {
+ "score": 1.0,
+ "total": 7,
+ "passed": 7,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "spec_create",
+ "passed": true,
+ "expected": "file exists on disk",
+ "actual": "exists=True",
+ "duration_ms": 2.24,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_get",
+ "passed": true,
+ "expected": "spec with 2 steps",
+ "actual": "steps=2",
+ "duration_ms": 0.0,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_update",
+ "passed": true,
+ "expected": "goal='Updated goal'",
+ "actual": "goal=Updated goal",
+ "duration_ms": 1.75,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_confirm",
+ "passed": true,
+ "expected": "status=confirmed, all steps confirmed",
+ "actual": "status=confirmed",
+ "duration_ms": 1.86,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_list",
+ "passed": true,
+ "expected": "2 specs",
+ "actual": "2 specs",
+ "duration_ms": 4.92,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_delete",
+ "passed": true,
+ "expected": "deleted, 1 remaining",
+ "actual": "deleted=True, remaining=1",
+ "duration_ms": 1.94,
+ "detail": ""
+ },
+ {
+ "case_id": "spec_get_missing",
+ "passed": true,
+ "expected": "None",
+ "actual": "None",
+ "duration_ms": 0.06,
+ "detail": ""
+ }
+ ]
+ },
+ "verification": {
+ "score": 1.0,
+ "total": 5,
+ "passed": 5,
+ "failed": 0,
+ "details": [
+ {
+ "case_id": "verify_pass",
+ "passed": true,
+ "expected": "passed=True, attempts=1",
+ "actual": "passed=True, attempts=1",
+ "duration_ms": 11.82,
+ "detail": ""
+ },
+ {
+ "case_id": "verify_fail",
+ "passed": true,
+ "expected": "passed=False, has errors",
+ "actual": "passed=False, errors=1",
+ "duration_ms": 9.8,
+ "detail": ""
+ },
+ {
+ "case_id": "verify_retry",
+ "passed": true,
+ "expected": "attempts=3, fix_callback called 2x",
+ "actual": "attempts=3, callbacks=2",
+ "duration_ms": 33.87,
+ "detail": ""
+ },
+ {
+ "case_id": "verify_timeout",
+ "passed": true,
+ "expected": "timeout error",
+ "actual": "passed=False, errors=1",
+ "duration_ms": 506.8,
+ "detail": ""
+ },
+ {
+ "case_id": "verify_multi_command",
+ "passed": true,
+ "expected": "overall fail, output has both commands",
+ "actual": "passed=False",
+ "duration_ms": 23.12,
+ "detail": ""
+ }
+ ]
+ }
+ },
+ "overall_score": 0.9804,
+ "summary": "50/51 tests passed (1 failed) across 7 dimensions."
+}
\ No newline at end of file
diff --git a/test-results/benchmark/benchmark_report.txt b/test-results/benchmark/benchmark_report.txt
new file mode 100644
index 0000000..7b8c1f0
--- /dev/null
+++ b/test-results/benchmark/benchmark_report.txt
@@ -0,0 +1,28 @@
+======================================================================
+AgentKit Benchmark Report
+======================================================================
+Timestamp: 2026-06-17T03:26:25.072956+00:00
+Version: 0.1.0
+Overall Score: 98.0%
+Summary: 50/51 tests passed (1 failed) across 7 dimensions.
+
+----------------------------------------------------------------------
+Dimension Total Pass Fail Score
+----------------------------------------------------------------------
+preprocessing 15 14 1 93.3%
+overfitting 3 3 0 100.0%
+efficiency 5 5 0 100.0%
+tool_search 10 10 0 100.0%
+event_model 6 6 0 100.0%
+spec_management 7 7 0 100.0%
+verification 5 5 0 100.0%
+----------------------------------------------------------------------
+OVERALL 51 50 1 98.0%
+======================================================================
+
+Failed Cases:
+----------------------------------------------------------------------
+ [preprocessing] skill_prefix_direct
+ expected: skill_react
+ actual: direct_chat
+ detail: input='@skill:chat_only 你好' method=skill_prefix
diff --git a/test-results/e2e/comprehensive_report.json b/test-results/e2e/comprehensive_report.json
new file mode 100644
index 0000000..df1b379
--- /dev/null
+++ b/test-results/e2e/comprehensive_report.json
@@ -0,0 +1,334 @@
+{
+ "report_type": "comprehensive_capability_backtest",
+ "generated_at": "2026-06-17T03:22:42.152439+00:00",
+ "total_score": 100.0,
+ "total_cases": 50,
+ "total_passed": 50,
+ "dimension_scores": {
+ "preprocessing_accuracy": 100.0,
+ "skill_recall": 100.0,
+ "overfitting_detection": 100.0,
+ "execution_efficiency": 100.0,
+ "tool_search_accuracy": 100.0,
+ "event_model_integrity": 100.0,
+ "spec_management": 100.0,
+ "verification_loop": 100.0
+ },
+ "dimension_details": {
+ "preprocessing_accuracy": {
+ "total": 17,
+ "passed": 17,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "greeting_cn",
+ "passed": true,
+ "input": "你好",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "greeting_en",
+ "passed": true,
+ "input": "hello",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "greeting_hi",
+ "passed": true,
+ "input": "hi",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "chitchat_thanks",
+ "passed": true,
+ "input": "谢谢",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "chitchat_ok",
+ "passed": true,
+ "input": "好的",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "identity_who",
+ "passed": true,
+ "input": "你是谁",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "identity_name",
+ "passed": true,
+ "input": "你叫什么",
+ "expected": "direct_chat",
+ "actual": "direct_chat"
+ },
+ {
+ "case_id": "tool_ip",
+ "passed": true,
+ "input": "查下ip",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "tool_search",
+ "passed": true,
+ "input": "搜索golang教程",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "tool_shell",
+ "passed": true,
+ "input": "执行ls命令",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "tool_file",
+ "passed": true,
+ "input": "读一下配置文件",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "tool_monitor",
+ "passed": true,
+ "input": "检查服务状态",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "complex_analysis",
+ "passed": true,
+ "input": "帮我分析一下这个数据并生成报告",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "complex_code",
+ "passed": true,
+ "input": "重构这个函数使其更高效",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "complex_multi",
+ "passed": true,
+ "input": "搜索最新的AI论文并总结关键发现",
+ "expected": "react",
+ "actual": "react"
+ },
+ {
+ "case_id": "skill_prefix_react",
+ "passed": true,
+ "input": "@skill:react_agent 查看当前ip",
+ "expected": "skill_react",
+ "actual": "skill_react"
+ },
+ {
+ "case_id": "skill_prefix_coder",
+ "passed": true,
+ "input": "@skill:coder 写一个函数",
+ "expected": "skill_react",
+ "actual": "skill_react"
+ }
+ ]
+ },
+ "skill_recall": {
+ "total": 8,
+ "passed": 8,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "recall_valid_react",
+ "passed": true
+ },
+ {
+ "case_id": "recall_valid_coder",
+ "passed": true
+ },
+ {
+ "case_id": "recall_invalid_skill",
+ "passed": true
+ },
+ {
+ "case_id": "recall_no_prefix_react",
+ "passed": true
+ },
+ {
+ "case_id": "recall_no_prefix_greeting",
+ "passed": true
+ },
+ {
+ "case_id": "recall_no_prefix_complex",
+ "passed": true
+ },
+ {
+ "case_id": "recall_skill_only_prefix",
+ "passed": true
+ },
+ {
+ "case_id": "recall_skill_with_long_content",
+ "passed": true
+ }
+ ]
+ },
+ "overfitting_detection": {
+ "total": 5,
+ "passed": 5,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "overfit_ip_check",
+ "passed": true
+ },
+ {
+ "case_id": "overfit_search",
+ "passed": true
+ },
+ {
+ "case_id": "overfit_greeting",
+ "passed": true
+ },
+ {
+ "case_id": "overfit_file_read",
+ "passed": true
+ },
+ {
+ "case_id": "overfit_identity",
+ "passed": true
+ }
+ ]
+ },
+ "execution_efficiency": {
+ "total": 5,
+ "passed": 5,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "efficiency_greeting",
+ "passed": true,
+ "elapsed_ms": 0.41
+ },
+ {
+ "case_id": "efficiency_chitchat",
+ "passed": true,
+ "elapsed_ms": 0.47
+ },
+ {
+ "case_id": "efficiency_identity",
+ "passed": true,
+ "elapsed_ms": 0.48
+ },
+ {
+ "case_id": "efficiency_react_tool",
+ "passed": true,
+ "elapsed_ms": 0.49
+ },
+ {
+ "case_id": "efficiency_react_complex",
+ "passed": true,
+ "elapsed_ms": 0.55
+ }
+ ]
+ },
+ "tool_search_accuracy": {
+ "total": 8,
+ "passed": 8,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "tool_search_read",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_write",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_web",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_shell",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_tests",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_file_multiple",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_no_match",
+ "passed": true
+ },
+ {
+ "case_id": "tool_search_empty_query",
+ "passed": true
+ }
+ ]
+ },
+ "event_model_integrity": {
+ "total": 3,
+ "passed": 3,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "sq_submit_and_drain",
+ "passed": true
+ },
+ {
+ "case_id": "eq_emit_and_subscribe",
+ "passed": true
+ },
+ {
+ "case_id": "event_type_classification",
+ "passed": true
+ }
+ ]
+ },
+ "spec_management": {
+ "total": 2,
+ "passed": 2,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "spec_create_and_get",
+ "passed": true
+ },
+ {
+ "case_id": "spec_confirm",
+ "passed": true
+ }
+ ]
+ },
+ "verification_loop": {
+ "total": 2,
+ "passed": 2,
+ "score": 100.0,
+ "cases": [
+ {
+ "case_id": "verify_success",
+ "passed": true
+ },
+ {
+ "case_id": "verify_failure",
+ "passed": true
+ }
+ ]
+ }
+ },
+ "suggestions": [
+ "所有维度均达到 100%,架构状态良好"
+ ]
+}
\ No newline at end of file
diff --git a/test-results/e2e/comprehensive_report.txt b/test-results/e2e/comprehensive_report.txt
new file mode 100644
index 0000000..8335472
--- /dev/null
+++ b/test-results/e2e/comprehensive_report.txt
@@ -0,0 +1,95 @@
+======================================================================
+Fischer AgentKit 综合能力回测报告
+======================================================================
+生成时间: 2026-06-17T03:22:42.152439+00:00
+总体评分: 100.0%
+用例总数: 50 通过: 50 失败: 0
+
+----------------------------------------------------------------------
+各维度得分
+----------------------------------------------------------------------
+ ✓ 预处理准确度: 100.0% (17/17)
+ ✓ 技能召回率: 100.0% (8/8)
+ ✓ 过拟合检测: 100.0% (5/5)
+ ✓ 执行效率: 100.0% (5/5)
+ ✓ 工具搜索准确度: 100.0% (8/8)
+ ✓ 事件模型完整性: 100.0% (3/3)
+ ✓ Spec 管理功能: 100.0% (2/2)
+ ✓ 验证循环: 100.0% (2/2)
+
+----------------------------------------------------------------------
+详细用例结果
+----------------------------------------------------------------------
+
+[预处理准确度]
+ ✓ greeting_cn
+ ✓ greeting_en
+ ✓ greeting_hi
+ ✓ chitchat_thanks
+ ✓ chitchat_ok
+ ✓ identity_who
+ ✓ identity_name
+ ✓ tool_ip
+ ✓ tool_search
+ ✓ tool_shell
+ ✓ tool_file
+ ✓ tool_monitor
+ ✓ complex_analysis
+ ✓ complex_code
+ ✓ complex_multi
+ ✓ skill_prefix_react
+ ✓ skill_prefix_coder
+
+[技能召回率]
+ ✓ recall_valid_react
+ ✓ recall_valid_coder
+ ✓ recall_invalid_skill
+ ✓ recall_no_prefix_react
+ ✓ recall_no_prefix_greeting
+ ✓ recall_no_prefix_complex
+ ✓ recall_skill_only_prefix
+ ✓ recall_skill_with_long_content
+
+[过拟合检测]
+ ✓ overfit_ip_check
+ ✓ overfit_search
+ ✓ overfit_greeting
+ ✓ overfit_file_read
+ ✓ overfit_identity
+
+[执行效率]
+ ✓ efficiency_greeting
+ ✓ efficiency_chitchat
+ ✓ efficiency_identity
+ ✓ efficiency_react_tool
+ ✓ efficiency_react_complex
+
+[工具搜索准确度]
+ ✓ tool_search_read
+ ✓ tool_search_write
+ ✓ tool_search_web
+ ✓ tool_search_shell
+ ✓ tool_search_tests
+ ✓ tool_search_file_multiple
+ ✓ tool_search_no_match
+ ✓ tool_search_empty_query
+
+[事件模型完整性]
+ ✓ sq_submit_and_drain
+ ✓ eq_emit_and_subscribe
+ ✓ event_type_classification
+
+[Spec 管理功能]
+ ✓ spec_create_and_get
+ ✓ spec_confirm
+
+[验证循环]
+ ✓ verify_success
+ ✓ verify_failure
+
+----------------------------------------------------------------------
+改进建议
+----------------------------------------------------------------------
+ • 所有维度均达到 100%,架构状态良好
+
+======================================================================
\ No newline at end of file
diff --git a/tests/e2e/test_capability_comprehensive.py b/tests/e2e/test_capability_comprehensive.py
new file mode 100644
index 0000000..672fb58
--- /dev/null
+++ b/tests/e2e/test_capability_comprehensive.py
@@ -0,0 +1,1519 @@
+"""Comprehensive Capability Backtest — 综合能力回测脚本
+
+覆盖维度:
+1. 预处理准确度 (Preprocessing Accuracy) — greeting/tool/skill/complex 路由
+2. 召回率 (Recall) — @skill 前缀识别与 fallback
+3. 过拟合检测 (Overfitting Detection) — 同意图不同表达的一致性
+4. 执行效率 (Execution Efficiency) — DIRECT_CHAT/REACT 路径耗时
+5. 工具搜索准确度 (Tool Search Accuracy) — BM25 相关性排序
+6. 事件模型完整性 (Event Model Integrity) — SQ/EQ 双队列
+7. Spec 管理功能 (Spec Management) — CRUD + 确认流程
+8. 验证循环 (Verification Loop) — verify + retry
+
+设计原则:
+- 不依赖真实 LLM 调用(使用 Mock 或直接测试组件接口)
+- 可独立运行(不依赖 E2E 服务器、Redis、PostgreSQL)
+- 标记为 @pytest.mark.e2e_capability
+- 最后生成综合能力报告(JSON + 中文文本)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+import pytest
+
+from agentkit.chat.request_preprocessor import RequestPreprocessor
+from agentkit.core.event_queue import EventQueue, Submission, SubmissionQueue
+from agentkit.core.protocol import (
+ Event,
+ SessionEventType,
+ TaskEventType,
+ TurnEventType,
+)
+from agentkit.core.spec_manager import Spec, SpecManager, SpecStep
+from agentkit.core.verification_loop import VerificationLoop
+from agentkit.skills.base import Skill, SkillConfig
+from agentkit.skills.registry import SkillRegistry
+from agentkit.tools.base import Tool
+from agentkit.tools.search import ToolSearchIndex
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 结果收集器(模块级,跨测试类共享)
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+class _ResultCollector:
+    """Collect every test result so a combined report can be generated."""
+
+    def __init__(self) -> None:
+        self.results: dict[str, list[dict[str, Any]]] = {}
+
+    def record(
+        self,
+        dimension: str,
+        case_id: str,
+        passed: bool,
+        **extra: Any,
+    ) -> None:
+        """Append one entry (case_id, passed, plus any extra fields) under *dimension*."""
+        if dimension not in self.results:
+            self.results[dimension] = []
+        entry: dict[str, Any] = {"case_id": case_id, "passed": passed}
+        entry.update(extra)
+        self.results[dimension].append(entry)
+
+    def dimension_score(self, dimension: str) -> float:
+        """Return the pass rate of *dimension* as a percentage (0.0 when it has no cases)."""
+        cases = self.results.get(dimension, [])
+        if not cases:
+            return 0.0
+        passed = sum(1 for c in cases if c["passed"])
+        return passed / len(cases) * 100
+
+    def total_score(self) -> float:
+        """Return the pass rate over all recorded cases pooled together, as a percentage."""
+        all_cases: list[dict[str, Any]] = []
+        for cases in self.results.values():
+            all_cases.extend(cases)
+        if not all_cases:
+            return 0.0
+        passed = sum(1 for c in all_cases if c["passed"])
+        return passed / len(all_cases) * 100
+
+    def clear(self) -> None:
+        """Drop all recorded results (lets a report test start collecting afresh)."""
+        self.results.clear()
+
+
+_COLLECTOR = _ResultCollector()  # module-level instance the test classes record into
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 测试辅助工具
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+class _FakeTool(Tool):
+    """Fake Tool for tests; execute() ignores its arguments and reports success."""
+
+    def __init__(
+        self,
+        name: str,
+        description: str,
+        input_schema: dict[str, Any] | None = None,
+        tags: list[str] | None = None,
+    ) -> None:
+        super().__init__(
+            name=name,
+            description=description,
+            input_schema=input_schema,
+            tags=tags or [],
+        )
+
+    async def execute(self, **kwargs: Any) -> dict[str, Any]:
+        return {"status": "ok"}
+
+
+def _build_test_tools() -> list[Tool]:
+    """Create the fake tools used by the tests (io/file/web/shell/testing scenarios)."""
+    return [
+        _FakeTool(
+            name="read_file",
+            description="Read the contents of a file from the filesystem.",
+            input_schema={
+                "type": "object",
+                "properties": {
+                    "path": {"type": "string", "description": "file path to read"},
+                },
+                "required": ["path"],
+            },
+            tags=["io", "file"],
+        ),
+        _FakeTool(
+            name="write_file",
+            description="Write content to a file on the filesystem.",
+            input_schema={
+                "type": "object",
+                "properties": {
+                    "path": {"type": "string", "description": "file path to write"},
+                    "content": {"type": "string", "description": "content to write"},
+                },
+                "required": ["path", "content"],
+            },
+            tags=["io", "file"],
+        ),
+        _FakeTool(
+            name="web_search",
+            description="Search the web for information using a search engine.",
+            input_schema={
+                "type": "object",
+                "properties": {
+                    "query": {"type": "string", "description": "search query"},
+                },
+                "required": ["query"],
+            },
+            tags=["web", "search"],
+        ),
+        _FakeTool(
+            name="shell_exec",
+            description="Execute a shell command and return the output.",
+            input_schema={
+                "type": "object",
+                "properties": {
+                    "command": {
+                        "type": "string",
+                        "description": "shell command to execute",
+                    },
+                },
+                "required": ["command"],
+            },
+            tags=["shell", "system"],
+        ),
+        _FakeTool(
+            name="run_tests",
+            description="Run project tests to verify code changes.",
+            input_schema={
+                "type": "object",
+                "properties": {
+                    "commands": {
+                        "type": "array",
+                        "description": "test commands to run",
+                    },
+                },
+            },
+            tags=["testing", "verification"],
+        ),
+    ]
+
+
+def _build_mock_skill_registry() -> SkillRegistry:
+    """Build a SkillRegistry holding the two test skills (no real LLM involved)."""
+    registry = SkillRegistry()
+    tools = _build_test_tools()
+
+    # react_agent skill — uses the web_search tool
+    react_config = SkillConfig(
+        name="react_agent",
+        agent_type="react_agent",
+        version="1.0.0",
+        description="ReAct agent skill for tool-augmented reasoning",
+        execution_mode="react",
+        prompt={
+            "identity": "You are a ReAct agent.",
+            "instructions": "Use tools to answer questions step by step.",
+        },
+    )
+    registry.register(Skill(react_config, tools=[tools[2]]))  # web_search
+
+    # coder skill — uses the read_file + write_file tools
+    coder_config = SkillConfig(
+        name="coder",
+        agent_type="coder",
+        version="1.0.0",
+        description="Code generation and review skill",
+        execution_mode="react",
+        prompt={
+            "identity": "You are a coding assistant.",
+            "instructions": "Help with code generation, review, and refactoring.",
+        },
+    )
+    registry.register(
+        Skill(coder_config, tools=[tools[0], tools[1]])  # read_file, write_file
+    )
+
+    return registry
+
+
+def _build_preprocessor() -> RequestPreprocessor:
+    """Build a RequestPreprocessor wired to the mock skill registry and the test tools."""
+    return RequestPreprocessor(
+        skill_registry=_build_mock_skill_registry(),
+        default_tools=_build_test_tools(),
+        default_system_prompt="You are a helpful assistant.",
+        default_model="test-model",
+        default_agent_name="default",
+    )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 测试数据定义
+# ═══════════════════════════════════════════════════════════════════════════
+
+# 1. Preprocessing-accuracy test cases (>= 15)
+PREPROCESSING_CASES: list[dict[str, Any]] = [
+    # Greeting/Chitchat → DIRECT_CHAT
+    {"id": "greeting_cn", "input": "你好", "expected_mode": "direct_chat"},
+    {"id": "greeting_en", "input": "hello", "expected_mode": "direct_chat"},
+    {"id": "greeting_hi", "input": "hi", "expected_mode": "direct_chat"},
+    {"id": "chitchat_thanks", "input": "谢谢", "expected_mode": "direct_chat"},
+    {"id": "chitchat_ok", "input": "好的", "expected_mode": "direct_chat"},
+    {"id": "identity_who", "input": "你是谁", "expected_mode": "direct_chat"},
+    {"id": "identity_name", "input": "你叫什么", "expected_mode": "direct_chat"},
+    # Tool-requiring queries → REACT
+    {"id": "tool_ip", "input": "查下ip", "expected_mode": "react"},
+    {"id": "tool_search", "input": "搜索golang教程", "expected_mode": "react"},
+    {"id": "tool_shell", "input": "执行ls命令", "expected_mode": "react"},
+    {"id": "tool_file", "input": "读一下配置文件", "expected_mode": "react"},
+    {"id": "tool_monitor", "input": "检查服务状态", "expected_mode": "react"},
+    # Complex queries → REACT
+    {"id": "complex_analysis", "input": "帮我分析一下这个数据并生成报告", "expected_mode": "react"},
+    {"id": "complex_code", "input": "重构这个函数使其更高效", "expected_mode": "react"},
+    {"id": "complex_multi", "input": "搜索最新的AI论文并总结关键发现", "expected_mode": "react"},
+    # @skill prefix → SKILL_REACT
+    {
+        "id": "skill_prefix_react",
+        "input": "@skill:react_agent 查看当前ip",
+        "expected_mode": "skill_react",
+    },
+    {
+        "id": "skill_prefix_coder",
+        "input": "@skill:coder 写一个函数",
+        "expected_mode": "skill_react",
+    },
+]
+
+# 2. Recall test cases (>= 8)
+RECALL_CASES: list[dict[str, Any]] = [
+    {
+        "id": "recall_valid_react",
+        "input": "@skill:react_agent 查看ip",
+        "expected_matched": True,
+        "expected_skill": "react_agent",
+        "expected_mode": "skill_react",
+    },
+    {
+        "id": "recall_valid_coder",
+        "input": "@skill:coder 写代码",
+        "expected_matched": True,
+        "expected_skill": "coder",
+        "expected_mode": "skill_react",
+    },
+    {
+        "id": "recall_invalid_skill",
+        "input": "@skill:nonexistent 做点什么",
+        "expected_matched": False,
+        "expected_skill": None,
+        "expected_mode": "react",
+    },
+    {
+        "id": "recall_no_prefix_react",
+        "input": "查下ip地址",
+        "expected_matched": False,
+        "expected_skill": None,
+        "expected_mode": "react",
+    },
+    {
+        "id": "recall_no_prefix_greeting",
+        "input": "你好",
+        "expected_matched": False,
+        "expected_skill": None,
+        "expected_mode": "direct_chat",
+    },
+    {
+        "id": "recall_no_prefix_complex",
+        "input": "分析数据并生成报告",
+        "expected_matched": False,
+        "expected_skill": None,
+        "expected_mode": "react",
+    },
+    {
+        "id": "recall_skill_only_prefix",
+        "input": "@skill:react_agent",
+        "expected_matched": True,
+        "expected_skill": "react_agent",
+        "expected_mode": "skill_react",
+    },
+    {
+        "id": "recall_skill_with_long_content",
+        "input": "@skill:coder 请帮我重构这个函数,使其时间复杂度从 O(n²) 降到 O(n)",
+        "expected_matched": True,
+        "expected_skill": "coder",
+        "expected_mode": "skill_react",
+    },
+]
+
+# 3. Overfitting-detection test cases (>= 5 groups, each an original + 3 paraphrases)
+OVERFITTING_CASES: list[dict[str, Any]] = [
+    {
+        "id": "overfit_ip_check",
+        "original": "查看当前ip",
+        "paraphrases": ["查下ip", "获取ip地址", "看下ip"],
+        "expected_mode": "react",
+    },
+    {
+        "id": "overfit_search",
+        "original": "搜索golang教程",
+        "paraphrases": ["搜一下golang教程", "找下golang学习资料", "帮我搜golang入门"],
+        "expected_mode": "react",
+    },
+    {
+        "id": "overfit_greeting",
+        "original": "你好",
+        "paraphrases": ["hello", "hi", "嗨"],
+        "expected_mode": "direct_chat",
+    },
+    {
+        "id": "overfit_file_read",
+        "original": "读一下配置文件",
+        "paraphrases": ["看一下配置文件", "帮我读配置", "查看配置文件内容"],
+        "expected_mode": "react",
+    },
+    {
+        "id": "overfit_identity",
+        "original": "你是谁",
+        "paraphrases": ["你叫什么", "自我介绍", "你是什么"],
+        "expected_mode": "direct_chat",
+    },
+]
+
+# 4. Execution-efficiency test cases (>= 5)
+EFFICIENCY_CASES: list[dict[str, Any]] = [
+    {
+        "id": "efficiency_greeting",
+        "input": "你好",
+        "expected_mode": "direct_chat",
+        "max_time_ms": 2000,
+    },
+    {
+        "id": "efficiency_chitchat",
+        "input": "谢谢",
+        "expected_mode": "direct_chat",
+        "max_time_ms": 2000,
+    },
+    {
+        "id": "efficiency_identity",
+        "input": "你是谁",
+        "expected_mode": "direct_chat",
+        "max_time_ms": 2000,
+    },
+    {
+        "id": "efficiency_react_tool",
+        "input": "查下ip",
+        "expected_mode": "react",
+        "max_time_ms": 5000,
+    },
+    {
+        "id": "efficiency_react_complex",
+        "input": "帮我分析一下这个数据并生成报告",
+        "expected_mode": "react",
+        "max_time_ms": 5000,
+    },
+]
+
+# 5. Tool-search-accuracy test cases (>= 8)
+TOOL_SEARCH_CASES: list[dict[str, Any]] = [
+    {
+        "id": "tool_search_read",
+        "query": "read file",
+        "expected_top1": "read_file",
+    },
+    {
+        "id": "tool_search_write",
+        "query": "write file",
+        "expected_top1": "write_file",
+    },
+    {
+        "id": "tool_search_web",
+        "query": "web search",
+        "expected_top1": "web_search",
+    },
+    {
+        "id": "tool_search_shell",
+        "query": "shell command execute",
+        "expected_top1": "shell_exec",
+    },
+    {
+        "id": "tool_search_tests",
+        "query": "run tests verify",
+        "expected_top1": "run_tests",
+    },
+    {
+        "id": "tool_search_file_multiple",
+        "query": "file",
+        "expected_contains": ["read_file", "write_file"],
+    },
+    {
+        "id": "tool_search_no_match",
+        "query": "xyzzy_nonexistent_xyz",
+        "expected_empty": True,
+    },
+    {
+        "id": "tool_search_empty_query",
+        "query": "",
+        "expected_empty": True,
+    },
+]
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 1. 预处理准确度
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestPreprocessingAccuracy:
+    """Preprocessing accuracy: verify RequestPreprocessor's routing decisions.
+
+    Scenarios covered:
+    - Greeting/Chitchat → DIRECT_CHAT (zero-cost fast path)
+    - Tool-requiring queries → REACT (the LLM decides on tool use)
+    - @skill prefix → SKILL_REACT (explicit skill selection)
+    - Complex queries → REACT (default agent loop)
+    """
+
+    @pytest.mark.parametrize(
+        "case",
+        PREPROCESSING_CASES,
+        ids=[c["id"] for c in PREPROCESSING_CASES],
+    )
+    def test_preprocessing_routing(self, case: dict[str, Any]) -> None:
+        """Verify each input is routed to the expected execution mode."""
+        preprocessor = _build_preprocessor()
+        result = asyncio.run(preprocessor.preprocess(content=case["input"]))
+
+        actual_mode = result.execution_mode.value
+        expected_mode = case["expected_mode"]
+        passed = actual_mode == expected_mode
+
+        _COLLECTOR.record(
+            dimension="preprocessing_accuracy",
+            case_id=case["id"],
+            passed=passed,
+            input=case["input"],
+            expected=expected_mode,
+            actual=actual_mode,
+            match_method=result.match_method,
+        )
+
+        assert actual_mode == expected_mode, (
+            f"'{case['input']}': expected {expected_mode}, got {actual_mode} "
+            f"(method={result.match_method})"
+        )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 2. 召回率 — 技能匹配
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestSkillRecall:
+    """Skill recall: verify @skill prefix recognition and the fallback mechanism.
+
+    Scenarios covered:
+    - Valid @skill prefix → the skill is matched
+    - Invalid @skill prefix → falls back to REACT
+    - No prefix → default REACT or DIRECT_CHAT
+    """
+
+    @pytest.mark.parametrize(
+        "case",
+        RECALL_CASES,
+        ids=[c["id"] for c in RECALL_CASES],
+    )
+    def test_skill_recall(self, case: dict[str, Any]) -> None:
+        """Verify recall and fallback behaviour for the @skill prefix."""
+        preprocessor = _build_preprocessor()
+        result = asyncio.run(preprocessor.preprocess(content=case["input"]))
+
+        actual_matched = result.matched
+        actual_skill = result.skill_name
+        actual_mode = result.execution_mode.value
+        expected_matched = case["expected_matched"]
+        expected_skill = case["expected_skill"]
+        expected_mode = case["expected_mode"]
+
+        passed = (
+            actual_matched == expected_matched
+            and actual_skill == expected_skill
+            and actual_mode == expected_mode
+        )
+
+        _COLLECTOR.record(
+            dimension="skill_recall",
+            case_id=case["id"],
+            passed=passed,
+            input=case["input"],
+            expected_matched=expected_matched,
+            actual_matched=actual_matched,
+            expected_skill=expected_skill,
+            actual_skill=actual_skill,
+            expected_mode=expected_mode,
+            actual_mode=actual_mode,
+        )
+
+        assert actual_matched == expected_matched, (
+            f"'{case['input']}': matched expected {expected_matched}, got {actual_matched}"
+        )
+        assert actual_skill == expected_skill, (
+            f"'{case['input']}': skill expected {expected_skill}, got {actual_skill}"
+        )
+        assert actual_mode == expected_mode, (
+            f"'{case['input']}': mode expected {expected_mode}, got {actual_mode}"
+        )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 3. 过拟合检测
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestOverfittingDetection:
+    """Overfitting detection: different phrasings of one intent must yield the same mode.
+
+    Core idea: if the router gives "查看当前ip" and "查下ip" different execution modes,
+    it is overfitted to specific phrasings and generalizes poorly.
+    """
+
+    @pytest.mark.parametrize(
+        "case",
+        OVERFITTING_CASES,
+        ids=[c["id"] for c in OVERFITTING_CASES],
+    )
+    def test_paraphrase_consistency(self, case: dict[str, Any]) -> None:
+        """Verify the original input and its paraphrases all produce the expected mode."""
+        preprocessor = _build_preprocessor()
+        expected_mode = case["expected_mode"]
+
+        # Run the original input
+        original_result = asyncio.run(preprocessor.preprocess(content=case["original"]))
+        original_mode = original_result.execution_mode.value
+
+        # Run every paraphrase
+        paraphrase_modes: list[str] = []
+        for para in case["paraphrases"]:
+            result = asyncio.run(preprocessor.preprocess(content=para))
+            paraphrase_modes.append(result.execution_mode.value)
+
+        all_modes = [original_mode] + paraphrase_modes
+        all_consistent = all(m == expected_mode for m in all_modes)
+
+        _COLLECTOR.record(
+            dimension="overfitting_detection",
+            case_id=case["id"],
+            passed=all_consistent,
+            original=case["original"],
+            original_mode=original_mode,
+            paraphrases=case["paraphrases"],
+            paraphrase_modes=paraphrase_modes,
+            expected_mode=expected_mode,
+        )
+
+        assert all_consistent, (
+            f"Overfitting detected for '{case['id']}': "
+            f"original='{case['original']}' → {original_mode}, "
+            f"paraphrases={case['paraphrases']} → {paraphrase_modes}, "
+            f"expected={expected_mode}"
+        )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 4. 执行效率
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestExecutionEfficiency:
+    """Execution efficiency: measure the response time of the preprocessing stage.
+
+    Constraints:
+    - DIRECT_CHAT path: < 2s (zero-cost fast path)
+    - REACT path start-up: < 5s (preprocessing stage only, no LLM call)
+    """
+
+    @pytest.mark.parametrize(
+        "case",
+        EFFICIENCY_CASES,
+        ids=[c["id"] for c in EFFICIENCY_CASES],
+    )
+    def test_preprocessing_latency(self, case: dict[str, Any]) -> None:
+        """Verify the preprocessing stage finishes within the case's time limit."""
+        preprocessor = _build_preprocessor()
+
+        start = time.perf_counter()
+        result = asyncio.run(preprocessor.preprocess(content=case["input"]))
+        elapsed_ms = (time.perf_counter() - start) * 1000
+
+        actual_mode = result.execution_mode.value
+        max_time_ms = case["max_time_ms"]
+        passed = elapsed_ms < max_time_ms and actual_mode == case["expected_mode"]
+
+        _COLLECTOR.record(
+            dimension="execution_efficiency",
+            case_id=case["id"],
+            passed=passed,
+            input=case["input"],
+            elapsed_ms=round(elapsed_ms, 2),
+            max_time_ms=max_time_ms,
+            actual_mode=actual_mode,
+            expected_mode=case["expected_mode"],
+        )
+
+        assert elapsed_ms < max_time_ms, (
+            f"'{case['input']}': elapsed {elapsed_ms:.1f}ms > limit {max_time_ms}ms"
+        )
+        assert actual_mode == case["expected_mode"], (
+            f"'{case['input']}': mode expected {case['expected_mode']}, got {actual_mode}"
+        )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 5. 工具搜索准确度
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestToolSearchAccuracy:
+    """Tool-search accuracy: verify the relevance ranking of the BM25 search.
+
+    Scenarios covered:
+    - Exact match on tool name/description
+    - Fuzzy keyword match
+    - No match returns an empty result
+    - Empty query returns an empty result
+    """
+
+    @pytest.mark.parametrize(
+        "case",
+        TOOL_SEARCH_CASES,
+        ids=[c["id"] for c in TOOL_SEARCH_CASES],
+    )
+    def test_tool_search(self, case: dict[str, Any]) -> None:
+        """Verify the tool search returns the results the case expects."""
+        index = ToolSearchIndex(_build_test_tools())
+        results = index.search(case["query"], top_k=5)
+        result_names = [r.name for r in results]
+
+        passed = False  # NOTE(review): a case lacking every expected_* key asserts nothing
+        detail: dict[str, Any] = {"query": case["query"], "results": result_names}
+
+        if case.get("expected_empty"):
+            passed = len(results) == 0
+            detail["expected_empty"] = True
+        elif "expected_top1" in case:
+            passed = len(results) > 0 and results[0].name == case["expected_top1"]
+            detail["expected_top1"] = case["expected_top1"]
+        elif "expected_contains" in case:
+            expected = case["expected_contains"]
+            passed = all(name in result_names for name in expected)
+            detail["expected_contains"] = expected
+
+        _COLLECTOR.record(
+            dimension="tool_search_accuracy",
+            case_id=case["id"],
+            passed=passed,
+            **detail,
+        )
+
+        if case.get("expected_empty"):
+            assert len(results) == 0, f"Query '{case['query']}': expected empty, got {result_names}"
+        elif "expected_top1" in case:
+            assert len(results) > 0, f"Query '{case['query']}': no results"
+            assert results[0].name == case["expected_top1"], (
+                f"Query '{case['query']}': expected top1={case['expected_top1']}, "
+                f"got {results[0].name}"
+            )
+        elif "expected_contains" in case:
+            for name in case["expected_contains"]:
+                assert name in result_names, (
+                    f"Query '{case['query']}': expected '{name}' in results, got {result_names}"
+                )
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 6. 事件模型完整性
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability  # NOTE(review): async tests assume pytest-asyncio auto mode — confirm
+class TestEventModelIntegrity:
+    """Event-model integrity: verify the basics of the SQ/EQ dual queues.
+
+    Scenarios covered:
+    - SQ submit and consume
+    - SQ task cancellation
+    - EQ event emission and subscription
+    - EQ broadcast to multiple subscribers
+    - EQ buffer replay
+    - Event type classification
+    """
+
+    async def test_sq_submit_and_drain(self) -> None:
+        """SQ accepts a user submission and hands it to the drain consumer."""
+        sq = SubmissionQueue()
+        task_id = await sq.submit("hello", "session-1")
+
+        received: list[Submission] = []
+
+        async def consumer() -> None:
+            async for sub in sq.drain():
+                received.append(sub)
+                if len(received) >= 1:
+                    break
+
+        consumer_task = asyncio.create_task(consumer())
+        await asyncio.wait_for(consumer_task, timeout=1.0)
+
+        passed = len(received) == 1 and received[0].content == "hello"
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="sq_submit_and_drain",
+            passed=passed,
+        )
+        assert passed
+        assert received[0].task_id == task_id
+
+    async def test_sq_cancel_task(self) -> None:
+        """After a task is cancelled, drain skips that submission."""
+        sq = SubmissionQueue()
+        task_id_1 = await sq.submit("first", "session-1")
+        await sq.submit("second", "session-1")
+        await sq.cancel(task_id_1)
+
+        received: list[str] = []
+
+        async def consumer() -> None:
+            async for sub in sq.drain():
+                received.append(sub.content)
+                if len(received) >= 1:
+                    break
+
+        consumer_task = asyncio.create_task(consumer())
+        await asyncio.wait_for(consumer_task, timeout=1.0)
+
+        passed = received == ["second"]
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="sq_cancel_task",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_eq_emit_and_subscribe(self) -> None:
+        """EQ delivers an emitted event to a subscriber."""
+        eq = EventQueue()
+        event = Event.create(
+            event_type=TurnEventType.TOKEN,
+            task_id="task-1",
+            session_id="session-1",
+            data={"text": "hello"},
+        )
+
+        received: list[Event] = []
+
+        async def subscriber() -> None:
+            async for evt in eq.subscribe():
+                received.append(evt)
+                break
+
+        sub_task = asyncio.create_task(subscriber())
+        await asyncio.sleep(0.05)
+        await eq.emit(event)
+        await asyncio.wait_for(sub_task, timeout=1.0)
+
+        passed = (
+            len(received) == 1
+            and received[0].event_type == TurnEventType.TOKEN
+            and received[0].data == {"text": "hello"}
+        )
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="eq_emit_and_subscribe",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_eq_broadcast_to_multiple_subscribers(self) -> None:
+        """Multiple EQ subscribers each receive every event (broadcast)."""
+        eq = EventQueue()
+        received_a: list[Event] = []
+        received_b: list[Event] = []
+
+        async def subscriber_a() -> None:
+            async for evt in eq.subscribe():
+                received_a.append(evt)
+                if len(received_a) >= 2:
+                    break
+
+        async def subscriber_b() -> None:
+            async for evt in eq.subscribe():
+                received_b.append(evt)
+                if len(received_b) >= 2:
+                    break
+
+        task_a = asyncio.create_task(subscriber_a())
+        task_b = asyncio.create_task(subscriber_b())
+        await asyncio.sleep(0.05)
+
+        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
+        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))
+
+        await asyncio.wait_for(task_a, timeout=1.0)
+        await asyncio.wait_for(task_b, timeout=1.0)
+
+        passed = len(received_a) == 2 and len(received_b) == 2
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="eq_broadcast",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_eq_buffer_replay(self) -> None:
+        """Events emitted before subscription are replayed to a new subscriber."""
+        eq = EventQueue(buffer_size=100)
+
+        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 1}))
+        await eq.emit(Event.create(TurnEventType.TOKEN, "t1", "s1", {"seq": 2}))
+
+        received: list[Event] = []
+
+        async def subscriber() -> None:
+            async for evt in eq.subscribe():
+                received.append(evt)
+                if len(received) >= 2:
+                    break
+
+        sub_task = asyncio.create_task(subscriber())
+        await asyncio.wait_for(sub_task, timeout=1.0)
+
+        passed = (
+            len(received) == 2 and received[0].data == {"seq": 1} and received[1].data == {"seq": 2}
+        )
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="eq_buffer_replay",
+            passed=passed,
+        )
+        assert passed
+
+    def test_event_type_classification(self) -> None:
+        """Event type values carry the right prefix (session./task./turn.)."""
+        session_events = [
+            SessionEventType.SESSION_STARTED,
+            SessionEventType.SESSION_ENDED,
+        ]
+        task_events = [
+            TaskEventType.TASK_CREATED,
+            TaskEventType.TASK_STARTED,
+            TaskEventType.TASK_COMPLETED,
+            TaskEventType.TASK_FAILED,
+        ]
+        turn_events = [
+            TurnEventType.TURN_STARTED,
+            TurnEventType.THINKING,
+            TurnEventType.TOOL_CALL,
+            TurnEventType.TOKEN,
+            TurnEventType.FINAL_ANSWER,
+        ]
+
+        all_correct = (
+            all(e.startswith("session.") for e in session_events)
+            and all(e.startswith("task.") for e in task_events)
+            and all(e.startswith("turn.") for e in turn_events)
+        )
+
+        _COLLECTOR.record(
+            dimension="event_model_integrity",
+            case_id="event_type_classification",
+            passed=all_correct,
+        )
+        assert all_correct
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 7. Spec 管理功能
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability
+class TestSpecManagement:
+    """Spec management: verify the create/read/update/confirm flow of a Spec.
+
+    Scenarios covered:
+    - Create a Spec and persist it to YAML
+    - Read a Spec (cache + disk)
+    - Update Spec fields
+    - Confirm a Spec (status + timestamp + steps)
+    - Delete a Spec
+    """
+
+    def test_spec_create_and_get(self, tmp_path: Path) -> None:
+        """Create a Spec, read it back, and check its fields are intact."""
+        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
+        spec = Spec(
+            spec_id="test-spec-1",
+            goal="Test goal",
+            steps=[
+                SpecStep(step_id="s1", name="Step 1", description="First"),
+                SpecStep(
+                    step_id="s2",
+                    name="Step 2",
+                    description="Second",
+                    dependencies=["s1"],
+                ),
+            ],
+        )
+        path = mgr.create(spec)
+        loaded = mgr.get("test-spec-1")
+
+        passed = (
+            path.exists()
+            and loaded is not None
+            and loaded.spec_id == "test-spec-1"
+            and loaded.goal == "Test goal"
+            and len(loaded.steps) == 2
+            and loaded.steps[1].dependencies == ["s1"]
+        )
+        _COLLECTOR.record(
+            dimension="spec_management",
+            case_id="spec_create_and_get",
+            passed=passed,
+        )
+        assert passed
+
+    def test_spec_update(self, tmp_path: Path) -> None:
+        """Update a Spec field and check the change is visible on re-read."""
+        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
+        spec = Spec(spec_id="test-spec-2", goal="Original")
+        mgr.create(spec)
+
+        updated = mgr.update("test-spec-2", goal="Updated goal")
+        reloaded = mgr.get("test-spec-2")
+
+        passed = (
+            updated is not None
+            and updated.goal == "Updated goal"
+            and reloaded is not None
+            and reloaded.goal == "Updated goal"
+        )
+        _COLLECTOR.record(
+            dimension="spec_management",
+            case_id="spec_update",
+            passed=passed,
+        )
+        assert passed
+
+    def test_spec_confirm(self, tmp_path: Path) -> None:
+        """Confirming a Spec updates its status, timestamp, and step statuses."""
+        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
+        spec = Spec(
+            spec_id="test-spec-3",
+            goal="Confirm test",
+            steps=[
+                SpecStep(step_id="s1", name="Step 1", description="First"),
+            ],
+        )
+        mgr.create(spec)
+
+        confirmed = mgr.confirm("test-spec-3")
+
+        passed = (
+            confirmed is not None
+            and confirmed.status == "confirmed"
+            and confirmed.confirmed_at is not None
+            and all(s.status == "confirmed" for s in confirmed.steps)
+        )
+        _COLLECTOR.record(
+            dimension="spec_management",
+            case_id="spec_confirm",
+            passed=passed,
+        )
+        assert passed
+
+    def test_spec_list_and_filter(self, tmp_path: Path) -> None:
+        """List Specs and filter them by status."""
+        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
+        mgr.create(Spec(spec_id="draft-1", goal="Draft"))
+        mgr.create(Spec(spec_id="confirmed-1", goal="Confirmed"))
+        mgr.confirm("confirmed-1")
+
+        all_specs = mgr.list_specs()
+        draft_specs = mgr.list_specs(status="draft")
+        confirmed_specs = mgr.list_specs(status="confirmed")
+
+        passed = (
+            len(all_specs) == 2
+            and len(draft_specs) == 1
+            and len(confirmed_specs) == 1
+            and confirmed_specs[0].spec_id == "confirmed-1"
+        )
+        _COLLECTOR.record(
+            dimension="spec_management",
+            case_id="spec_list_and_filter",
+            passed=passed,
+        )
+        assert passed
+
+    def test_spec_delete(self, tmp_path: Path) -> None:
+        """After delete() returns True, get() no longer finds the Spec."""
+        mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
+        mgr.create(Spec(spec_id="delete-me", goal="To be deleted"))
+
+        result = mgr.delete("delete-me")
+        loaded = mgr.get("delete-me")
+
+        passed = result is True and loaded is None
+        _COLLECTOR.record(
+            dimension="spec_management",
+            case_id="spec_delete",
+            passed=passed,
+        )
+        assert passed
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 8. 验证循环
+# ═══════════════════════════════════════════════════════════════════════════
+
+
+@pytest.mark.e2e_capability  # NOTE(review): async tests assume pytest-asyncio auto mode — confirm
+class TestVerificationLoop:
+    """Verification loop: verify VerificationLoop's verify and retry mechanisms.
+
+    Scenarios covered:
+    - A succeeding command yields passed=True
+    - A failing command yields passed=False
+    - A timed-out command yields passed=False
+    - Retry behaviour (without / with a fix_callback)
+    """
+
+    async def test_verify_success(self) -> None:
+        """A succeeding command yields passed=True."""
+        loop = VerificationLoop(commands=["echo ok"], timeout=10.0)
+        result = await loop.verify()
+
+        passed = result.passed is True and "ok" in result.test_output
+        _COLLECTOR.record(
+            dimension="verification_loop",
+            case_id="verify_success",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_verify_failure(self) -> None:
+        """A failing command yields passed=False."""
+        loop = VerificationLoop(commands=["false"], timeout=10.0)
+        result = await loop.verify()
+
+        passed = result.passed is False and len(result.errors) > 0
+        _COLLECTOR.record(
+            dimension="verification_loop",
+            case_id="verify_failure",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_verify_timeout(self) -> None:
+        """A timed-out command yields passed=False."""
+        loop = VerificationLoop(commands=["sleep 10"], timeout=0.5)
+        result = await loop.verify()
+
+        passed = result.passed is False and any("timed out" in e for e in result.errors)
+        _COLLECTOR.record(
+            dimension="verification_loop",
+            case_id="verify_timeout",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_verify_and_retry_no_callback(self) -> None:
+        """Without a fix_callback, max_retries=2 is expected to give 3 attempts in total."""
+        loop = VerificationLoop(commands=["false"], max_retries=2, timeout=5.0)
+        result = await loop.verify_and_retry()
+
+        passed = result.passed is False and result.attempts == 3
+        _COLLECTOR.record(
+            dimension="verification_loop",
+            case_id="verify_and_retry_no_callback",
+            passed=passed,
+        )
+        assert passed
+
+    async def test_verify_and_retry_with_callback(self) -> None:
+        """The fix_callback (taking errors and test_output) is invoked once for max_retries=1."""
+        call_count = 0
+
+        async def fix_cb(errors: list[str], test_output: str) -> None:
+            nonlocal call_count
+            call_count += 1
+
+        loop = VerificationLoop(commands=["false"], max_retries=1, timeout=5.0)
+        result = await loop.verify_and_retry(fix_callback=fix_cb)
+
+        passed = result.passed is False and call_count == 1
+        _COLLECTOR.record(
+            dimension="verification_loop",
+            case_id="verify_and_retry_with_callback",
+            passed=passed,
+        )
+        assert passed
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# 9. 综合报告生成
+# ═══════════════════════════════════════════════════════════════════════════
+
+
def _run_all_checks_for_report() -> _ResultCollector:
    """Run the checks for every report dimension and return the collector.

    A fresh ``_ResultCollector`` is filled here so that report generation is
    self-contained and does not depend on which other tests ran, or in what
    order. Each dimension is delegated to a private helper. Results are
    recorded in this order: preprocessing accuracy, skill recall, overfitting
    detection, execution efficiency, tool search, event model, spec
    management, verification loop.

    Returns:
        The collector holding one record per executed case.
    """
    collector = _ResultCollector()
    preprocessor = _build_preprocessor()
    search_index = ToolSearchIndex(_build_test_tools())

    # One event loop per async group instead of one per case.
    asyncio.run(_record_preprocessor_checks(collector, preprocessor))
    _record_tool_search_checks(collector, search_index)
    asyncio.run(_record_event_queue_checks(collector))
    _record_event_type_check(collector)
    _record_spec_checks(collector)
    asyncio.run(_record_verification_checks(collector))

    return collector


async def _record_preprocessor_checks(collector: _ResultCollector, preprocessor: Any) -> None:
    """Record the four dimensions that are driven by ``preprocessor.preprocess``."""
    # --- 1. Preprocessing accuracy: routed execution mode matches the expectation.
    for case in PREPROCESSING_CASES:
        result = await preprocessor.preprocess(content=case["input"])
        actual = result.execution_mode.value
        collector.record(
            dimension="preprocessing_accuracy",
            case_id=case["id"],
            passed=actual == case["expected_mode"],
            input=case["input"],
            expected=case["expected_mode"],
            actual=actual,
        )

    # --- 2. Recall: matched flag, skill name and mode must all agree.
    for case in RECALL_CASES:
        result = await preprocessor.preprocess(content=case["input"])
        passed = (
            result.matched == case["expected_matched"]
            and result.skill_name == case["expected_skill"]
            and result.execution_mode.value == case["expected_mode"]
        )
        collector.record(
            dimension="skill_recall",
            case_id=case["id"],
            passed=passed,
        )

    # --- 3. Overfitting: the original and every paraphrase must route to the
    # expected mode.
    for case in OVERFITTING_CASES:
        modes = []
        for text in (case["original"], *case["paraphrases"]):
            routed = await preprocessor.preprocess(content=text)
            modes.append(routed.execution_mode.value)
        collector.record(
            dimension="overfitting_detection",
            case_id=case["id"],
            passed=all(mode == case["expected_mode"] for mode in modes),
        )

    # --- 4. Efficiency: the timer wraps only the awaited call, so event-loop
    # creation and teardown are not counted against the component.
    for case in EFFICIENCY_CASES:
        start = time.perf_counter()
        result = await preprocessor.preprocess(content=case["input"])
        elapsed_ms = (time.perf_counter() - start) * 1000
        passed = (
            elapsed_ms < case["max_time_ms"]
            and result.execution_mode.value == case["expected_mode"]
        )
        collector.record(
            dimension="execution_efficiency",
            case_id=case["id"],
            passed=passed,
            elapsed_ms=round(elapsed_ms, 2),
        )


def _record_tool_search_checks(collector: _ResultCollector, search_index: ToolSearchIndex) -> None:
    """Record tool-search cases: expected-empty, expected top hit, or expected members."""
    for case in TOOL_SEARCH_CASES:
        results = search_index.search(case["query"], top_k=5)
        if case.get("expected_empty"):
            passed = len(results) == 0
        elif "expected_top1" in case:
            passed = len(results) > 0 and results[0].name == case["expected_top1"]
        elif "expected_contains" in case:
            names = [hit.name for hit in results]
            passed = all(name in names for name in case["expected_contains"])
        else:
            # A case without any expectation key is counted as failed.
            passed = False
        collector.record(
            dimension="tool_search_accuracy",
            case_id=case["id"],
            passed=passed,
        )


async def _record_event_queue_checks(collector: _ResultCollector) -> None:
    """Record the SubmissionQueue drain and EventQueue subscribe round trips.

    A consumer that does not receive its item within one second is recorded
    as a failed case instead of aborting the whole report.
    """
    # SQ: submit one item, then drain exactly one.
    sq = SubmissionQueue()
    await sq.submit("test", "s1")
    sq_received: list[Submission] = []

    async def sq_consumer() -> None:
        async for sub in sq.drain():
            sq_received.append(sub)
            break

    sq_task = asyncio.create_task(sq_consumer())
    try:
        await asyncio.wait_for(sq_task, timeout=1.0)
    except asyncio.TimeoutError:
        pass  # nothing was received; recorded as failed below
    collector.record(
        dimension="event_model_integrity",
        case_id="sq_submit_and_drain",
        passed=len(sq_received) == 1,
    )

    # EQ: start a subscriber, emit one event, expect exactly one delivery.
    eq = EventQueue()
    event = Event.create(TurnEventType.TOKEN, "t1", "s1", {"text": "hi"})
    eq_received: list[Event] = []

    async def eq_sub() -> None:
        async for evt in eq.subscribe():
            eq_received.append(evt)
            break

    eq_task = asyncio.create_task(eq_sub())
    # Yield briefly so the subscriber task is running before the event is emitted.
    await asyncio.sleep(0.05)
    await eq.emit(event)
    try:
        await asyncio.wait_for(eq_task, timeout=1.0)
    except asyncio.TimeoutError:
        pass  # nothing was received; recorded as failed below
    collector.record(
        dimension="event_model_integrity",
        case_id="eq_emit_and_subscribe",
        passed=len(eq_received) == 1,
    )


def _record_event_type_check(collector: _ResultCollector) -> None:
    """Record whether event type values carry their category prefix."""
    type_ok = (
        SessionEventType.SESSION_STARTED.startswith("session.")
        and TaskEventType.TASK_STARTED.startswith("task.")
        and TurnEventType.TOKEN.startswith("turn.")
    )
    collector.record(
        dimension="event_model_integrity",
        case_id="event_type_classification",
        passed=type_ok,
    )


def _record_spec_checks(collector: _ResultCollector) -> None:
    """Record SpecManager create/get and confirm against a temporary directory."""
    import tempfile  # only needed by this check

    with tempfile.TemporaryDirectory() as tmpdir:
        mgr = SpecManager(specs_dir=tmpdir)
        spec = Spec(
            spec_id="report-spec",
            goal="Report test",
            steps=[SpecStep(step_id="s1", name="S1", description="Step 1")],
        )
        mgr.create(spec)
        loaded = mgr.get("report-spec")
        collector.record(
            dimension="spec_management",
            case_id="spec_create_and_get",
            passed=loaded is not None and loaded.goal == "Report test",
        )

        confirmed = mgr.confirm("report-spec")
        collector.record(
            dimension="spec_management",
            case_id="spec_confirm",
            passed=confirmed is not None and confirmed.status == "confirmed",
        )


async def _record_verification_checks(collector: _ResultCollector) -> None:
    """Record one succeeding and one failing VerificationLoop run."""
    loop_ok = VerificationLoop(commands=["echo ok"], timeout=10.0)
    result_ok = await loop_ok.verify()
    collector.record(
        dimension="verification_loop",
        case_id="verify_success",
        passed=result_ok.passed is True,
    )

    loop_fail = VerificationLoop(commands=["false"], timeout=5.0)
    result_fail = await loop_fail.verify()
    collector.record(
        dimension="verification_loop",
        case_id="verify_failure",
        passed=result_fail.passed is False,
    )
+
+
def _generate_json_report(collector: _ResultCollector) -> dict[str, Any]:
    """Build the comprehensive report as a plain dict.

    For each of the eight fixed report dimensions the dict carries the score
    (rounded to one decimal) and a detail entry with the total and passed
    counts plus the raw case records. Totals are computed over everything in
    ``collector.results``.

    A suggestion is emitted for every dimension whose rounded score is below
    100 or that contains at least one failed case. The second condition
    matters because a score such as 99.96 rounds to 100.0 and would otherwise
    be reported as fully passing.

    Args:
        collector: Source of per-dimension case records and scores.

    Returns:
        The report with keys ``report_type``, ``generated_at`` (UTC, ISO 8601),
        ``total_score``, ``total_cases``, ``total_passed``,
        ``dimension_scores``, ``dimension_details`` and ``suggestions``.
    """
    dimensions = (
        "preprocessing_accuracy",
        "skill_recall",
        "overfitting_detection",
        "execution_efficiency",
        "tool_search_accuracy",
        "event_model_integrity",
        "spec_management",
        "verification_loop",
    )

    dimension_scores: dict[str, float] = {}
    dimension_details: dict[str, Any] = {}
    suggestions: list[str] = []
    for dim in dimensions:
        cases = collector.results.get(dim, [])
        passed_count = sum(1 for case in cases if case["passed"])
        score = round(collector.dimension_score(dim), 1)
        dimension_scores[dim] = score
        dimension_details[dim] = {
            "total": len(cases),
            "passed": passed_count,
            "score": score,
            "cases": cases,
        }
        # Check the counts as well as the rounded score (see docstring).
        if score < 100 or passed_count < len(cases):
            suggestions.append(f"[{dim}] 得分 {score:.1f}%,存在失败用例,需检查相关组件")

    if not suggestions:
        suggestions.append("所有维度均达到 100%,架构状态良好")

    all_cases = [case for cases in collector.results.values() for case in cases]

    return {
        "report_type": "comprehensive_capability_backtest",
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "total_score": round(collector.total_score(), 1),
        "total_cases": len(all_cases),
        "total_passed": sum(1 for case in all_cases if case["passed"]),
        "dimension_scores": dimension_scores,
        "dimension_details": dimension_details,
        "suggestions": suggestions,
    }
+
+
def _generate_text_report(json_report: dict[str, Any]) -> str:
    """Render the JSON report dict as a Chinese plain-text report.

    The text has four parts: a header with overall score and case counts, one
    line per dimension, the per-case results grouped by dimension, and the
    suggestions. Dimension keys without a display name are printed as-is.

    A dimension is marked ``✓`` only when its score equals 100 and its passed
    count equals its total. A score that merely rounds to 100.0 while a case
    failed is marked ``✗``.

    Args:
        json_report: Report dict providing ``generated_at``, ``total_score``,
            ``total_cases``, ``total_passed``, ``dimension_scores``,
            ``dimension_details`` and ``suggestions``.

    Returns:
        The report lines joined with newlines.
    """
    heavy_rule = "=" * 70
    light_rule = "-" * 70
    dim_names: dict[str, str] = {
        "preprocessing_accuracy": "预处理准确度",
        "skill_recall": "技能召回率",
        "overfitting_detection": "过拟合检测",
        "execution_efficiency": "执行效率",
        "tool_search_accuracy": "工具搜索准确度",
        "event_model_integrity": "事件模型完整性",
        "spec_management": "Spec 管理功能",
        "verification_loop": "验证循环",
    }
    details_by_dim = json_report["dimension_details"]
    total_cases = json_report["total_cases"]
    total_passed = json_report["total_passed"]

    # Header.
    lines: list[str] = [
        heavy_rule,
        "Fischer AgentKit 综合能力回测报告",
        heavy_rule,
        f"生成时间: {json_report['generated_at']}",
        f"总体评分: {json_report['total_score']:.1f}%",
        f"用例总数: {total_cases} 通过: {total_passed} 失败: {total_cases - total_passed}",
        "",
    ]

    # Score per dimension.
    lines += [light_rule, "各维度得分", light_rule]
    for dim, score in json_report["dimension_scores"].items():
        detail = details_by_dim[dim]
        fully_passed = score == 100 and detail["passed"] == detail["total"]
        mark = "✓" if fully_passed else "✗"
        lines.append(
            f" {mark} {dim_names.get(dim, dim)}: {score:.1f}% ({detail['passed']}/{detail['total']})"
        )
    lines.append("")

    # Case-by-case results.
    lines += [light_rule, "详细用例结果", light_rule]
    for dim, detail in details_by_dim.items():
        # The leading newline puts a blank line before each dimension heading.
        lines.append(f"\n[{dim_names.get(dim, dim)}]")
        for case in detail["cases"]:
            mark = "✓" if case["passed"] else "✗"
            lines.append(f" {mark} {case['case_id']}")
    lines.append("")

    # Suggestions.
    lines += [light_rule, "改进建议", light_rule]
    lines.extend(f" • {suggestion}" for suggestion in json_report["suggestions"])
    lines.append("")
    lines.append(heavy_rule)

    return "\n".join(lines)
+
+
@pytest.mark.e2e_capability
class TestComprehensiveReport:
    """Generate the comprehensive capability report and sanity-check it.

    Outputs:
    - JSON report: test-results/e2e/comprehensive_report.json
    - Text report: test-results/e2e/comprehensive_report.txt
    """

    def test_generate_comprehensive_report(self, tmp_path: Path) -> None:
        """Run every dimension's checks, write both reports and assert on them.

        ``tmp_path`` is requested but not used: the reports are written to
        ``test-results/e2e`` relative to the current working directory.
        """
        # Self-contained run; does not rely on other tests having executed first.
        collector = _run_all_checks_for_report()

        # Adopt dimensions that only the shared module-level collector holds
        # (present when other tests have already run). Dimensions produced by
        # the fresh run above are left untouched.
        for dim, cases in _COLLECTOR.results.items():
            collector.results.setdefault(dim, cases)

        # Build both report representations.
        json_report = _generate_json_report(collector)
        text_report = _generate_text_report(json_report)

        # Resolve output locations and make sure the directory exists.
        output_dir = Path("test-results/e2e")
        output_dir.mkdir(parents=True, exist_ok=True)
        json_path = output_dir / "comprehensive_report.json"
        text_path = output_dir / "comprehensive_report.txt"

        # Persist the JSON report, then the text report.
        serialized = json.dumps(json_report, ensure_ascii=False, indent=2)
        json_path.write_text(serialized, encoding="utf-8")
        text_path.write_text(text_report, encoding="utf-8")

        # Echo the report and the file locations to the console.
        print(f"\n{text_report}")
        print(f"\nJSON 报告: {json_path}")
        print(f"文本报告: {text_path}")

        # Both files must exist on disk.
        assert json_path.exists(), "JSON report file not generated"
        assert text_path.exists(), "Text report file not generated"

        # The report must be non-empty and cover all eight dimensions.
        assert json_report["total_cases"] > 0, "No test cases in report"
        assert len(json_report["dimension_scores"]) == 8, "Expected 8 dimensions in report"

        # Individual cases may fail, but the overall score must be at least 80%.
        total_score = json_report["total_score"]
        print(f"\n总体评分: {total_score:.1f}%")
        assert total_score >= 80.0, f"Total score {total_score:.1f}% is below 80% threshold"
diff --git a/tests/e2e/test_capability_router_direct.py b/tests/e2e/test_capability_router_direct.py
deleted file mode 100644
index 0536d00..0000000
--- a/tests/e2e/test_capability_router_direct.py
+++ /dev/null
@@ -1,405 +0,0 @@
-"""E2E Agent Capability Tests — Router Direct Backtest Layer (Real LLM).
-
-Directly tests CostAwareRouter.route() using real LLM configuration
-loaded from agentkit.yaml. Records full SkillRoutingResult for precise
-root cause analysis:
- - match_method (layer0/layer1/layer1.5/layer2)
- - match_confidence
- - complexity score
- - execution_trace
-"""
-
-import asyncio
-import os
-from pathlib import Path
-
-import pytest
-
-from agentkit.chat.skill_routing import CostAwareRouter
-from agentkit.router.intent import IntentRouter
-from agentkit.server.app import _build_llm_gateway, _build_skill_registry
-from agentkit.server.config import ServerConfig
-from agentkit.skills.registry import SkillRegistry
-
-from tests.e2e.benchmark_dataset import (
- ALL_BENCHMARKS,
- ROUTING_KEYWORD_BENCHMARKS,
- ROUTING_EDGE_BENCHMARKS,
- SEMANTIC_ROUTER_BENCHMARKS,
- BenchmarkCase,
-)
-from tests.e2e.capability_metrics import MetricsCollector
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Real component initialization from agentkit.yaml
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-def _find_config_path() -> str | None:
- """Find agentkit.yaml in standard search paths."""
- candidates = [
- os.environ.get("AGENTKIT_CONFIG", ""),
- str(Path.cwd() / "agentkit.yaml"),
- str(Path.home() / ".agentkit" / "agentkit.yaml"),
- ]
- for path in candidates:
- if path and Path(path).is_file():
- return path
- return None
-
-
-def _build_real_components() -> tuple[CostAwareRouter, SkillRegistry, IntentRouter]:
- """Build real components from agentkit.yaml configuration.
-
- Returns (router, skill_registry, intent_router).
- Raises skip if no valid LLM provider is configured.
- """
- config_path = _find_config_path()
- if not config_path:
- pytest.skip("No agentkit.yaml found — cannot build real components")
-
- # Load .env if present
- env_path = Path(config_path).parent / ".env"
- if env_path.exists():
- try:
- from dotenv import load_dotenv
-
- load_dotenv(env_path)
- except ImportError:
- # python-dotenv not installed, manually parse .env
- with open(env_path) as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith("#") and "=" in line:
- key, _, value = line.partition("=")
- os.environ.setdefault(key.strip(), value.strip().strip("'\""))
-
- server_config = ServerConfig.from_yaml(config_path)
-
- # Check if any LLM provider has a valid API key
- if not server_config.has_llm_provider():
- # Try to inject DASHSCOPE_API_KEY from environment
- dashscope_key = os.environ.get("DASHSCOPE_API_KEY", "")
- if dashscope_key:
- # Inject into the test provider config
- for name, pconf in server_config.llm_config.providers.items():
- if not pconf.api_key:
- pconf.api_key = dashscope_key
- # Set base_url for dashscope if missing
- # Use coding base_url for bailian-coding keys (sk-sp-* prefix)
- if not pconf.base_url:
- if dashscope_key.startswith("sk-sp-"):
- pconf.base_url = "https://coding.dashscope.aliyuncs.com/v1"
- else:
- pconf.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
- break
-
- if not server_config.has_llm_provider():
- pytest.skip("No LLM provider with valid API key — skipping real LLM tests")
-
- # Build real LLM gateway
- llm_gateway = _build_llm_gateway(server_config)
-
- # Build real skill registry from configs/skills
- skill_registry = _build_skill_registry(server_config)
-
- # Build real intent router
- intent_router = IntentRouter(llm_gateway=llm_gateway)
-
- # Build real CostAwareRouter
- router_conf = server_config.router or {}
-
- # Build SemanticRouter if enabled or if embedding is available
- semantic_router = None
- semantic_conf = router_conf.get("semantic", {})
- if semantic_conf.get("enabled", False):
- try:
- from agentkit.chat.semantic_router import SemanticRouter
- from agentkit.memory.embedder import OpenAIEmbedder
-
- # Try to get embedder from LLM gateway cache first
- embedder = getattr(llm_gateway, "_embedder", None)
-
- # If no cache embedder, create one directly from provider config
- if embedder is None:
- # Find a provider with an API key to use for embedding
- for pname, pconf in server_config.llm_config.providers.items():
- if pconf.api_key:
- # Use correct base_url based on key prefix
- if pconf.api_key.startswith("sk-sp-"):
- base_url = pconf.base_url or "https://coding.dashscope.aliyuncs.com/v1"
- else:
- base_url = pconf.base_url or "https://dashscope.aliyuncs.com/compatible-mode/v1"
- embedder = OpenAIEmbedder(
- api_key=pconf.api_key,
- base_url=base_url,
- model="text-embedding-v3",
- )
- print(f"Created embedder from provider '{pname}' (base_url={base_url})")
- break
-
- if embedder is not None:
- semantic_router = SemanticRouter(
- embedder=embedder,
- similarity_high=semantic_conf.get("similarity_high", 0.85),
- similarity_low=semantic_conf.get("similarity_low", 0.4),
- )
- # Build skill embedding index
- import asyncio
-
- try:
- loop = asyncio.get_running_loop()
- except RuntimeError:
- loop = None
-
- if loop and loop.is_running():
- # Already in async context (pytest-asyncio), schedule in background
- import concurrent.futures
-
- with concurrent.futures.ThreadPoolExecutor() as pool:
- pool.submit(asyncio.run, semantic_router.build_index(skill_registry)).result()
- else:
- asyncio.run(semantic_router.build_index(skill_registry))
- print(f"SemanticRouter built: {semantic_router._index.size} skills indexed")
- else:
- print("Warning: No embedder available for SemanticRouter")
- except Exception as e:
- print(f"Warning: SemanticRouter not available: {e}")
-
- router = CostAwareRouter(
- llm_gateway=llm_gateway,
- model="default",
- org_context=None,
- auction_enabled=router_conf.get("auction_enabled", False),
- classifier=router_conf.get("classifier", "heuristic"),
- merged_llm_classify=router_conf.get("merged_llm_classify", True),
- semantic_router=semantic_router,
- )
-
- return router, skill_registry, intent_router
-
-
-# Cache components at module level to avoid rebuilding for every test
-_cached_components: tuple[CostAwareRouter, SkillRegistry, IntentRouter] | None = None
-
-
-def _get_components() -> tuple[CostAwareRouter, SkillRegistry, IntentRouter]:
- """Get or build real components (cached for session)."""
- global _cached_components
- if _cached_components is None:
- _cached_components = _build_real_components()
- return _cached_components
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Helper: Run a single benchmark through the real router
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-async def _run_router_benchmark(
- benchmark: BenchmarkCase,
- collector: MetricsCollector,
- test_name: str,
- is_paraphrase: bool = False,
- input_override: str | None = None,
-) -> dict:
- """Run a single benchmark through the real router."""
- router, skill_registry, intent_router = _get_components()
- query = input_override or benchmark.input
-
- collector.start_timer(benchmark.id)
-
- try:
- result = await router.route(
- content=query,
- skill_registry=skill_registry,
- intent_router=intent_router,
- default_tools=[],
- default_system_prompt=None,
- )
-
- actual_skill = result.skill_name
- actual_exec_mode = result.execution_mode.value if result.execution_mode else None
- actual_complexity = result.complexity
- actual_match_method = result.match_method
- actual_match_confidence = result.match_confidence
- task_succeeded = True
- error_msg = None
- except Exception as e:
- actual_skill = None
- actual_exec_mode = None
- actual_complexity = 0.0
- actual_match_method = None
- actual_match_confidence = 0.0
- task_succeeded = False
- error_msg = str(e)[:200]
-
- # Map complexity score to level
- if actual_complexity < 0.3:
- actual_complexity_level = "low"
- elif actual_complexity < 0.7:
- actual_complexity_level = "medium"
- else:
- actual_complexity_level = "high"
-
- # Judge correctness
- skill_correct = None
- if benchmark.expected_skill is not None and actual_skill is not None:
- skill_correct = actual_skill == benchmark.expected_skill
- elif benchmark.expected_skill is None:
- skill_correct = actual_skill is None or task_succeeded
-
- execution_mode_correct = None
- if actual_exec_mode is not None and benchmark.expected_execution_mode:
- mode_map = {
- "direct": "DIRECT_CHAT",
- "react": "SKILL_REACT",
- "rewoo": "REWOO",
- "reflexion": "REFLEXION",
- "plan_exec": "PLAN_EXEC",
- "team_collab": "TEAM_COLLAB",
- "llm_generate": "SKILL_REACT",
- "tool_call": "SKILL_REACT",
- "custom": "SKILL_REACT",
- }
- expected_normalized = mode_map.get(
- benchmark.expected_execution_mode, benchmark.expected_execution_mode.upper()
- )
- execution_mode_correct = actual_exec_mode.upper() == expected_normalized
-
- complexity_correct = actual_complexity_level == benchmark.expected_complexity
-
- obs = collector.record_benchmark_result(
- benchmark,
- test_name=test_name,
- actual_skill=actual_skill,
- actual_execution_mode=actual_exec_mode,
- actual_status_code=200 if task_succeeded else 500,
- task_succeeded=task_succeeded,
- is_paraphrase=is_paraphrase,
- error_message=error_msg,
- )
- obs.complexity_correct = complexity_correct
-
- return {
- "skill_correct": skill_correct,
- "execution_mode_correct": execution_mode_correct,
- "complexity_correct": complexity_correct,
- "actual_skill": actual_skill,
- "actual_exec_mode": actual_exec_mode,
- "actual_complexity": actual_complexity,
- "actual_complexity_level": actual_complexity_level,
- "actual_match_method": actual_match_method,
- "actual_match_confidence": actual_match_confidence,
- "task_succeeded": task_succeeded,
- }
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Layer 0: Rule Matching Tests
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-@pytest.mark.e2e_capability
-class TestRouterLayer0:
- """Test Layer 0 rule matching with real router."""
-
- @pytest.mark.parametrize(
- "benchmark",
- [
- b
- for b in ROUTING_EDGE_BENCHMARKS
- if b.subcategory in ("greeting", "identity", "explicit_prefix")
- ],
- ids=[
- b.id
- for b in ROUTING_EDGE_BENCHMARKS
- if b.subcategory in ("greeting", "identity", "explicit_prefix")
- ],
- )
- def test_layer0_rules(self, benchmark: BenchmarkCase, metrics_collector: MetricsCollector):
- """Layer 0 should correctly match greetings, identity, and @skill: prefix."""
- result = asyncio.run(
- _run_router_benchmark(benchmark, metrics_collector, f"layer0_{benchmark.id}")
- )
- if benchmark.subcategory == "greeting":
- assert result["actual_match_method"] in ("layer0", None) or result["task_succeeded"]
- if benchmark.subcategory == "explicit_prefix":
- assert result["actual_skill"] == benchmark.expected_skill or result["task_succeeded"]
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Layer 1: Complexity Classification Tests
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-@pytest.mark.e2e_capability
-class TestRouterLayer1:
- """Test Layer 1 complexity classification with real router."""
-
- @pytest.mark.parametrize(
- "benchmark",
- ROUTING_KEYWORD_BENCHMARKS,
- ids=[b.id for b in ROUTING_KEYWORD_BENCHMARKS],
- )
- def test_complexity_classification(
- self, benchmark: BenchmarkCase, metrics_collector: MetricsCollector
- ):
- """HeuristicClassifier should correctly estimate complexity."""
- asyncio.run(_run_router_benchmark(benchmark, metrics_collector, f"layer1_{benchmark.id}"))
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Semantic Router Tests
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-@pytest.mark.e2e_capability
-class TestSemanticRouter:
- """Test semantic router matching with real router."""
-
- @pytest.mark.parametrize(
- "benchmark",
- SEMANTIC_ROUTER_BENCHMARKS,
- ids=[b.id for b in SEMANTIC_ROUTER_BENCHMARKS],
- )
- def test_semantic_match(self, benchmark: BenchmarkCase, metrics_collector: MetricsCollector):
- """SemanticRouter should match skill descriptions."""
- asyncio.run(_run_router_benchmark(benchmark, metrics_collector, f"semantic_{benchmark.id}"))
-
-
-# ═══════════════════════════════════════════════════════════════════════════
-# Paraphrase Consistency Tests (Overfitting Detection)
-# ═══════════════════════════════════════════════════════════════════════════
-
-
-@pytest.mark.e2e_capability
-class TestRouterParaphraseConsistency:
- """Test that paraphrased inputs route to the same skill as originals."""
-
- @pytest.mark.parametrize(
- "benchmark",
- [b for b in ALL_BENCHMARKS if b.paraphrases and b.expected_skill is not None][:10],
- ids=[b.id for b in ALL_BENCHMARKS if b.paraphrases and b.expected_skill is not None][:10],
- )
- def test_paraphrase_routes_same_skill(
- self, benchmark: BenchmarkCase, metrics_collector: MetricsCollector
- ):
- """Original and paraphrased inputs should route to the same skill."""
- # Run original
- asyncio.run(
- _run_router_benchmark(benchmark, metrics_collector, f"para_orig_{benchmark.id}")
- )
-
- # Run paraphrases
- for i, para in enumerate(benchmark.paraphrases):
- asyncio.run(
- _run_router_benchmark(
- benchmark,
- metrics_collector,
- f"para_{benchmark.id}_{i}",
- is_paraphrase=True,
- input_override=para,
- )
- )