fischer-agentkit/tests/e2e/capability_metrics.py

1367 lines
63 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent Capability Metrics — Collection, Analysis, and Reporting.
Core components:
1. CapabilityMetrics: data model for a single test observation
2. MetricsCollector: session-scoped collector that gathers all observations
3. MetricsAnalyzer: computes recall/precision/F1, overfitting scores, weakness analysis
4. MetricsReporter: generates human-readable and machine-readable reports
Design:
- Collector is a pytest fixture (session-scoped), injected into capability tests
- Each test records what actually happened vs what was expected
- After all tests, analyzer computes aggregate metrics
- Reporter outputs JSON + plain-text summary
"""
import json
import os
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any
from pydantic import BaseModel, ConfigDict
from tests.e2e.benchmark_dataset import BenchmarkCase
# ═══════════════════════════════════════════════════════════════════════════
# 1. Data Models
# ═══════════════════════════════════════════════════════════════════════════
class CapabilityObservation(BaseModel):
    """One test observation: the expected outcome next to the observed one.

    Fields are grouped as identity, input, expectation (ground truth),
    observation, judgments, metadata and alignment/cascade signals.
    The three ``*_correct`` judgments are tri-state: ``None`` means the
    verdict could not be determined.
    """

    model_config = ConfigDict()

    # ---- Identity ----
    benchmark_id: str
    test_name: str
    timestamp: str

    # ---- Input ----
    input_query: str
    # True when this run used a paraphrased query (overfitting detection).
    is_paraphrase: bool = False

    # ---- Expected (ground truth) ----
    expected_skill: str | None = None
    expected_execution_mode: str = "direct"
    expected_complexity: str = "low"

    # ---- Actual (observed) ----
    actual_skill: str | None = None
    actual_execution_mode: str | None = None
    actual_status_code: int = 0
    actual_response_keys: list[str] = []
    actual_complexity_score: float | None = None
    actual_match_method: str | None = None
    actual_match_confidence: float | None = None

    # ---- Judgments ----
    # None = the verdict could not be determined.
    skill_correct: bool | None = None
    execution_mode_correct: bool | None = None
    complexity_correct: bool | None = None
    # HTTP 200 plus a valid response.
    task_succeeded: bool = False

    # ---- Metadata ----
    category: str = ""
    subcategory: str = ""
    response_time_ms: float = 0.0
    error_message: str | None = None

    # ---- Alignment & cascade fields (U5) ----
    # Number of constraint violations detected.
    alignment_violations: int = 0
    # Whether a cascade alert was triggered.
    cascade_alert: bool = False
class CategoryMetrics(BaseModel):
    """Aggregated scores for one (category, subcategory) slice.

    ``*_correct`` fields are raw counts; the ``skill_*`` ratios,
    ``*_accuracy`` and ``task_success_rate`` are fractions derived from
    those counts, and ``total`` is the number of observations in the slice.
    """

    model_config = ConfigDict()

    # Slice identity.
    category: str
    subcategory: str

    # Number of observations in the slice.
    total: int = 0

    # Skill routing quality.
    skill_correct: int = 0
    skill_recall: float = 0.0
    skill_precision: float = 0.0
    skill_f1: float = 0.0

    # Execution-mode selection quality.
    execution_mode_correct: int = 0
    execution_mode_accuracy: float = 0.0

    # Complexity judgment quality.
    complexity_correct: int = 0
    complexity_accuracy: float = 0.0

    # End-to-end outcome and latency.
    task_success_rate: float = 0.0
    avg_response_time_ms: float = 0.0
class OverfittingResult(BaseModel):
    """Outcome of the overfitting check for one benchmark case."""

    model_config = ConfigDict()

    benchmark_id: str
    original_correct: bool
    # One entry per paraphrase; True means that paraphrase was correct.
    paraphrase_results: list[bool]
    # Share of paraphrases whose result matches the original's result.
    consistency_rate: float = 0.0
    # True when the original was correct but the paraphrases were mostly wrong.
    is_overfitted: bool = False
class WeaknessItem(BaseModel):
    """One identified weakness, with its evidence and a suggested remedy."""

    model_config = ConfigDict()

    # routing / execution / quality / team / consistency
    dimension: str
    subcategory: str
    # critical / high / medium / low
    severity: str
    description: str
    evidence: str
    suggestion: str
class RootCause(BaseModel):
    """A root cause hypothesised for one or more weaknesses."""

    model_config = ConfigDict()

    # One of: keyword_gap / complexity_misjudge / intent_ambiguous /
    # fallback_missing / overfit_pattern / tool_missing / config_error /
    # quality_threshold
    cause_type: str
    cause_description: str
    # 0.0~1.0: how confident the analysis is about this root cause.
    confidence: float = 0.0
    # Benchmark IDs affected by this cause.
    affected_cases: list[str] = []
    # Additional technical detail.
    detail: str = ""
class ImprovementAction(BaseModel):
    """One concrete, actionable improvement step."""

    model_config = ConfigDict()

    action_id: str
    title: str
    description: str
    # Which code module to modify.
    target_module: str
    # P0 / P1 / P2 / P3
    priority: str
    # What improvement to expect.
    expected_impact: str
    # small / medium / large
    effort: str
    # The cause_type values this action addresses.
    related_causes: list[str] = []
    # How to verify that the fix works.
    verification: str = ""
class ImprovementPlan(BaseModel):
    """Remediation plan for one weakness: its causes, actions and strategy."""

    model_config = ConfigDict()

    # Text describing the weakness this plan targets.
    weakness_description: str
    # Root causes the plan is built around.
    root_causes: list[RootCause]
    # Concrete steps to carry out.
    actions: list[ImprovementAction]
    # Free-text summary of the overall approach.
    overall_strategy: str
class CapabilityReport(BaseModel):
    """Complete capability analysis report.

    Bundles the headline scores, the per-category breakdown, the
    overfitting analysis, the derived weaknesses / root causes /
    improvement plans, and the raw observations they were computed from.
    """

    model_config = ConfigDict()

    # Report metadata.
    generated_at: str
    total_observations: int

    # Headline scores.
    overall_skill_recall: float
    overall_skill_precision: float
    overall_skill_f1: float
    overall_execution_mode_accuracy: float
    overall_task_success_rate: float

    # Per-category breakdown.
    category_metrics: list[CategoryMetrics]

    # Overfitting analysis.
    overfitting_results: list[OverfittingResult]
    # 0.0 = no overfitting, 1.0 = fully overfitted.
    overfitting_score: float

    # Derived findings.
    weaknesses: list[WeaknessItem]
    root_causes: list[RootCause]
    improvement_plans: list[ImprovementPlan]

    # The observations the report was built from.
    raw_observations: list[CapabilityObservation]
# ═══════════════════════════════════════════════════════════════════════════
# 2. Metrics Collector
# ═══════════════════════════════════════════════════════════════════════════
class MetricsCollector:
    """Accumulates capability observations while the E2E tests run.

    Typical calls from a test:
        collector.record(observation)
        collector.record_benchmark_result(benchmark, actual_skill, ...)
    """

    def __init__(self) -> None:
        self._observations: list[CapabilityObservation] = []
        # benchmark id -> monotonic clock reading taken by start_timer().
        self._start_times: dict[str, float] = {}

    def start_timer(self, benchmark_id: str) -> None:
        """Store the current monotonic clock reading under ``benchmark_id``."""
        self._start_times[benchmark_id] = time.monotonic()

    def stop_timer(self, benchmark_id: str) -> float:
        """Return the milliseconds elapsed since ``start_timer`` and forget the timer.

        Returns 0.0 when no timer is running for ``benchmark_id``.
        """
        try:
            started_at = self._start_times.pop(benchmark_id)
        except KeyError:
            return 0.0
        return (time.monotonic() - started_at) * 1000  # seconds -> ms

    def record(self, observation: CapabilityObservation) -> None:
        """Append an already-built observation."""
        self._observations.append(observation)

    def record_benchmark_result(
        self,
        benchmark: BenchmarkCase,
        *,
        test_name: str,
        actual_skill: str | None = None,
        actual_execution_mode: str | None = None,
        actual_status_code: int = 0,
        actual_response_keys: list[str] | None = None,
        task_succeeded: bool = False,
        is_paraphrase: bool = False,
        error_message: str | None = None,
    ) -> CapabilityObservation:
        """Judge a benchmark run against its expectations, store it and return it.

        The response time is taken from the timer started for ``benchmark.id``
        (0.0 if none is running). Judgments:

        - skill: compared with the expected skill when both are known; when no
          skill is expected, "no skill routed" or a successful task counts as
          correct; otherwise left as ``None``.
        - execution mode: the observed label is mapped onto the benchmark
          vocabulary before comparison; ``None`` when no mode was observed.
        - complexity: approximated by the execution-mode verdict.

        NOTE(review): an expected skill combined with ``actual_skill=None``
        yields ``skill_correct=None`` rather than ``False`` — confirm this is
        intended, since analyses that filter on ``skill_correct is False``
        never see such runs.
        """
        elapsed_ms = self.stop_timer(benchmark.id)

        # --- skill verdict ---
        wanted_skill = benchmark.expected_skill
        skill_verdict: bool | None
        if wanted_skill is None:
            # No specific skill expected, so any non-error outcome is acceptable.
            skill_verdict = actual_skill is None or task_succeeded
        elif actual_skill is not None:
            skill_verdict = actual_skill == wanted_skill
        else:
            skill_verdict = None

        # --- execution mode verdict ---
        mode_verdict: bool | None
        if actual_execution_mode is None:
            mode_verdict = None
        else:
            # Observed labels ("skill_react" / "rewoo" / "direct_chat" ...)
            # use a different vocabulary than the expected ones
            # ("react" / "rewoo" / "direct" ...); translate before comparing.
            label_aliases: dict[str, str] = {
                "skill_react": "react",
                "direct_chat": "direct",
                "team_collab": "team_collab",
            }
            normalized_mode = label_aliases.get(actual_execution_mode, actual_execution_mode)
            mode_verdict = normalized_mode == benchmark.expected_execution_mode

        # --- complexity verdict (approximated by the execution mode verdict) ---
        complexity_verdict = mode_verdict

        observation = CapabilityObservation(
            benchmark_id=benchmark.id,
            test_name=test_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
            input_query=benchmark.input,
            is_paraphrase=is_paraphrase,
            expected_skill=wanted_skill,
            expected_execution_mode=benchmark.expected_execution_mode,
            expected_complexity=benchmark.expected_complexity,
            actual_skill=actual_skill,
            actual_execution_mode=actual_execution_mode,
            actual_status_code=actual_status_code,
            actual_response_keys=actual_response_keys or [],
            skill_correct=skill_verdict,
            execution_mode_correct=mode_verdict,
            complexity_correct=complexity_verdict,
            task_succeeded=task_succeeded,
            category=benchmark.category,
            subcategory=benchmark.subcategory,
            response_time_ms=elapsed_ms,
            error_message=error_message,
        )
        self._observations.append(observation)
        return observation

    @property
    def observations(self) -> list[CapabilityObservation]:
        """The internal list of recorded observations (not a copy)."""
        return self._observations

    def _select(self, keep) -> list[CapabilityObservation]:
        """Return the recorded observations for which ``keep(obs)`` is truthy."""
        return [obs for obs in self._observations if keep(obs)]

    def get_observations_by_category(self, category: str) -> list[CapabilityObservation]:
        """Observations whose ``category`` equals ``category``."""
        return self._select(lambda obs: obs.category == category)

    def get_observations_by_subcategory(self, subcategory: str) -> list[CapabilityObservation]:
        """Observations whose ``subcategory`` equals ``subcategory``."""
        return self._select(lambda obs: obs.subcategory == subcategory)

    def get_original_observations(self) -> list[CapabilityObservation]:
        """Observations recorded for original (non-paraphrase) queries."""
        return self._select(lambda obs: not obs.is_paraphrase)

    def get_paraphrase_observations(self) -> list[CapabilityObservation]:
        """Observations recorded for paraphrased queries only."""
        return self._select(lambda obs: obs.is_paraphrase)
# ═══════════════════════════════════════════════════════════════════════════
# 3. Metrics Analyzer
# ═══════════════════════════════════════════════════════════════════════════
class MetricsAnalyzer:
"""Analyzes collected metrics to compute recall/precision/F1, overfitting, weaknesses."""
@staticmethod
def _safe_div(numerator: float, denominator: float) -> float:
return numerator / denominator if denominator > 0 else 0.0
@staticmethod
def compute_prf(tp: int, fp: int, fn: int) -> tuple[float, float, float]:
"""Compute precision, recall, F1 from counts."""
precision = MetricsAnalyzer._safe_div(tp, tp + fp)
recall = MetricsAnalyzer._safe_div(tp, tp + fn)
f1 = MetricsAnalyzer._safe_div(2 * precision * recall, precision + recall)
return precision, recall, f1
def analyze_category(
    self, observations: list[CapabilityObservation], category: str, subcategory: str
) -> CategoryMetrics:
    """Aggregate the observations of one category (and optional subcategory).

    An empty ``subcategory`` selects every observation of ``category``.
    When nothing matches, a ``CategoryMetrics`` holding only the two labels
    (all other fields at their defaults) is returned.
    """

    def in_slice(obs: CapabilityObservation) -> bool:
        if obs.category != category:
            return False
        return not subcategory or obs.subcategory == subcategory

    bucket = [obs for obs in observations if in_slice(obs)]
    if not bucket:
        return CategoryMetrics(category=category, subcategory=subcategory)

    sample_size = len(bucket)

    # Single pass over the slice to gather every counter.
    # Skill routing confusion counts:
    #   true positive  = routed to the expected skill
    #   false positive = judged wrong while some skill was routed
    #   false negative = judged wrong while a skill was expected
    true_pos = false_pos = false_neg = 0
    mode_hits = complexity_hits = success_hits = 0
    for obs in bucket:
        if obs.skill_correct is True:
            true_pos += 1
        elif obs.skill_correct is False:
            if obs.actual_skill is not None:
                false_pos += 1
            if obs.expected_skill is not None:
                false_neg += 1
        if obs.execution_mode_correct is True:
            mode_hits += 1
        if obs.complexity_correct is True:
            complexity_hits += 1
        if obs.task_succeeded:
            success_hits += 1

    mean_latency_ms = sum(obs.response_time_ms for obs in bucket) / sample_size
    precision, recall, f1 = self.compute_prf(true_pos, false_pos, false_neg)

    return CategoryMetrics(
        category=category,
        subcategory=subcategory,
        total=sample_size,
        skill_correct=true_pos,
        skill_recall=round(recall, 4),
        skill_precision=round(precision, 4),
        skill_f1=round(f1, 4),
        execution_mode_correct=mode_hits,
        execution_mode_accuracy=round(self._safe_div(mode_hits, sample_size), 4),
        complexity_correct=complexity_hits,
        complexity_accuracy=round(self._safe_div(complexity_hits, sample_size), 4),
        task_success_rate=round(self._safe_div(success_hits, sample_size), 4),
        avg_response_time_ms=round(mean_latency_ms, 2),
    )
def detect_overfitting(
self, observations: list[CapabilityObservation]
) -> tuple[list[OverfittingResult], float]:
"""Detect overfitting by comparing original vs paraphrase results.
Returns (overfitting_results, overall_overfitting_score).
overfitting_score = 0.0 means no overfitting (paraphrases work as well as originals).
overfitting_score = 1.0 means complete overfitting (originals correct, paraphrases all wrong).
"""
originals = {o.benchmark_id: o for o in observations if not o.is_paraphrase}
paraphrases: dict[str, list[CapabilityObservation]] = defaultdict(list)
for o in observations:
if o.is_paraphrase:
paraphrases[o.benchmark_id].append(o)
results: list[OverfittingResult] = []
total_inconsistency = 0.0
total_comparisons = 0
for bid, orig in originals.items():
paras = paraphrases.get(bid, [])
if not paras:
continue
orig_correct = orig.skill_correct is True
para_corrects = [p.skill_correct is True for p in paras]
# Consistency: how many paraphrases match the original result
matches = sum(1 for pc in para_corrects if pc == orig_correct)
consistency_rate = self._safe_div(matches, len(para_corrects))
# Overfitted: original correct but paraphrases mostly wrong
is_overfitted = orig_correct and consistency_rate < 0.5
results.append(
OverfittingResult(
benchmark_id=bid,
original_correct=orig_correct,
paraphrase_results=para_corrects,
consistency_rate=round(consistency_rate, 4),
is_overfitted=is_overfitted,
)
)
if orig_correct:
# Only count inconsistency when original was correct
total_inconsistency += 1.0 - consistency_rate
total_comparisons += 1
overfitting_score = self._safe_div(total_inconsistency, total_comparisons)
return results, round(overfitting_score, 4)
def identify_weaknesses(
    self,
    category_metrics: list[CategoryMetrics],
    overfitting_results: list[OverfittingResult],
) -> list[WeaknessItem]:
    """Derive weaknesses from per-category metrics and overfitting results.

    Categories with fewer than two observations are ignored. For the rest:

    - skill F1 below 0.5 is reported as critical (< 0.3) or high, and
      F1 below 0.8 as medium;
    - execution-mode accuracy below 0.6 is reported as high (< 0.4) or medium;
    - task success rate below 0.8 is reported as critical (< 0.5) or high.

    One additional "high" item is emitted when any benchmark is flagged as
    overfitted. The result is ordered critical -> high -> medium -> low;
    items of equal severity keep their discovery order.
    """
    weaknesses: list[WeaknessItem] = []

    for cm in category_metrics:
        # Too few samples to draw any conclusion for this slice.
        if cm.total < 2:
            continue

        # Low skill F1
        if cm.skill_f1 < 0.5:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="critical" if cm.skill_f1 < 0.3 else "high",
                    description=f"技能路由F1过低 ({cm.skill_f1:.2f})，子类别: {cm.subcategory}",
                    evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}",
                    suggestion="改进该子类别的关键词匹配或意图分类逻辑",
                )
            )
        elif cm.skill_f1 < 0.8:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="medium",
                    description=f"技能路由F1偏低 ({cm.skill_f1:.2f})，子类别: {cm.subcategory}",
                    evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}",
                    suggestion="微调路由阈值或增加更多意图示例",
                )
            )

        # Low execution mode accuracy
        if cm.execution_mode_accuracy < 0.6:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="high" if cm.execution_mode_accuracy < 0.4 else "medium",
                    description=f"执行模式准确率过低 ({cm.execution_mode_accuracy:.2%})，子类别: {cm.subcategory}",
                    evidence=f"正确数={cm.execution_mode_correct}/{cm.total}",
                    suggestion="检查复杂度估算和模式选择逻辑",
                )
            )

        # Low task success rate
        if cm.task_success_rate < 0.8:
            # The rate is stored rounded, so rate * total can land just below
            # the true count (1 of 3 -> 0.3333 * 3 = 0.9999). Round to the
            # nearest integer instead of truncating to recover the count.
            success_count = round(cm.task_success_rate * cm.total)
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="critical" if cm.task_success_rate < 0.5 else "high",
                    description=f"任务成功率过低 ({cm.task_success_rate:.2%})，子类别: {cm.subcategory}",
                    evidence=f"成功数={success_count}/{cm.total}",
                    suggestion="排查该子类别的任务执行失败原因",
                )
            )

    # Overfitting weaknesses
    overfitted_cases = [r for r in overfitting_results if r.is_overfitted]
    if overfitted_cases:
        weaknesses.append(
            WeaknessItem(
                dimension="routing",
                subcategory="overfitting",
                severity="high",
                description=f"检测到 {len(overfitted_cases)} 个用例存在过拟合",
                evidence=f"过拟合用例: {', '.join(r.benchmark_id for r in overfitted_cases)}",
                suggestion="增加更多样化的训练样本和同义改写，提升泛化能力",
            )
        )

    # Most severe first; unknown severities sort last. list.sort is stable.
    severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    weaknesses.sort(key=lambda w: severity_order.get(w.severity, 99))
    return weaknesses
# ═════════════════════════════════════════════════════════════════════
# Root Cause Analysis Engine
# ═════════════════════════════════════════════════════════════════════
def analyze_root_causes(
    self,
    observations: list[CapabilityObservation],
    category_metrics: list[CategoryMetrics],
    overfitting_results: list[OverfittingResult],
    weaknesses: list[WeaknessItem],
) -> list[RootCause]:
    """Infer likely root causes from the raw observations.

    Only original (non-paraphrase) observations trigger a cause; paraphrases
    are used as supporting evidence for the keyword-gap confidence. Up to
    seven causes are produced, one per pattern:

    1. keyword_gap: a skill was expected but none was routed.
    2. intent_ambiguous: a different skill than expected was routed.
    3. complexity_misjudge: the execution mode was judged wrong.
    4. fallback_missing: no skill expected and the task failed.
    5. overfit_pattern: benchmarks flagged as overfitted.
    6. quality_threshold: at least two tasks succeeded with a wrong skill.
    7. config_error: failed tasks that carry an error message.

    ``category_metrics`` and ``weaknesses`` are accepted for interface
    compatibility but are not read here.

    Returns the causes ordered by descending confidence; causes of equal
    confidence keep the order listed above.
    """
    # Observations store the execution mode exactly as reported
    # ("skill_react", "direct_chat", ...). Translate to the benchmark
    # vocabulary before comparing against "direct" / "react" below,
    # otherwise those comparisons can never match the raw labels.
    mode_aliases = {"skill_react": "react", "direct_chat": "direct"}

    def subcategories_of(cases: list[CapabilityObservation]) -> str:
        # Sorted so the generated text is identical from run to run.
        return ", ".join(sorted({o.subcategory for o in cases}))

    root_causes: list[RootCause] = []
    originals = [o for o in observations if not o.is_paraphrase]
    paraphrases = [o for o in observations if o.is_paraphrase]

    # --- Cause 1: Keyword gap (low recall = keywords not matching) ---
    low_recall_cases = [
        o
        for o in originals
        if o.skill_correct is False and o.expected_skill is not None and o.actual_skill is None
    ]
    if low_recall_cases:
        affected = [o.benchmark_id for o in low_recall_cases]
        affected_ids = set(affected)
        # Paraphrases failing on the same benchmarks corroborate a keyword gap.
        para_also_fail = sum(
            1 for p in paraphrases if p.benchmark_id in affected_ids and p.skill_correct is False
        )
        confidence = min(1.0, 0.5 + 0.1 * para_also_fail) if paraphrases else 0.6
        root_causes.append(
            RootCause(
                cause_type="keyword_gap",
                cause_description="关键词覆盖不足：用户输入无法匹配到目标技能的关键词",
                confidence=round(confidence, 2),
                affected_cases=affected[:10],
                detail=(
                    f"{len(low_recall_cases)} 个原始输入未能路由到期望技能。"
                    f"改写输入中也有 {para_also_fail} 个失败，"
                    f"说明关键词库对同义表达的覆盖不足。"
                    f"受影响子类别: {subcategories_of(low_recall_cases)}"
                ),
            )
        )

    # --- Cause 2: Precision gap (wrong skill routed = intent ambiguous) ---
    wrong_route_cases = [
        o
        for o in originals
        if o.skill_correct is False
        and o.actual_skill is not None
        and o.expected_skill is not None
    ]
    if wrong_route_cases:
        affected = [o.benchmark_id for o in wrong_route_cases]
        # Count which (expected, actual) skill pairs get confused.
        confusion_pairs: dict[tuple[str, str], int] = defaultdict(int)
        for o in wrong_route_cases:
            confusion_pairs[(o.expected_skill, o.actual_skill)] += 1
        top_confusions = sorted(confusion_pairs.items(), key=lambda x: -x[1])[:5]
        # The arrow keeps the expected and the actual skill names apart.
        confusion_detail = "; ".join(
            f"{exp}→{act}({cnt}次)" for (exp, act), cnt in top_confusions
        )
        root_causes.append(
            RootCause(
                cause_type="intent_ambiguous",
                cause_description="意图歧义：不同技能的关键词/意图描述重叠，导致路由混淆",
                confidence=0.7,
                affected_cases=affected[:10],
                detail=f"技能混淆对: {confusion_detail}",
            )
        )

    # --- Cause 3: Complexity misjudge (execution mode wrong) ---
    exec_wrong_cases = [o for o in originals if o.execution_mode_correct is False]
    if exec_wrong_cases:
        affected = [o.benchmark_id for o in exec_wrong_cases]
        # Direction of the misjudgment, using normalized mode labels.
        over_simplified = 0
        over_complicated = 0
        for o in exec_wrong_cases:
            mode = mode_aliases.get(o.actual_execution_mode, o.actual_execution_mode)
            if o.expected_complexity in ("high", "medium") and mode == "direct":
                over_simplified += 1
            elif o.expected_complexity == "low" and mode in ("react", "rewoo", "reflexion"):
                over_complicated += 1
        if over_simplified > over_complicated:
            direction = "倾向低估复杂度（将复杂任务误判为简单直接调用）"
        elif over_complicated > over_simplified:
            direction = "倾向高估复杂度（将简单任务误判为需要多步推理）"
        else:
            direction = "复杂度误判方向不明确，双向均有偏差"
        root_causes.append(
            RootCause(
                cause_type="complexity_misjudge",
                cause_description=f"复杂度估算偏差：{direction}",
                confidence=0.75,
                affected_cases=affected[:10],
                detail=(
                    f"{len(exec_wrong_cases)} 个执行模式判断错误。"
                    f"低估复杂度 {over_simplified} 次，高估复杂度 {over_complicated} 次。"
                    f"受影响子类别: {subcategories_of(exec_wrong_cases)}"
                ),
            )
        )

    # --- Cause 4: Fallback missing (no skill matched, task failed) ---
    fallback_fail_cases = [
        o for o in originals if o.expected_skill is None and not o.task_succeeded
    ]
    if fallback_fail_cases:
        affected = [o.benchmark_id for o in fallback_fail_cases]
        # Distinct messages from the first five failures, in a stable order.
        sample_errors = sorted({o.error_message or "N/A" for o in fallback_fail_cases[:5]})
        root_causes.append(
            RootCause(
                cause_type="fallback_missing",
                cause_description="回退机制不足：无匹配技能时，直接聊天模式未能正常处理",
                confidence=0.65,
                affected_cases=affected[:10],
                detail=(
                    f"{len(fallback_fail_cases)} 个无技能匹配的任务执行失败。"
                    f"错误信息: {'; '.join(sample_errors)}"
                ),
            )
        )

    # --- Cause 5: Overfit pattern (paraphrases fail while original succeeds) ---
    overfitted = [r for r in overfitting_results if r.is_overfitted]
    if overfitted:
        affected = [r.benchmark_id for r in overfitted]
        # Per case: how many of its paraphrases failed.
        para_fail_details: list[str] = []
        for r in overfitted:
            fail_count = sum(1 for ok in r.paraphrase_results if not ok)
            para_fail_details.append(
                f"{r.benchmark_id}({fail_count}/{len(r.paraphrase_results)}改写失败)"
            )
        root_causes.append(
            RootCause(
                cause_type="overfit_pattern",
                cause_description="路由过拟合：对特定表述形式过度敏感，同义改写后路由失败",
                confidence=0.85,
                affected_cases=affected,
                detail=(
                    f"{len(overfitted)} 个用例存在过拟合。"
                    # Close the list with a full stop so it does not run
                    # into the next sentence.
                    f"详情: {'; '.join(para_fail_details)}。"
                    f"说明路由逻辑对输入的具体措辞过于敏感，缺乏语义层面的泛化能力。"
                ),
            )
        )

    # --- Cause 6: Quality threshold (task succeeded but output poor) ---
    success_but_wrong = [o for o in originals if o.task_succeeded and o.skill_correct is False]
    if len(success_but_wrong) >= 2:
        affected = [o.benchmark_id for o in success_but_wrong]
        root_causes.append(
            RootCause(
                cause_type="quality_threshold",
                cause_description="质量门控阈值过低：任务虽成功完成但输出了错误结果",
                confidence=0.6,
                affected_cases=affected[:10],
                detail=(
                    f"{len(success_but_wrong)} 个任务虽然HTTP成功但路由到了错误技能。"
                    f"质量门控未能拦截这些错误路由的结果。"
                ),
            )
        )

    # --- Cause 7: Config error (HTTP errors) ---
    error_cases = [o for o in originals if o.error_message and not o.task_succeeded]
    if error_cases:
        # Group by the first 80 characters of the message as a crude pattern.
        error_patterns: dict[str, int] = defaultdict(int)
        for o in error_cases:
            error_patterns[(o.error_message or "")[:80]] += 1
        top_errors = sorted(error_patterns.items(), key=lambda x: -x[1])[:3]
        error_detail = "; ".join(f"{msg}({cnt}次)" for msg, cnt in top_errors)
        root_causes.append(
            RootCause(
                cause_type="config_error",
                cause_description="配置或服务端错误：请求处理过程中出现异常",
                confidence=0.5,
                affected_cases=[o.benchmark_id for o in error_cases[:10]],
                detail=f"常见错误: {error_detail}",
            )
        )

    # Highest confidence first (stable for ties).
    root_causes.sort(key=lambda rc: -rc.confidence)
    return root_causes
# ═════════════════════════════════════════════════════════════════════
# Improvement Strategy Planner
# ═════════════════════════════════════════════════════════════════════
def plan_improvements(
    self,
    weaknesses: list[WeaknessItem],
    root_causes: list[RootCause],
) -> list[ImprovementPlan]:
    """Build one improvement plan per recognised root-cause type.

    Plans are emitted in a fixed order (keyword_gap, intent_ambiguous,
    complexity_misjudge, fallback_missing, overfit_pattern,
    quality_threshold, config_error) and only for cause types present in
    ``root_causes``. Each plan is described by the first cause of its type
    and lists every cause of that type. Action ids ("ACT-001", "ACT-002",
    ...) are numbered consecutively across the emitted plans.

    ``weaknesses`` is accepted for interface compatibility but is not read.
    """
    # Group the causes by type, keeping their incoming order.
    causes_by_type: dict[str, list[RootCause]] = {}
    for rc in root_causes:
        causes_by_type.setdefault(rc.cause_type, []).append(rc)

    # The only data-dependent text: how many cases the keyword gap affects.
    keyword_gap_causes = causes_by_type.get("keyword_gap")
    keyword_gap_case_count = (
        len(keyword_gap_causes[0].affected_cases) if keyword_gap_causes else 0
    )

    # Playbook: (cause type, action templates, overall strategy).
    # Each action template holds every ImprovementAction field except
    # action_id, which is assigned when the plan is emitted.
    playbook: list[tuple[str, list[dict[str, Any]], str]] = [
        (
            "keyword_gap",
            [
                {
                    "title": "扩展技能关键词同义词库",
                    "description": (
                        "为每个技能的 intent.keywords 添加更多同义词、近义词和用户常见表述。"
                        "重点补充中文变体、口语化表达和行业术语。"
                    ),
                    "target_module": "configs/skills/*.yaml → intent.keywords",
                    "priority": "P0",
                    "expected_impact": f"预计提升召回率 15~30%，影响 {keyword_gap_case_count} 个用例",
                    "effort": "small",
                    "related_causes": ["keyword_gap"],
                    "verification": "重新运行E2E回测验证受影响用例的召回率提升",
                },
                {
                    "title": "引入语义相似度匹配Layer 1.5",
                    "description": (
                        "在 CostAwareRouter 的 Layer 1.5 SemanticRouter 中，"
                        "使用向量嵌入计算用户输入与技能描述的语义相似度，"
                        "弥补关键词精确匹配的不足。"
                    ),
                    "target_module": "src/agentkit/chat/skill_routing.py",
                    "priority": "P1",
                    "expected_impact": "预计提升召回率 20~40%，显著改善同义改写场景",
                    "effort": "large",
                    "related_causes": ["keyword_gap", "overfit_pattern"],
                    "verification": "运行过拟合检测回测，验证改写一致性提升至 >80%",
                },
            ],
            (
                "短期：扩充关键词库（低成本高收益）；"
                "中期：引入语义匹配层（高成本高收益）；"
                "长期：基于用户真实查询日志持续优化关键词库"
            ),
        ),
        (
            "intent_ambiguous",
            [
                {
                    "title": "为易混淆技能添加互斥关键词",
                    "description": (
                        "在技能配置中为容易混淆的技能对添加互斥关键词（disambiguation_keywords）。"
                        "当用户输入同时匹配多个技能时，优先选择包含互斥关键词的技能。"
                    ),
                    "target_module": "configs/skills/*.yaml → intent.disambiguation_keywords",
                    "priority": "P1",
                    "expected_impact": "预计提升精确率 10~25%，减少技能混淆",
                    "effort": "small",
                    "related_causes": ["intent_ambiguous"],
                    "verification": "运行歧义消解回测，验证路由精确率提升",
                },
                {
                    "title": "实现LLM二次分类消歧",
                    "description": (
                        "当 Layer 0/1 路由到多个候选技能时，"
                        "调用 LLM quick_classify 进行二次意图判断，"
                        "选择最匹配的技能。"
                    ),
                    "target_module": "src/agentkit/chat/skill_routing.py → Layer 1",
                    "priority": "P2",
                    "expected_impact": "预计提升精确率 15~30%，但增加 ~500ms 延迟和 ~100 tokens",
                    "effort": "medium",
                    "related_causes": ["intent_ambiguous"],
                    "verification": "运行歧义消解回测，对比延迟和精确率变化",
                },
            ],
            (
                "短期：添加互斥关键词消歧；"
                "中期：启用LLM二次分类；"
                "长期：训练专用意图分类模型替代规则匹配"
            ),
        ),
        (
            "complexity_misjudge",
            [
                {
                    "title": "优化复杂度估算启发式规则",
                    "description": (
                        "调整 HeuristicClassifier 的复杂度评分权重："
                        "增加任务动词（分析/研究/设计）的权重，"
                        "降低简单问答动词（是什么/多少）的权重。"
                    ),
                    "target_module": "src/agentkit/chat/skill_routing.py → HeuristicClassifier",
                    "priority": "P1",
                    "expected_impact": "预计提升执行模式准确率 10~20%",
                    "effort": "small",
                    "related_causes": ["complexity_misjudge"],
                    "verification": "运行执行模式回测，验证准确率提升",
                },
                {
                    "title": "引入任务复杂度校准数据集",
                    "description": (
                        "收集标注了复杂度等级的真实用户查询，"
                        "构建校准数据集，定期评估和调整复杂度阈值。"
                    ),
                    "target_module": "tests/e2e/benchmark_dataset.py",
                    "priority": "P2",
                    "expected_impact": "持续提升复杂度判断准确性",
                    "effort": "medium",
                    "related_causes": ["complexity_misjudge"],
                    "verification": "每次调整后运行回测对比前后F1变化",
                },
            ],
            (
                "短期：调整启发式规则权重；"
                "中期：构建复杂度校准数据集；"
                "长期：训练复杂度评估模型替代规则"
            ),
        ),
        (
            "fallback_missing",
            [
                {
                    "title": "增强DIRECT_CHAT回退路径",
                    "description": (
                        "当无技能匹配时，确保DIRECT_CHAT模式能正常处理请求："
                        "1) 检查默认Agent是否正确初始化；"
                        "2) 确保无技能时不会触发空指针异常；"
                        "3) 添加友好的降级提示。"
                    ),
                    "target_module": "src/agentkit/chat/skill_routing.py → _fallback_direct_chat",
                    "priority": "P0",
                    "expected_impact": "确保100%的请求都有回退处理，消除任务失败",
                    "effort": "small",
                    "related_causes": ["fallback_missing"],
                    "verification": "运行回退场景回测，验证所有无匹配请求均成功",
                },
            ],
            (
                "短期：修复回退路径确保基本可用；"
                "中期：优化回退模式的回答质量；"
                "长期：基于用户反馈自动发现新技能需求"
            ),
        ),
        (
            "overfit_pattern",
            [
                {
                    "title": "添加意图描述和示例intent.description + examples",
                    "description": (
                        "为每个技能添加 intent.description（自然语言描述）和 intent.examples（示例查询），"
                        "使路由器能理解语义层面的意图，而不仅依赖关键词精确匹配。"
                    ),
                    "target_module": "configs/skills/*.yaml → intent.description / intent.examples",
                    "priority": "P0",
                    "expected_impact": "预计提升改写一致性 20~40%",
                    "effort": "small",
                    "related_causes": ["overfit_pattern", "keyword_gap"],
                    "verification": "运行过拟合检测回测，验证改写一致性提升",
                },
                {
                    "title": "实现意图泛化测试CI",
                    "description": (
                        "在CI中集成意图泛化回测，每次修改路由逻辑或技能配置后"
                        "自动运行包含改写的回测用例，确保不引入新的过拟合。"
                    ),
                    "target_module": ".github/workflows/ + tests/e2e/",
                    "priority": "P2",
                    "expected_impact": "防止过拟合回归，持续监控泛化能力",
                    "effort": "medium",
                    "related_causes": ["overfit_pattern"],
                    "verification": "CI流水线中自动运行回测并检查过拟合分数",
                },
            ],
            (
                "短期：补充意图描述和示例；"
                "中期：引入语义匹配（同keyword_gap方案）；"
                "长期：建立意图泛化CI防线"
            ),
        ),
        (
            "quality_threshold",
            [
                {
                    "title": "增强质量门控的技能匹配验证",
                    "description": (
                        "在QualityGate中增加技能匹配验证："
                        "检查输出是否与路由到的技能的能力范围一致，"
                        "如果不一致则触发重试或降级。"
                    ),
                    "target_module": "src/agentkit/quality/gate.py",
                    "priority": "P1",
                    "expected_impact": "减少错误路由导致的低质量输出",
                    "effort": "medium",
                    "related_causes": ["quality_threshold"],
                    "verification": "运行质量门控回测，验证错误路由拦截率",
                },
            ],
            (
                "短期：增加技能匹配验证；"
                "中期：引入输出质量评分模型；"
                "长期：实现自动质量回归检测"
            ),
        ),
        (
            "config_error",
            [
                {
                    "title": "修复服务端配置和异常处理",
                    "description": (
                        "根据错误信息排查服务端配置问题："
                        "1) 检查API路由注册是否完整；"
                        "2) 增加输入校验和错误提示；"
                        "3) 确保所有异常都有友好的错误响应。"
                    ),
                    "target_module": "src/agentkit/server/routes/",
                    "priority": "P0",
                    "expected_impact": "消除服务端错误，提升任务成功率",
                    "effort": "small",
                    "related_causes": ["config_error"],
                    "verification": "重新运行E2E回测验证HTTP错误率降低",
                },
            ],
            (
                "短期：修复已知配置错误；"
                "中期：增加输入校验和错误处理；"
                "长期：建立配置变更的自动化验证"
            ),
        ),
    ]

    plans: list[ImprovementPlan] = []
    action_counter = 0
    for cause_type, action_templates, strategy in playbook:
        matching_causes = causes_by_type.get(cause_type)
        if not matching_causes:
            continue

        actions: list[ImprovementAction] = []
        for template in action_templates:
            # Ids keep counting across plans so every action id is unique.
            action_counter += 1
            actions.append(
                ImprovementAction(action_id=f"ACT-{action_counter:03d}", **template)
            )

        plans.append(
            ImprovementPlan(
                # The first cause of the type provides the plan's headline.
                weakness_description=matching_causes[0].cause_description,
                root_causes=matching_causes,
                actions=actions,
                overall_strategy=strategy,
            )
        )
    return plans
def analyze_alignment(self, observations: list[CapabilityObservation]) -> dict[str, Any]:
"""Analyze alignment guard and cascade detector metrics.
Returns a dict with:
- total_alignment_tests: number of observations in alignment category
- violation_count: total constraint violations
- violation_rate: ratio of tests with at least one violation
- cascade_alert_count: number of cascade alerts triggered
- cascade_alert_rate: ratio of tests that triggered cascade
- neg_constraint_pass_rate: pass rate for negative constraints
- pos_constraint_pass_rate: pass rate for positive constraints
"""
alignment_obs = [o for o in observations if o.category == "alignment"]
if not alignment_obs:
return {
"total_alignment_tests": 0,
"violation_count": 0,
"violation_rate": 0.0,
"cascade_alert_count": 0,
"cascade_alert_rate": 0.0,
"neg_constraint_pass_rate": 0.0,
"pos_constraint_pass_rate": 0.0,
}
total = len(alignment_obs)
with_violations = sum(1 for o in alignment_obs if o.alignment_violations > 0)
total_violations = sum(o.alignment_violations for o in alignment_obs)
with_cascade = sum(1 for o in alignment_obs if o.cascade_alert)
# Separate by subcategory for neg/pos constraint pass rates
neg_obs = [o for o in alignment_obs if o.subcategory == "negative_constraint"]
pos_obs = [o for o in alignment_obs if o.subcategory == "positive_constraint"]
neg_pass_rate = self._safe_div(
sum(1 for o in neg_obs if o.alignment_violations == 0),
len(neg_obs),
)
pos_pass_rate = self._safe_div(
sum(1 for o in pos_obs if o.alignment_violations == 0),
len(pos_obs),
)
return {
"total_alignment_tests": total,
"violation_count": total_violations,
"violation_rate": round(self._safe_div(with_violations, total), 4),
"cascade_alert_count": with_cascade,
"cascade_alert_rate": round(self._safe_div(with_cascade, total), 4),
"neg_constraint_pass_rate": round(neg_pass_rate, 4),
"pos_constraint_pass_rate": round(pos_pass_rate, 4),
}
def generate_report(self, collector: MetricsCollector) -> CapabilityReport:
    """Build the full capability report from everything the collector holds.

    Overall skill precision/recall/F1, execution-mode accuracy and task
    success rate are computed over the collector's *original* observations
    (``collector.get_original_observations()``). The per-category,
    overfitting, root-cause and improvement analyses are handed the complete
    observation list. All overall rates are rounded to four decimals.
    """
    all_obs = collector.observations
    baseline = collector.get_original_observations()
    sample_size = len(baseline)

    # Defaults cover the "no original observations" case.
    precision = recall = f1 = 0.0
    exec_accuracy = success_rate = 0.0
    if sample_size > 0:
        misrouted = [o for o in baseline if o.skill_correct is False]
        true_pos = len([o for o in baseline if o.skill_correct is True])
        # A miss counts as a false positive when some skill was picked, and
        # as a false negative when a skill was expected; one observation can
        # contribute to both tallies.
        false_pos = len([o for o in misrouted if o.actual_skill is not None])
        false_neg = len([o for o in misrouted if o.expected_skill is not None])
        precision, recall, f1 = self.compute_prf(true_pos, false_pos, false_neg)

        mode_hits = len([o for o in baseline if o.execution_mode_correct is True])
        exec_accuracy = self._safe_div(mode_hits, sample_size)

        succeeded = len([o for o in baseline if o.task_succeeded])
        success_rate = self._safe_div(succeeded, sample_size)

    # One metrics entry per (category, subcategory) pair seen among the
    # originals, in sorted order.
    category_keys: set[tuple[str, str]] = {(o.category, o.subcategory) for o in baseline}
    per_category = []
    for category, subcategory in sorted(category_keys):
        per_category.append(self.analyze_category(all_obs, category, subcategory))

    overfit_results, overfit_score = self.detect_overfitting(all_obs)
    weak_spots = self.identify_weaknesses(per_category, overfit_results)
    causes = self.analyze_root_causes(all_obs, per_category, overfit_results, weak_spots)
    plans = self.plan_improvements(weak_spots, causes)

    return CapabilityReport(
        generated_at=datetime.now(timezone.utc).isoformat(),
        total_observations=len(all_obs),
        overall_skill_recall=round(recall, 4),
        overall_skill_precision=round(precision, 4),
        overall_skill_f1=round(f1, 4),
        overall_execution_mode_accuracy=round(exec_accuracy, 4),
        overall_task_success_rate=round(success_rate, 4),
        category_metrics=per_category,
        overfitting_results=overfit_results,
        overfitting_score=overfit_score,
        weaknesses=weak_spots,
        root_causes=causes,
        improvement_plans=plans,
        raw_observations=all_obs,
    )
# ═══════════════════════════════════════════════════════════════════════════
# 4. Metrics Reporter
# ═══════════════════════════════════════════════════════════════════════════
class MetricsReporter:
    """Generate human-readable and machine-readable reports.

    ``to_json`` dumps the report model to a JSON file, ``to_text`` renders a
    plain-text summary (in Chinese), and ``save_report`` writes both into a
    directory. The text report is assembled section by section; each private
    ``_*_lines`` helper returns the lines for one section (an empty list when
    the section has nothing to show).
    """

    # Display-label tables live at class level so they are built once instead
    # of on every loop iteration. Unknown keys fall back to the raw value.
    _CATEGORY_LABELS = {
        "routing": "路由",
        "execution": "执行",
        "quality": "质量",
        "team": "团队",
        "consistency": "一致性",
    }
    _SUBCATEGORY_LABELS = {
        "keyword_match": "关键词匹配",
        "explicit_prefix": "显式前缀",
        "greeting": "问候语",
        "identity": "身份识别",
        "disambiguation": "歧义消解",
        "fallback": "回退处理",
        "complexity_low": "低复杂度",
        "complexity_high": "高复杂度",
        "intent_variant": "意图变体",
        "direct_mode": "直接模式",
        "react_mode": "ReAct模式",
        "quality_gate": "质量门控",
        "output_std": "输出标准化",
        "explicit_team": "显式团队",
        "deterministic": "确定性",
        "overfitting": "过拟合",
    }
    _SEVERITY_ICONS = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}
    # Every severity needs a visible label; an empty one renders as "[]".
    _SEVERITY_LABELS = {"critical": "严重", "high": "高", "medium": "中", "low": "低"}
    _CAUSE_TYPE_LABELS = {
        "keyword_gap": "关键词覆盖不足",
        "intent_ambiguous": "意图歧义",
        "complexity_misjudge": "复杂度估算偏差",
        "fallback_missing": "回退机制不足",
        "overfit_pattern": "路由过拟合",
        "quality_threshold": "质量门控阈值过低",
        "config_error": "配置/服务端错误",
        "tool_missing": "工具缺失",
    }
    _PRIORITY_ICONS = {"P0": "🔴", "P1": "🟠", "P2": "🟡", "P3": "🟢"}
    _EFFORT_LABELS = {"small": "小", "medium": "中", "large": "大"}
    _BAR_WIDTH = 10  # number of cells in the confidence bar

    @staticmethod
    def to_json(report: CapabilityReport, path: str) -> None:
        """Save report as JSON.

        Writes ``report.model_dump()`` to ``path`` as UTF-8, keeping non-ASCII
        characters unescaped and indenting by two spaces.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(report.model_dump(), f, ensure_ascii=False, indent=2)

    @staticmethod
    def to_text(report: CapabilityReport) -> str:
        """Generate plain-text summary report in Chinese.

        Sections appear in a fixed order: banner, overall metrics,
        per-category breakdown, overfitting, semantic router, team routing,
        alignment guard, weaknesses, root causes, improvement plans, closing
        rule. Lines are joined with ``"\\n"`` and there is no trailing newline.
        """
        rule = "=" * 72
        lines: list[str] = [
            rule,
            " AgentKit 智能化能力分析报告",
            f" 生成时间: {report.generated_at}",
            rule,
            "",
        ]
        section_renderers = (
            MetricsReporter._overall_lines,
            MetricsReporter._category_lines,
            MetricsReporter._overfitting_lines,
            MetricsReporter._semantic_router_lines,
            MetricsReporter._team_lines,
            MetricsReporter._alignment_lines,
            MetricsReporter._weakness_lines,
            MetricsReporter._root_cause_lines,
            MetricsReporter._improvement_lines,
        )
        for render in section_renderers:
            lines.extend(render(report))
        lines.append(rule)
        return "\n".join(lines)

    @staticmethod
    def save_report(report: CapabilityReport, output_dir: str) -> dict[str, str]:
        """Save both JSON and text reports. Returns paths to saved files.

        ``output_dir`` is created if missing. The result maps ``"json"`` and
        ``"text"`` to ``capability_report.json`` / ``capability_report.txt``
        inside that directory.
        """
        os.makedirs(output_dir, exist_ok=True)
        json_path = os.path.join(output_dir, "capability_report.json")
        text_path = os.path.join(output_dir, "capability_report.txt")
        MetricsReporter.to_json(report, json_path)
        # Render before opening the file so a rendering error cannot leave a
        # truncated/empty text report behind.
        rendered = MetricsReporter.to_text(report)
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(rendered)
        return {"json": json_path, "text": text_path}

    # ── Section helpers (each returns the lines of one report section) ──

    @staticmethod
    def _overall_lines(report: CapabilityReport) -> list[str]:
        """Headline metrics computed over the whole run."""
        return [
            "── 总体指标 ──────────────────────────────────────────────",
            f" 观测总数: {report.total_observations}",
            f" 技能路由召回率: {report.overall_skill_recall:.2%}",
            f" 技能路由精确率: {report.overall_skill_precision:.2%}",
            f" 技能路由F1: {report.overall_skill_f1:.2%}",
            f" 执行模式准确率: {report.overall_execution_mode_accuracy:.2%}",
            f" 任务成功率: {report.overall_task_success_rate:.2%}",
            f" 过拟合分数: {report.overfitting_score:.2%}",
            "",
        ]

    @staticmethod
    def _category_lines(report: CapabilityReport) -> list[str]:
        """Per (category, subcategory) breakdown; always emitted."""
        lines = ["── 分类明细 ──────────────────────────────────────────────"]
        for cm in report.category_metrics:
            cat_label = MetricsReporter._CATEGORY_LABELS.get(cm.category, cm.category)
            subcat_label = MetricsReporter._SUBCATEGORY_LABELS.get(
                cm.subcategory, cm.subcategory
            )
            lines.append(f" [{cat_label}/{subcat_label}]")
            lines.append(
                f" 样本数={cm.total} 召回率={cm.skill_recall:.2%} "
                f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}"
            )
            lines.append(
                f" 执行模式准确率={cm.execution_mode_accuracy:.2%} "
                f"成功率={cm.task_success_rate:.2%} "
                f"平均耗时={cm.avg_response_time_ms:.0f}ms"
            )
        lines.append("")
        return lines

    @staticmethod
    def _overfitting_lines(report: CapabilityReport) -> list[str]:
        """One line per overfitting result; omitted when there are none."""
        if not report.overfitting_results:
            return []
        lines = ["── 过拟合分析 ────────────────────────────────────────────"]
        for r in report.overfitting_results:
            status = "⚠ 过拟合" if r.is_overfitted else "✓ 正常"
            # Visible marker for whether the original (non-paraphrased) input
            # was handled correctly.
            orig_label = "✓" if r.original_correct else "✗"
            lines.append(
                f" [{status}] {r.benchmark_id}: "
                f"原始输入={orig_label}, "
                f"改写一致性={r.consistency_rate:.0%}"
            )
        lines.append("")
        return lines

    @staticmethod
    def _semantic_router_lines(report: CapabilityReport) -> list[str]:
        """Metrics for categories named ``semantic_router``; omitted if none."""
        semantic_cats = [cm for cm in report.category_metrics if cm.category == "semantic_router"]
        if not semantic_cats:
            return []
        lines = ["── 语义路由分析 ──────────────────────────────────────────"]
        for cm in semantic_cats:
            lines.append(
                f" [{cm.subcategory}] 样本数={cm.total} "
                f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}"
            )
        lines.append("")
        return lines

    @staticmethod
    def _team_lines(report: CapabilityReport) -> list[str]:
        """Metrics for categories named ``team``; omitted if none."""
        team_cats = [cm for cm in report.category_metrics if cm.category == "team"]
        if not team_cats:
            return []
        lines = ["── 团队路由分析 ──────────────────────────────────────────"]
        for cm in team_cats:
            lines.append(
                f" [{cm.subcategory}] 样本数={cm.total} "
                f"成功率={cm.task_success_rate:.2%} "
                f"执行模式准确率={cm.execution_mode_accuracy:.2%}"
            )
        lines.append("")
        return lines

    @staticmethod
    def _alignment_lines(report: CapabilityReport) -> list[str]:
        """Alignment-guard summary; omitted when no alignment observations exist."""
        observations = report.raw_observations
        if not any(o.category == "alignment" for o in observations):
            return []
        alignment_metrics = MetricsAnalyzer().analyze_alignment(observations)
        return [
            "── 对齐守卫分析 ──────────────────────────────────────────",
            f" 测试总数: {alignment_metrics['total_alignment_tests']}",
            f" 约束违规总数: {alignment_metrics['violation_count']}",
            f" 违规率: {alignment_metrics['violation_rate']:.2%}",
            f" 否定约束通过率: {alignment_metrics['neg_constraint_pass_rate']:.2%}",
            f" 肯定约束通过率: {alignment_metrics['pos_constraint_pass_rate']:.2%}",
            f" 级联告警次数: {alignment_metrics['cascade_alert_count']}",
            f" 级联告警率: {alignment_metrics['cascade_alert_rate']:.2%}",
            "",
        ]

    @staticmethod
    def _weakness_lines(report: CapabilityReport) -> list[str]:
        """Detected weaknesses, or a 'nothing significant found' banner."""
        if not report.weaknesses:
            return ["── 未检测到显著短板 ────────────────────────────────────", ""]
        lines = ["── 智能化短板识别 ────────────────────────────────────────"]
        for w in report.weaknesses:
            icon = MetricsReporter._SEVERITY_ICONS.get(w.severity, "")
            severity_label = MetricsReporter._SEVERITY_LABELS.get(w.severity, w.severity)
            lines.append(f" {icon} [{severity_label}] {w.description}")
            lines.append(f" 证据: {w.evidence}")
            lines.append(f" 建议: {w.suggestion}")
            lines.append("")
        return lines

    @staticmethod
    def _root_cause_lines(report: CapabilityReport) -> list[str]:
        """Root causes with a 10-cell confidence bar; omitted when there are none."""
        if not report.root_causes:
            return []
        width = MetricsReporter._BAR_WIDTH
        lines = ["── 根因分析 ──────────────────────────────────────────────"]
        for rc in report.root_causes:
            type_label = MetricsReporter._CAUSE_TYPE_LABELS.get(rc.cause_type, rc.cause_type)
            # Clamp so an out-of-range confidence still yields a bar of
            # exactly `width` cells.
            filled = min(max(int(rc.confidence * width), 0), width)
            conf_bar = "█" * filled + "░" * (width - filled)
            lines.append(f" ▸ [{type_label}] 置信度: {conf_bar} {rc.confidence:.0%}")
            lines.append(f" 原因: {rc.cause_description}")
            if rc.detail:
                lines.append(f" 详情: {rc.detail}")
            if rc.affected_cases:
                # Show at most five case ids, with an ellipsis when truncated.
                lines.append(
                    f" 受影响用例: {', '.join(rc.affected_cases[:5])}"
                    f"{'...' if len(rc.affected_cases) > 5 else ''}"
                )
            lines.append("")
        return lines

    @staticmethod
    def _improvement_lines(report: CapabilityReport) -> list[str]:
        """Improvement plans drawn as boxes; omitted when there are none."""
        if not report.improvement_plans:
            return []
        lines = ["── 改进策略规划 ──────────────────────────────────────────"]
        for i, plan in enumerate(report.improvement_plans, 1):
            lines.append(f" ┌─ 策略 {i}: {plan.weakness_description}")
            lines.append(f" │ 总体策略: {plan.overall_strategy}")
            lines.append("")
            for action in plan.actions:
                priority_icon = MetricsReporter._PRIORITY_ICONS.get(action.priority, "")
                effort_label = MetricsReporter._EFFORT_LABELS.get(action.effort, action.effort)
                # The title line carries the frame too, so the box stays
                # continuous with the detail lines below it.
                lines.append(f" ├─ {priority_icon} [{action.priority}] {action.title}")
                lines.append(f" │ 目标模块: {action.target_module}")
                lines.append(f" │ 具体操作: {action.description}")
                lines.append(f" │ 预期影响: {action.expected_impact}")
                lines.append(f" │ 工作量: {effort_label}")
                lines.append(f" │ 验证方式: {action.verification}")
                lines.append("")
            # Closing edge of the box (a rule of 60 cells).
            lines.append(" └" + "─" * 60)
            lines.append("")
        return lines