1509 lines
69 KiB
Python
1509 lines
69 KiB
Python
"""Agent Capability Metrics — Collection, Analysis, and Reporting.
|
||
|
||
Core components:
|
||
1. CapabilityMetrics: data model for a single test observation
|
||
2. MetricsCollector: session-scoped collector that gathers all observations
|
||
3. MetricsAnalyzer: computes recall/precision/F1, overfitting scores, weakness analysis
|
||
4. MetricsReporter: generates human-readable and machine-readable reports
|
||
|
||
Design:
|
||
- Collector is a pytest fixture (session-scoped), injected into capability tests
|
||
- Each test records what actually happened vs what was expected
|
||
- After all tests, analyzer computes aggregate metrics
|
||
- Reporter outputs JSON + plain-text summary
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import time
|
||
from collections import defaultdict
|
||
from datetime import datetime, timezone
|
||
from typing import Any
|
||
|
||
from pydantic import BaseModel, ConfigDict
|
||
|
||
from tests.e2e.benchmark_dataset import BenchmarkCase
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 1. Data Models
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class CapabilityObservation(BaseModel):
    """One recorded test outcome: the expected behavior next to the observed one."""

    model_config = ConfigDict()

    # --- Identity -----------------------------------------------------------
    benchmark_id: str
    test_name: str
    timestamp: str

    # --- Input --------------------------------------------------------------
    input_query: str
    # True when the query is a paraphrase of a benchmark input
    # (paraphrases feed the overfitting detection).
    is_paraphrase: bool = False

    # --- Expected (ground truth) ---------------------------------------------
    expected_skill: str | None = None
    expected_execution_mode: str = "direct"
    expected_complexity: str = "low"

    # --- Actual (observed) ----------------------------------------------------
    actual_skill: str | None = None
    actual_execution_mode: str | None = None
    actual_status_code: int = 0
    actual_response_keys: list[str] = []
    actual_complexity_score: float | None = None
    actual_match_method: str | None = None
    actual_match_confidence: float | None = None

    # --- Judgments ------------------------------------------------------------
    # For the three tri-state flags, None means "could not be determined".
    skill_correct: bool | None = None
    execution_mode_correct: bool | None = None
    complexity_correct: bool | None = None
    # HTTP 200 + valid response.
    task_succeeded: bool = False

    # --- Metadata -------------------------------------------------------------
    category: str = ""
    subcategory: str = ""
    response_time_ms: float = 0.0
    error_message: str | None = None

    # --- Alignment & Cascade fields (U5) ----------------------------------------
    # Number of constraint violations detected.
    alignment_violations: int = 0
    # Whether a cascade alert was triggered.
    cascade_alert: bool = False

    # --- L3 Output Quality fields -----------------------------------------------
    # LLM-as-Judge score on a 1-5 scale.
    output_quality_score: float | None = None
    # The judge's reasoning for that score.
    output_quality_reasoning: str | None = None
|
||
|
||
|
||
class OutputQualityObservation(BaseModel):
    """Result of one L3 (output quality) evaluation."""

    model_config = ConfigDict()

    benchmark_id: str
    input_query: str
    expected_skill: str | None = None
    actual_skill: str | None = None
    # Judge score on a 1-5 scale; the default 0.0 lies outside that scale.
    quality_score: float = 0.0
    reasoning: str = ""
    evaluated: bool = False
|
||
|
||
|
||
class CategoryMetrics(BaseModel):
    """Aggregated metrics for one category/subcategory pair."""

    model_config = ConfigDict()

    # --- Grouping key ---
    category: str
    subcategory: str

    # --- Sample size ---
    total: int = 0

    # --- Skill routing ---
    skill_correct: int = 0
    skill_recall: float = 0.0
    skill_precision: float = 0.0
    skill_f1: float = 0.0

    # --- Execution mode ---
    execution_mode_correct: int = 0
    execution_mode_accuracy: float = 0.0

    # --- Complexity ---
    complexity_correct: int = 0
    complexity_accuracy: float = 0.0

    # --- Outcome and latency ---
    task_success_rate: float = 0.0
    avg_response_time_ms: float = 0.0
|
||
|
||
|
||
class OverfittingResult(BaseModel):
    """Overfitting verdict for a single benchmark case."""

    model_config = ConfigDict()

    benchmark_id: str
    original_correct: bool
    # One entry per paraphrase; True means that paraphrase was judged correct.
    paraphrase_results: list[bool]
    # Share of paraphrases whose result matches the original's result.
    consistency_rate: float = 0.0
    # True if the original was correct but the paraphrases were mostly wrong.
    is_overfitted: bool = False
|
||
|
||
|
||
class WeaknessItem(BaseModel):
    """One identified weakness."""

    model_config = ConfigDict()

    # routing / execution / quality / team / consistency
    dimension: str
    subcategory: str
    # critical / high / medium / low
    severity: str
    description: str
    evidence: str
    suggestion: str
|
||
|
||
|
||
class RootCause(BaseModel):
    """A root cause attributed to one or more weaknesses."""

    model_config = ConfigDict()

    # One of: keyword_gap / complexity_misjudge / intent_ambiguous /
    # fallback_missing / overfit_pattern / tool_missing / config_error /
    # quality_threshold
    cause_type: str
    cause_description: str
    # 0.0~1.0: how confident the analysis is about this root cause.
    confidence: float = 0.0
    # Benchmark IDs affected by this cause.
    affected_cases: list[str] = []
    # Additional technical detail.
    detail: str = ""
|
||
|
||
|
||
class ImprovementAction(BaseModel):
    """One actionable improvement step."""

    model_config = ConfigDict()

    action_id: str
    title: str
    description: str
    # Which code module to modify.
    target_module: str
    # P0 / P1 / P2 / P3
    priority: str
    # What improvement to expect.
    expected_impact: str
    # small / medium / large
    effort: str
    # The cause_type values this action addresses.
    related_causes: list[str] = []
    # How to verify that the fix works.
    verification: str = ""
|
||
|
||
|
||
class ImprovementPlan(BaseModel):
    """Improvement plan addressing one specific weakness."""

    model_config = ConfigDict()

    # The weakness this plan targets.
    weakness_description: str
    # Root causes the plan is based on.
    root_causes: list[RootCause]
    # Concrete steps to carry out.
    actions: list[ImprovementAction]
    # Free-text summary of the overall approach.
    overall_strategy: str
|
||
|
||
|
||
class CapabilityReport(BaseModel):
    """The complete capability analysis report."""

    model_config = ConfigDict()

    # --- Header ---
    generated_at: str
    total_observations: int

    # --- Overall metrics ---
    overall_skill_recall: float
    overall_skill_precision: float
    overall_skill_f1: float
    overall_execution_mode_accuracy: float
    overall_task_success_rate: float

    # --- Per-category breakdown ---
    category_metrics: list[CategoryMetrics]

    # --- Overfitting ---
    overfitting_results: list[OverfittingResult]
    # 0.0 = no overfitting, 1.0 = fully overfitted.
    overfitting_score: float

    # --- Diagnosis and planning ---
    weaknesses: list[WeaknessItem]
    root_causes: list[RootCause]
    improvement_plans: list[ImprovementPlan]

    # --- Raw data ---
    raw_observations: list[CapabilityObservation]
    output_quality_evaluations: list[OutputQualityObservation] = []
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 2. Metrics Collector
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class MetricsCollector:
    """Collects capability observations during E2E test execution.

    Usage in tests:
        collector.start_timer(benchmark.id)
        ...  # run the request under test
        collector.record_benchmark_result(benchmark, test_name=..., actual_skill=...)

        # or, with a pre-built observation:
        collector.record(observation)
    """

    # Maps observed execution-mode names onto the vocabulary used by the
    # benchmark's expected_execution_mode. Names not listed compare as-is.
    #   actual:   "skill_react" / "rewoo" / "direct_chat" etc.
    #   expected: "react" / "rewoo" / "direct" etc.
    _MODE_EQUIVALENCE: dict[str, str] = {
        "skill_react": "react",
        "direct_chat": "direct",
        "team_collab": "team_collab",
    }

    # Only observations in these categories are sent to the LLM judge.
    _QUALITY_EVAL_CATEGORIES = frozenset({"routing", "semantic_router"})

    def __init__(self) -> None:
        self._observations: list[CapabilityObservation] = []
        # benchmark_id -> time.monotonic() value captured by start_timer()
        self._start_times: dict[str, float] = {}

    def start_timer(self, benchmark_id: str) -> None:
        """Start (or restart) the response-time timer for ``benchmark_id``."""
        self._start_times[benchmark_id] = time.monotonic()

    def stop_timer(self, benchmark_id: str) -> float:
        """Stop the timer for ``benchmark_id`` and return the elapsed milliseconds.

        Returns 0.0 when no timer is running for that id. The timer entry is
        removed, so a second call without a new start_timer() also returns 0.0.
        """
        start = self._start_times.pop(benchmark_id, None)
        if start is None:
            return 0.0
        return (time.monotonic() - start) * 1000  # ms

    def record(self, observation: CapabilityObservation) -> None:
        """Append a pre-built observation."""
        self._observations.append(observation)

    def record_benchmark_result(
        self,
        benchmark: BenchmarkCase,
        *,
        test_name: str,
        actual_skill: str | None = None,
        actual_execution_mode: str | None = None,
        actual_status_code: int = 0,
        actual_response_keys: list[str] | None = None,
        task_succeeded: bool = False,
        is_paraphrase: bool = False,
        error_message: str | None = None,
    ) -> CapabilityObservation:
        """Record a benchmark test result with automatic correctness judgment.

        Stops the timer registered under ``benchmark.id``, derives the
        skill / execution-mode / complexity judgments, stores the resulting
        observation and returns it.
        """
        response_time = self.stop_timer(benchmark.id)

        # Judge skill correctness.
        skill_correct: bool | None = None
        if benchmark.expected_skill is None:
            # Expected no specific skill, so any non-error is acceptable.
            skill_correct = actual_skill is None or task_succeeded
        elif actual_skill is not None:
            skill_correct = actual_skill == benchmark.expected_skill
        # NOTE(review): when a skill was expected but actual_skill is None the
        # judgment stays None ("could not determine"), so such cases are not
        # counted as routing misses downstream — confirm this is intended.

        # Judge execution mode correctness after normalizing the observed name.
        execution_mode_correct: bool | None = None
        if actual_execution_mode is not None:
            actual_norm = self._MODE_EQUIVALENCE.get(
                actual_execution_mode, actual_execution_mode
            )
            execution_mode_correct = actual_norm == benchmark.expected_execution_mode

        # Complexity correctness is approximated by the execution-mode match
        # (None when the execution mode could not be judged).
        complexity_correct = execution_mode_correct

        obs = CapabilityObservation(
            benchmark_id=benchmark.id,
            test_name=test_name,
            timestamp=datetime.now(timezone.utc).isoformat(),
            input_query=benchmark.input,
            is_paraphrase=is_paraphrase,
            expected_skill=benchmark.expected_skill,
            expected_execution_mode=benchmark.expected_execution_mode,
            expected_complexity=benchmark.expected_complexity,
            actual_skill=actual_skill,
            actual_execution_mode=actual_execution_mode,
            actual_status_code=actual_status_code,
            actual_response_keys=actual_response_keys or [],
            skill_correct=skill_correct,
            execution_mode_correct=execution_mode_correct,
            complexity_correct=complexity_correct,
            task_succeeded=task_succeeded,
            category=benchmark.category,
            subcategory=benchmark.subcategory,
            response_time_ms=response_time,
            error_message=error_message,
        )
        self._observations.append(obs)
        return obs

    @property
    def observations(self) -> list[CapabilityObservation]:
        """All recorded observations (the collector's own list, not a copy)."""
        return self._observations

    def get_observations_by_category(self, category: str) -> list[CapabilityObservation]:
        """Return the observations whose category equals ``category``."""
        return [o for o in self._observations if o.category == category]

    def get_observations_by_subcategory(self, subcategory: str) -> list[CapabilityObservation]:
        """Return the observations whose subcategory equals ``subcategory``."""
        return [o for o in self._observations if o.subcategory == subcategory]

    def get_original_observations(self) -> list[CapabilityObservation]:
        """Get non-paraphrase observations."""
        return [o for o in self._observations if not o.is_paraphrase]

    def get_paraphrase_observations(self) -> list[CapabilityObservation]:
        """Get paraphrase observations only."""
        return [o for o in self._observations if o.is_paraphrase]

    @staticmethod
    def _build_judge_prompt(obs: CapabilityObservation) -> str:
        """Build the LLM-as-Judge prompt for one observation."""
        return (
            f"评估以下Agent路由-执行结果的质量(1-5分)。\n\n"
            f"用户输入: {obs.input_query}\n"
            f"期望技能: {obs.expected_skill}\n"
            f"实际路由技能: {obs.actual_skill}\n"
            f"执行模式: {obs.actual_execution_mode}\n\n"
            f"评分标准:\n"
            f"1分: 完全错误的路由,输出与用户意图无关\n"
            f"2分: 路由有偏差,输出部分相关但缺少关键内容\n"
            f"3分: 路由基本正确,输出相关但不完整\n"
            f"4分: 路由正确,输出完整且相关\n"
            f"5分: 路由精准,输出完全匹配用户意图且质量优秀\n\n"
            f"请只输出JSON: {{\"score\": <1-5>, \"reasoning\": \"<一句话理由>\"}}"
        )

    @staticmethod
    def _parse_judge_reply(content: str) -> tuple[float, str] | None:
        """Extract ``(score, reasoning)`` from the judge's reply.

        The first brace-delimited span in ``content`` is parsed as JSON and the
        score is clamped to the 1-5 scale. Returns None when there is no such
        span, it is not valid JSON, or it carries no numeric ``score`` — the
        caller must then treat the observation as not evaluated.
        """
        import re

        if not isinstance(content, str):
            return None
        found = re.search(r"\{[^}]+\}", content)
        if found is None:
            return None
        try:
            parsed = json.loads(found.group())
            score = float(parsed["score"])
        except (ValueError, TypeError, KeyError):
            # Invalid JSON, or a missing / non-numeric score.
            return None
        return max(1.0, min(5.0, score)), str(parsed.get("reasoning", ""))

    def evaluate_output_quality(
        self, llm_gateway: Any
    ) -> list[OutputQualityObservation]:
        """L3 Output Quality Evaluation using LLM-as-Judge.

        Only observations whose category is "routing" or "semantic_router",
        that have an actual skill and whose task succeeded are evaluated.

        Args:
            llm_gateway: Object exposing an awaitable
                ``chat(messages=..., model=..., temperature=..., max_tokens=...)``.
                Each call is driven with ``asyncio.run``, so this method must
                not be called from inside a running event loop.

        Returns:
            One OutputQualityObservation per evaluated observation. When the
            gateway call fails or the reply cannot be parsed, the entry has
            ``evaluated=False`` and ``quality_score=0.0`` instead of a
            fabricated score.
        """
        import asyncio

        results: list[OutputQualityObservation] = []

        for obs in self._observations:
            if (
                obs.category not in self._QUALITY_EVAL_CATEGORIES
                or obs.actual_skill is None
                or not obs.task_succeeded
            ):
                continue

            prompt = self._build_judge_prompt(obs)
            score = 0.0
            evaluated = False
            try:
                response = asyncio.run(
                    llm_gateway.chat(
                        messages=[{"role": "user", "content": prompt}],
                        model="default",
                        temperature=0.0,
                        max_tokens=200,
                    )
                )
                content = (
                    response.get("content", "")
                    if isinstance(response, dict)
                    else str(response)
                )
                verdict = self._parse_judge_reply(content)
                if verdict is None:
                    reasoning = f"Parse failed: {str(content)[:100]}"
                else:
                    score, reasoning = verdict
                    evaluated = True
            except Exception as e:
                # Deliberately best-effort: one failing judge call must not
                # abort the evaluation of the remaining observations.
                reasoning = f"Evaluation error: {e}"

            results.append(
                OutputQualityObservation(
                    benchmark_id=obs.benchmark_id,
                    input_query=obs.input_query,
                    expected_skill=obs.expected_skill,
                    actual_skill=obs.actual_skill,
                    quality_score=score,
                    reasoning=reasoning,
                    evaluated=evaluated,
                )
            )

        return results
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 3. Metrics Analyzer
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class MetricsAnalyzer:
|
||
"""Analyzes collected metrics to compute recall/precision/F1, overfitting, weaknesses."""
|
||
|
||
@staticmethod
|
||
def _safe_div(numerator: float, denominator: float) -> float:
|
||
return numerator / denominator if denominator > 0 else 0.0
|
||
|
||
@staticmethod
|
||
def compute_prf(tp: int, fp: int, fn: int) -> tuple[float, float, float]:
|
||
"""Compute precision, recall, F1 from counts."""
|
||
precision = MetricsAnalyzer._safe_div(tp, tp + fp)
|
||
recall = MetricsAnalyzer._safe_div(tp, tp + fn)
|
||
f1 = MetricsAnalyzer._safe_div(2 * precision * recall, precision + recall)
|
||
return precision, recall, f1
|
||
|
||
def analyze_category(
    self, observations: list[CapabilityObservation], category: str, subcategory: str
) -> CategoryMetrics:
    """Aggregate routing, execution-mode and success metrics for one category.

    Args:
        observations: Observations to aggregate over.
        category: Only observations with this category are considered.
        subcategory: Further restricts by subcategory; an empty string keeps
            every subcategory of ``category``.

    Returns:
        A CategoryMetrics holding counts and rounded ratios, or one with only
        the grouping key set when no observation matches.
    """
    selected: list[CapabilityObservation] = []
    for obs in observations:
        if obs.category != category:
            continue
        if subcategory and obs.subcategory != subcategory:
            continue
        selected.append(obs)

    if not selected:
        return CategoryMetrics(category=category, subcategory=subcategory)

    total = len(selected)
    skill_hits = exec_hits = complexity_hits = successes = 0
    false_pos = false_neg = 0
    for obs in selected:
        if obs.skill_correct is True:
            # TP: judged as correctly routed.
            skill_hits += 1
        elif obs.skill_correct is False:
            # FP: some skill was routed, but the judgment is "wrong".
            if obs.actual_skill is not None:
                false_pos += 1
            # FN: a skill was expected, but the judgment is "wrong".
            if obs.expected_skill is not None:
                false_neg += 1
        if obs.execution_mode_correct is True:
            exec_hits += 1
        if obs.complexity_correct is True:
            complexity_hits += 1
        if obs.task_succeeded:
            successes += 1

    mean_latency = sum(obs.response_time_ms for obs in selected) / total
    precision, recall, f1 = self.compute_prf(skill_hits, false_pos, false_neg)

    return CategoryMetrics(
        category=category,
        subcategory=subcategory,
        total=total,
        skill_correct=skill_hits,
        skill_recall=round(recall, 4),
        skill_precision=round(precision, 4),
        skill_f1=round(f1, 4),
        execution_mode_correct=exec_hits,
        execution_mode_accuracy=round(self._safe_div(exec_hits, total), 4),
        complexity_correct=complexity_hits,
        complexity_accuracy=round(self._safe_div(complexity_hits, total), 4),
        task_success_rate=round(self._safe_div(successes, total), 4),
        avg_response_time_ms=round(mean_latency, 2),
    )
|
||
|
||
def detect_overfitting(
    self, observations: list[CapabilityObservation]
) -> tuple[list[OverfittingResult], float]:
    """Compare each original observation with its paraphrases to detect overfitting.

    Returns:
        ``(overfitting_results, overall_overfitting_score)``.
        A score of 0.0 means no overfitting (paraphrases behave like the
        originals); 1.0 means complete overfitting (originals correct, every
        paraphrase wrong). Only benchmarks whose original was correct
        contribute to the score.
    """
    # Split into originals (keyed by benchmark id) and paraphrase groups.
    originals: dict[str, CapabilityObservation] = {}
    para_groups: dict[str, list[CapabilityObservation]] = defaultdict(list)
    for obs in observations:
        if obs.is_paraphrase:
            para_groups[obs.benchmark_id].append(obs)
        else:
            originals[obs.benchmark_id] = obs

    results: list[OverfittingResult] = []
    inconsistency_sum = 0.0
    compared = 0

    for bid, orig in originals.items():
        variants = para_groups.get(bid, [])
        if not variants:
            continue

        orig_ok = orig.skill_correct is True
        para_flags = [variant.skill_correct is True for variant in variants]

        # Consistency: share of paraphrases that reproduce the original's result.
        agreeing = para_flags.count(orig_ok)
        consistency = self._safe_div(agreeing, len(para_flags))

        results.append(
            OverfittingResult(
                benchmark_id=bid,
                original_correct=orig_ok,
                paraphrase_results=para_flags,
                consistency_rate=round(consistency, 4),
                # Overfitted: original correct but paraphrases mostly wrong.
                is_overfitted=orig_ok and consistency < 0.5,
            )
        )

        # Inconsistency only counts when the original itself was correct.
        if orig_ok:
            inconsistency_sum += 1.0 - consistency
            compared += 1

    score = self._safe_div(inconsistency_sum, compared)
    return results, round(score, 4)
|
||
|
||
def identify_weaknesses(
    self,
    category_metrics: list[CategoryMetrics],
    overfitting_results: list[OverfittingResult],
) -> list[WeaknessItem]:
    """Identify intelligence weaknesses based on metrics analysis.

    Categories with fewer than 2 samples are ignored. For the rest, a
    weakness is reported for low skill F1 (< 0.8), low execution-mode
    accuracy (< 0.6) and low task success rate (< 0.8). One extra weakness
    summarizes all overfitted benchmark cases.

    Returns:
        Weaknesses ordered critical -> high -> medium -> low; the sort is
        stable, so detection order is kept within one severity.
    """
    weaknesses: list[WeaknessItem] = []

    for cm in category_metrics:
        # Too few samples to draw a conclusion from.
        if cm.total < 2:
            continue

        # Low skill F1
        if cm.skill_f1 < 0.5:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="critical" if cm.skill_f1 < 0.3 else "high",
                    description=f"技能路由F1过低 ({cm.skill_f1:.2f}),子类别: {cm.subcategory}",
                    evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}",
                    suggestion="改进该子类别的关键词匹配或意图分类逻辑",
                )
            )
        elif cm.skill_f1 < 0.8:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="medium",
                    description=f"技能路由F1偏低 ({cm.skill_f1:.2f}),子类别: {cm.subcategory}",
                    evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}",
                    suggestion="微调路由阈值或增加更多意图示例",
                )
            )

        # Low execution mode accuracy
        if cm.execution_mode_accuracy < 0.6:
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="high" if cm.execution_mode_accuracy < 0.4 else "medium",
                    description=f"执行模式准确率过低 ({cm.execution_mode_accuracy:.2%}),子类别: {cm.subcategory}",
                    evidence=f"正确数={cm.execution_mode_correct}/{cm.total}",
                    suggestion="检查复杂度估算和模式选择逻辑",
                )
            )

        # Low task success rate
        if cm.task_success_rate < 0.8:
            # The rate is stored rounded to 4 decimals, so rate * total can
            # fall just below the true count (1/3 -> 0.3333 * 3 = 0.9999).
            # round() recovers the count; int() would truncate it to 0.
            success_count = round(cm.task_success_rate * cm.total)
            weaknesses.append(
                WeaknessItem(
                    dimension=cm.category,
                    subcategory=cm.subcategory,
                    severity="critical" if cm.task_success_rate < 0.5 else "high",
                    description=f"任务成功率过低 ({cm.task_success_rate:.2%}),子类别: {cm.subcategory}",
                    evidence=f"成功数={success_count}/{cm.total}",
                    suggestion="排查该子类别的任务执行失败原因",
                )
            )

    # Overfitting weaknesses
    overfitted_cases = [r for r in overfitting_results if r.is_overfitted]
    if overfitted_cases:
        weaknesses.append(
            WeaknessItem(
                dimension="routing",
                subcategory="overfitting",
                severity="high",
                description=f"检测到 {len(overfitted_cases)} 个用例存在过拟合",
                evidence=f"过拟合用例: {', '.join(r.benchmark_id for r in overfitted_cases)}",
                suggestion="增加更多样化的训练样本和同义改写,提升泛化能力",
            )
        )

    # Sort by severity; unknown severities go last.
    severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
    weaknesses.sort(key=lambda w: severity_order.get(w.severity, 99))

    return weaknesses
|
||
|
||
# ═════════════════════════════════════════════════════════════════════
|
||
# Root Cause Analysis Engine
|
||
# ═════════════════════════════════════════════════════════════════════
|
||
|
||
def analyze_root_causes(
    self,
    observations: list[CapabilityObservation],
    category_metrics: list[CategoryMetrics],
    overfitting_results: list[OverfittingResult],
    weaknesses: list[WeaknessItem],
) -> list[RootCause]:
    """Perform root cause analysis based on observation data.

    Strategy:
    1. Examine the raw observations to find failure patterns
    2. Cross-reference paraphrase vs original results for overfitting clues
    3. Analyze error messages for common failure modes
    4. Separate recall failures (no skill routed) from precision failures
       (wrong skill routed) to distinguish cause types

    Args:
        observations: All recorded observations (originals and paraphrases).
        category_metrics: Accepted for interface compatibility; not read here.
        overfitting_results: Per-benchmark overfitting verdicts.
        weaknesses: Accepted for interface compatibility; not read here.

    Returns:
        Root causes sorted by descending confidence.
    """
    # Observations store the raw execution-mode name ("skill_react",
    # "direct_chat", ...). Normalize it before comparing against the
    # canonical names, otherwise those cases are never counted.
    mode_aliases = {"skill_react": "react", "direct_chat": "direct"}
    multi_step_modes = {"react", "rewoo", "reflexion"}

    def canonical_mode(mode: str | None) -> str | None:
        return mode_aliases.get(mode, mode)

    root_causes: list[RootCause] = []
    originals = [o for o in observations if not o.is_paraphrase]
    paraphrases = [o for o in observations if o.is_paraphrase]

    # --- Cause 1: Keyword gap (low recall = keywords not matching) ---
    low_recall_cases = [
        o
        for o in originals
        if o.skill_correct is False and o.expected_skill is not None and o.actual_skill is None
    ]
    if low_recall_cases:
        affected = [o.benchmark_id for o in low_recall_cases]
        affected_ids = set(affected)  # O(1) membership for the scan below
        # Check if paraphrases also fail → confirms keyword gap
        para_also_fail = sum(
            1 for p in paraphrases if p.benchmark_id in affected_ids and p.skill_correct is False
        )
        confidence = min(1.0, 0.5 + 0.1 * para_also_fail) if paraphrases else 0.6
        # Sorted so the report text is identical from run to run.
        subcategories = ", ".join(sorted({o.subcategory for o in low_recall_cases}))
        root_causes.append(
            RootCause(
                cause_type="keyword_gap",
                cause_description="关键词覆盖不足:用户输入无法匹配到目标技能的关键词",
                confidence=round(confidence, 2),
                affected_cases=affected[:10],
                detail=(
                    f"共 {len(low_recall_cases)} 个原始输入未能路由到期望技能。"
                    f"改写输入中也有 {para_also_fail} 个失败,"
                    f"说明关键词库对同义表达的覆盖不足。"
                    f"受影响子类别: {subcategories}"
                ),
            )
        )

    # --- Cause 2: Precision gap (wrong skill routed = intent ambiguous) ---
    wrong_route_cases = [
        o
        for o in originals
        if o.skill_correct is False
        and o.actual_skill is not None
        and o.expected_skill is not None
    ]
    if wrong_route_cases:
        affected = [o.benchmark_id for o in wrong_route_cases]
        # Count which (expected, actual) skill pairs are being confused.
        confusion_pairs: dict[tuple[str, str], int] = defaultdict(int)
        for o in wrong_route_cases:
            confusion_pairs[(o.expected_skill, o.actual_skill)] += 1
        top_confusions = sorted(confusion_pairs.items(), key=lambda x: -x[1])[:5]
        confusion_detail = "; ".join(
            f"{exp}→{act}({cnt}次)" for (exp, act), cnt in top_confusions
        )
        root_causes.append(
            RootCause(
                cause_type="intent_ambiguous",
                cause_description="意图歧义:不同技能的关键词/意图描述重叠,导致路由混淆",
                confidence=0.7,
                affected_cases=affected[:10],
                detail=f"技能混淆对: {confusion_detail}",
            )
        )

    # --- Cause 3: Complexity misjudge (execution mode wrong) ---
    exec_wrong_cases = [o for o in originals if o.execution_mode_correct is False]
    if exec_wrong_cases:
        affected = [o.benchmark_id for o in exec_wrong_cases]
        # Analyze direction of misjudgment
        over_simplified = sum(
            1
            for o in exec_wrong_cases
            if o.expected_complexity in ("high", "medium")
            and canonical_mode(o.actual_execution_mode) == "direct"
        )
        over_complicated = sum(
            1
            for o in exec_wrong_cases
            if o.expected_complexity == "low"
            and canonical_mode(o.actual_execution_mode) in multi_step_modes
        )
        if over_simplified > over_complicated:
            direction = "倾向低估复杂度(将复杂任务误判为简单直接调用)"
        elif over_complicated > over_simplified:
            direction = "倾向高估复杂度(将简单任务误判为需要多步推理)"
        else:
            direction = "复杂度误判方向不明确,双向均有偏差"

        subcategories = ", ".join(sorted({o.subcategory for o in exec_wrong_cases}))
        root_causes.append(
            RootCause(
                cause_type="complexity_misjudge",
                cause_description=f"复杂度估算偏差:{direction}",
                confidence=0.75,
                affected_cases=affected[:10],
                detail=(
                    f"共 {len(exec_wrong_cases)} 个执行模式判断错误。"
                    f"低估复杂度 {over_simplified} 次,高估复杂度 {over_complicated} 次。"
                    f"受影响子类别: {subcategories}"
                ),
            )
        )

    # --- Cause 4: Fallback missing (no skill matched, task failed) ---
    fallback_fail_cases = [
        o for o in originals if o.expected_skill is None and not o.task_succeeded
    ]
    if fallback_fail_cases:
        affected = [o.benchmark_id for o in fallback_fail_cases]
        # Distinct messages of the first five failures, in a stable order.
        error_messages = "; ".join(
            sorted({o.error_message or "N/A" for o in fallback_fail_cases[:5]})
        )
        root_causes.append(
            RootCause(
                cause_type="fallback_missing",
                cause_description="回退机制不足:无匹配技能时,直接聊天模式未能正常处理",
                confidence=0.65,
                affected_cases=affected[:10],
                detail=(
                    f"共 {len(fallback_fail_cases)} 个无技能匹配的任务执行失败。"
                    f"错误信息: {error_messages}"
                ),
            )
        )

    # --- Cause 5: Overfit pattern (paraphrases fail while original succeeds) ---
    overfitted = [r for r in overfitting_results if r.is_overfitted]
    if overfitted:
        affected = [r.benchmark_id for r in overfitted]
        # Per case: how many of its paraphrases failed.
        para_fail_details: list[str] = []
        for r in overfitted:
            fail_count = sum(1 for ok in r.paraphrase_results if not ok)
            para_fail_details.append(
                f"{r.benchmark_id}({fail_count}/{len(r.paraphrase_results)}改写失败)"
            )
        root_causes.append(
            RootCause(
                cause_type="overfit_pattern",
                cause_description="路由过拟合:对特定表述形式过度敏感,同义改写后路由失败",
                confidence=0.85,
                affected_cases=affected,
                detail=(
                    f"共 {len(overfitted)} 个用例存在过拟合。"
                    f"详情: {'; '.join(para_fail_details)}。"
                    f"说明路由逻辑对输入的具体措辞过于敏感,缺乏语义层面的泛化能力。"
                ),
            )
        )

    # --- Cause 6: Quality threshold (task succeeded but output poor) ---
    success_but_wrong = [o for o in originals if o.task_succeeded and o.skill_correct is False]
    if len(success_but_wrong) >= 2:
        affected = [o.benchmark_id for o in success_but_wrong]
        root_causes.append(
            RootCause(
                cause_type="quality_threshold",
                cause_description="质量门控阈值过低:任务虽成功完成但输出了错误结果",
                confidence=0.6,
                affected_cases=affected[:10],
                detail=(
                    f"共 {len(success_but_wrong)} 个任务虽然HTTP成功但路由到了错误技能。"
                    f"质量门控未能拦截这些错误路由的结果。"
                ),
            )
        )

    # --- Cause 7: Config error (HTTP errors) ---
    error_cases = [o for o in originals if o.error_message and not o.task_succeeded]
    if error_cases:
        # Group by error pattern: the first 80 characters of the message.
        error_patterns: dict[str, int] = defaultdict(int)
        for o in error_cases:
            msg = (o.error_message or "")[:80]
            error_patterns[msg] += 1
        top_errors = sorted(error_patterns.items(), key=lambda x: -x[1])[:3]
        error_detail = "; ".join(f"{msg}({cnt}次)" for msg, cnt in top_errors)
        root_causes.append(
            RootCause(
                cause_type="config_error",
                cause_description="配置或服务端错误:请求处理过程中出现异常",
                confidence=0.5,
                affected_cases=[o.benchmark_id for o in error_cases[:10]],
                detail=f"常见错误: {error_detail}",
            )
        )

    # Sort by confidence, highest first.
    root_causes.sort(key=lambda rc: -rc.confidence)
    return root_causes
|
||
|
||
# ═════════════════════════════════════════════════════════════════════
|
||
# Improvement Strategy Planner
|
||
# ═════════════════════════════════════════════════════════════════════
|
||
|
||
def plan_improvements(
    self,
    weaknesses: list[WeaknessItem],
    root_causes: list[RootCause],
) -> list[ImprovementPlan]:
    """Generate improvement plans based on weaknesses and root causes.

    One plan is produced for each known root-cause type that is present in
    ``root_causes``. Plans are emitted in a fixed order: keyword_gap,
    intent_ambiguous, complexity_misjudge, fallback_missing,
    overfit_pattern, quality_threshold, config_error. Cause types outside
    this set are ignored.

    Args:
        weaknesses: Identified weaknesses. Accepted for interface
            compatibility; the current implementation does not read it.
        root_causes: Root causes to plan for. For each type, the first
            entry supplies the plan's description (and the affected-case
            count for keyword_gap), while every entry of that type is
            attached to the plan.

    Returns:
        The list of improvement plans. Action ids are numbered
        sequentially across all plans as ``ACT-001``, ``ACT-002``, ...
    """
    plans: list[ImprovementPlan] = []
    action_counter = 0

    def next_action_id() -> str:
        # Ids are allocated in creation order so they stay unique and
        # monotonically increasing across every plan in the report.
        nonlocal action_counter
        action_counter += 1
        return f"ACT-{action_counter:03d}"

    # Map root causes by type for quick lookup. Membership tests below use
    # ``in`` so the defaultdict never materialises empty entries.
    causes_by_type: dict[str, list[RootCause]] = defaultdict(list)
    for rc in root_causes:
        causes_by_type[rc.cause_type].append(rc)

    # --- Plan for keyword_gap ---
    if "keyword_gap" in causes_by_type:
        cause = causes_by_type["keyword_gap"][0]
        actions: list[ImprovementAction] = [
            ImprovementAction(
                action_id=next_action_id(),
                title="扩展技能关键词同义词库",
                description=(
                    "为每个技能的 intent.keywords 添加更多同义词、近义词和用户常见表述。"
                    "重点补充中文变体、口语化表达和行业术语。"
                ),
                target_module="configs/skills/*.yaml → intent.keywords",
                priority="P0",
                expected_impact=f"预计提升召回率 15~30%,影响 {len(cause.affected_cases)} 个用例",
                effort="small",
                related_causes=["keyword_gap"],
                verification="重新运行E2E回测,验证受影响用例的召回率提升",
            ),
            ImprovementAction(
                action_id=next_action_id(),
                title="引入语义相似度匹配(Layer 1.5)",
                description=(
                    "在 CostAwareRouter 的 Layer 1.5 SemanticRouter 中,"
                    "使用向量嵌入计算用户输入与技能描述的语义相似度,"
                    "弥补关键词精确匹配的不足。"
                ),
                target_module="src/agentkit/chat/skill_routing.py",
                priority="P1",
                expected_impact="预计提升召回率 20~40%,显著改善同义改写场景",
                effort="large",
                related_causes=["keyword_gap", "overfit_pattern"],
                verification="运行过拟合检测回测,验证改写一致性提升至 >80%",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["keyword_gap"],
                actions=actions,
                overall_strategy=(
                    "短期:扩充关键词库(低成本高收益);"
                    "中期:引入语义匹配层(高成本高收益);"
                    "长期:基于用户真实查询日志持续优化关键词库"
                ),
            )
        )

    # --- Plan for intent_ambiguous ---
    if "intent_ambiguous" in causes_by_type:
        cause = causes_by_type["intent_ambiguous"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="为易混淆技能添加互斥关键词",
                description=(
                    "在技能配置中为容易混淆的技能对添加互斥关键词(disambiguation_keywords),"
                    "当用户输入同时匹配多个技能时,优先选择包含互斥关键词的技能。"
                ),
                target_module="configs/skills/*.yaml → intent.disambiguation_keywords",
                priority="P1",
                expected_impact="预计提升精确率 10~25%,减少技能混淆",
                effort="small",
                related_causes=["intent_ambiguous"],
                verification="运行歧义消解回测,验证路由精确率提升",
            ),
            ImprovementAction(
                action_id=next_action_id(),
                title="实现LLM二次分类消歧",
                description=(
                    "当 Layer 0/1 路由到多个候选技能时,"
                    "调用 LLM quick_classify 进行二次意图判断,"
                    "选择最匹配的技能。"
                ),
                target_module="src/agentkit/chat/skill_routing.py → Layer 1",
                priority="P2",
                expected_impact="预计提升精确率 15~30%,但增加 ~500ms 延迟和 ~100 tokens",
                effort="medium",
                related_causes=["intent_ambiguous"],
                verification="运行歧义消解回测,对比延迟和精确率变化",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["intent_ambiguous"],
                actions=actions,
                overall_strategy=(
                    "短期:添加互斥关键词消歧;"
                    "中期:启用LLM二次分类;"
                    "长期:训练专用意图分类模型替代规则匹配"
                ),
            )
        )

    # --- Plan for complexity_misjudge ---
    if "complexity_misjudge" in causes_by_type:
        cause = causes_by_type["complexity_misjudge"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="优化复杂度估算启发式规则",
                description=(
                    "调整 HeuristicClassifier 的复杂度评分权重:"
                    "增加任务动词(分析/研究/设计)的权重,"
                    "降低简单问答动词(是什么/多少)的权重。"
                ),
                target_module="src/agentkit/chat/skill_routing.py → HeuristicClassifier",
                priority="P1",
                expected_impact="预计提升执行模式准确率 10~20%",
                effort="small",
                related_causes=["complexity_misjudge"],
                verification="运行执行模式回测,验证准确率提升",
            ),
            ImprovementAction(
                action_id=next_action_id(),
                title="引入任务复杂度校准数据集",
                description=(
                    "收集标注了复杂度等级的真实用户查询,"
                    "构建校准数据集,定期评估和调整复杂度阈值。"
                ),
                target_module="tests/e2e/benchmark_dataset.py",
                priority="P2",
                expected_impact="持续提升复杂度判断准确性",
                effort="medium",
                related_causes=["complexity_misjudge"],
                verification="每次调整后运行回测,对比前后F1变化",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["complexity_misjudge"],
                actions=actions,
                overall_strategy=(
                    "短期:调整启发式规则权重;"
                    "中期:构建复杂度校准数据集;"
                    "长期:训练复杂度评估模型替代规则"
                ),
            )
        )

    # --- Plan for fallback_missing ---
    if "fallback_missing" in causes_by_type:
        cause = causes_by_type["fallback_missing"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="增强DIRECT_CHAT回退路径",
                description=(
                    "当无技能匹配时,确保DIRECT_CHAT模式能正常处理请求:"
                    "1) 检查默认Agent是否正确初始化;"
                    "2) 确保无技能时不会触发空指针异常;"
                    "3) 添加友好的降级提示。"
                ),
                target_module="src/agentkit/chat/skill_routing.py → _fallback_direct_chat",
                priority="P0",
                expected_impact="确保100%的请求都有回退处理,消除任务失败",
                effort="small",
                related_causes=["fallback_missing"],
                verification="运行回退场景回测,验证所有无匹配请求均成功",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["fallback_missing"],
                actions=actions,
                overall_strategy=(
                    "短期:修复回退路径确保基本可用;"
                    "中期:优化回退模式的回答质量;"
                    "长期:基于用户反馈自动发现新技能需求"
                ),
            )
        )

    # --- Plan for overfit_pattern ---
    if "overfit_pattern" in causes_by_type:
        cause = causes_by_type["overfit_pattern"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="添加意图描述和示例(intent.description + examples)",
                description=(
                    "为每个技能添加 intent.description(自然语言描述)和 intent.examples(示例查询),"
                    "使路由器能理解语义层面的意图,而不仅依赖关键词精确匹配。"
                ),
                target_module="configs/skills/*.yaml → intent.description / intent.examples",
                priority="P0",
                expected_impact="预计提升改写一致性 20~40%",
                effort="small",
                related_causes=["overfit_pattern", "keyword_gap"],
                verification="运行过拟合检测回测,验证改写一致性提升",
            ),
            ImprovementAction(
                action_id=next_action_id(),
                title="实现意图泛化测试CI",
                description=(
                    "在CI中集成意图泛化回测:每次修改路由逻辑或技能配置后,"
                    "自动运行包含改写的回测用例,确保不引入新的过拟合。"
                ),
                target_module=".github/workflows/ + tests/e2e/",
                priority="P2",
                expected_impact="防止过拟合回归,持续监控泛化能力",
                effort="medium",
                related_causes=["overfit_pattern"],
                verification="CI流水线中自动运行回测并检查过拟合分数",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["overfit_pattern"],
                actions=actions,
                overall_strategy=(
                    "短期:补充意图描述和示例;"
                    "中期:引入语义匹配(同keyword_gap方案);"
                    "长期:建立意图泛化CI防线"
                ),
            )
        )

    # --- Plan for quality_threshold ---
    if "quality_threshold" in causes_by_type:
        cause = causes_by_type["quality_threshold"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="增强质量门控的技能匹配验证",
                description=(
                    "在QualityGate中增加技能匹配验证:"
                    "检查输出是否与路由到的技能的能力范围一致,"
                    "如果不一致则触发重试或降级。"
                ),
                target_module="src/agentkit/quality/gate.py",
                priority="P1",
                expected_impact="减少错误路由导致的低质量输出",
                effort="medium",
                related_causes=["quality_threshold"],
                verification="运行质量门控回测,验证错误路由拦截率",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["quality_threshold"],
                actions=actions,
                overall_strategy=(
                    "短期:增加技能匹配验证;"
                    "中期:引入输出质量评分模型;"
                    "长期:实现自动质量回归检测"
                ),
            )
        )

    # --- Plan for config_error ---
    if "config_error" in causes_by_type:
        cause = causes_by_type["config_error"][0]
        actions = [
            ImprovementAction(
                action_id=next_action_id(),
                title="修复服务端配置和异常处理",
                description=(
                    "根据错误信息排查服务端配置问题:"
                    "1) 检查API路由注册是否完整;"
                    "2) 增加输入校验和错误提示;"
                    "3) 确保所有异常都有友好的错误响应。"
                ),
                target_module="src/agentkit/server/routes/",
                priority="P0",
                expected_impact="消除服务端错误,提升任务成功率",
                effort="small",
                related_causes=["config_error"],
                verification="重新运行E2E回测,验证HTTP错误率降低",
            ),
        ]
        plans.append(
            ImprovementPlan(
                weakness_description=cause.cause_description,
                root_causes=causes_by_type["config_error"],
                actions=actions,
                overall_strategy=(
                    "短期:修复已知配置错误;"
                    "中期:增加输入校验和错误处理;"
                    "长期:建立配置变更的自动化验证"
                ),
            )
        )

    return plans
|
||
|
||
def analyze_alignment(self, observations: list[CapabilityObservation]) -> dict[str, Any]:
    """Analyze alignment guard and cascade detector metrics.

    Only observations whose ``category`` is ``"alignment"`` are considered.

    Args:
        observations: All collected observations; non-alignment entries
            are filtered out.

    Returns:
        A dict with:
        - total_alignment_tests: number of observations in alignment category
        - violation_count: sum of ``alignment_violations`` over those tests
        - violation_rate: ratio of tests with at least one violation
        - cascade_alert_count: number of tests with ``cascade_alert`` set
        - cascade_alert_rate: ratio of tests that triggered cascade
        - neg_constraint_pass_rate: share of ``negative_constraint``
          subcategory tests with zero violations
        - pos_constraint_pass_rate: share of ``positive_constraint``
          subcategory tests with zero violations

        All rates are rounded to 4 decimal places. When there are no
        alignment observations every value is zero.
    """
    alignment_obs = [o for o in observations if o.category == "alignment"]
    if not alignment_obs:
        return {
            "total_alignment_tests": 0,
            "violation_count": 0,
            "violation_rate": 0.0,
            "cascade_alert_count": 0,
            "cascade_alert_rate": 0.0,
            "neg_constraint_pass_rate": 0.0,
            "pos_constraint_pass_rate": 0.0,
        }

    total = len(alignment_obs)
    with_violations = sum(1 for o in alignment_obs if o.alignment_violations > 0)
    total_violations = sum(o.alignment_violations for o in alignment_obs)
    with_cascade = sum(1 for o in alignment_obs if o.cascade_alert)

    # Separate by subcategory for neg/pos constraint pass rates. Either
    # subset may be empty, so the division is delegated to _safe_div.
    neg_obs = [o for o in alignment_obs if o.subcategory == "negative_constraint"]
    pos_obs = [o for o in alignment_obs if o.subcategory == "positive_constraint"]

    neg_pass_rate = self._safe_div(
        sum(1 for o in neg_obs if o.alignment_violations == 0),
        len(neg_obs),
    )
    pos_pass_rate = self._safe_div(
        sum(1 for o in pos_obs if o.alignment_violations == 0),
        len(pos_obs),
    )

    return {
        "total_alignment_tests": total,
        "violation_count": total_violations,
        "violation_rate": round(self._safe_div(with_violations, total), 4),
        "cascade_alert_count": with_cascade,
        "cascade_alert_rate": round(self._safe_div(with_cascade, total), 4),
        "neg_constraint_pass_rate": round(neg_pass_rate, 4),
        "pos_constraint_pass_rate": round(pos_pass_rate, 4),
    }
|
||
|
||
def generate_report(self, collector: MetricsCollector) -> CapabilityReport:
    """Generate a full capability analysis report from collected observations.

    Overall precision/recall/F1, execution-mode accuracy and task success
    rate are computed over the collector's *original* observations only
    (``collector.get_original_observations()``). Per-category metrics,
    overfitting detection and root-cause analysis receive the full
    observation list.

    Args:
        collector: The collector holding the observations to analyze.

    Returns:
        A ``CapabilityReport`` with all overall rates rounded to 4 decimal
        places and the raw observations attached.
    """
    observations = collector.observations
    originals = collector.get_original_observations()

    # Compute overall metrics
    total = len(originals)
    if total > 0:
        tp = sum(1 for o in originals if o.skill_correct is True)
        # A wrong routing counts as a false positive when some skill was
        # chosen, and as a false negative when a skill was expected; one
        # observation can therefore contribute to both counts.
        fp = sum(
            1 for o in originals if o.skill_correct is False and o.actual_skill is not None
        )
        fn = sum(
            1 for o in originals if o.skill_correct is False and o.expected_skill is not None
        )
        overall_precision, overall_recall, overall_f1 = self.compute_prf(tp, fp, fn)

        exec_correct = sum(1 for o in originals if o.execution_mode_correct is True)
        overall_exec_accuracy = self._safe_div(exec_correct, total)

        task_success = sum(1 for o in originals if o.task_succeeded)
        overall_success_rate = self._safe_div(task_success, total)
    else:
        overall_precision = overall_recall = overall_f1 = 0.0
        overall_exec_accuracy = overall_success_rate = 0.0

    # Compute per-category metrics; sorting keeps the report order stable.
    categories: set[tuple[str, str]] = {(o.category, o.subcategory) for o in originals}
    category_metrics = [
        self.analyze_category(observations, cat, subcat) for cat, subcat in sorted(categories)
    ]

    # Detect overfitting
    overfitting_results, overfitting_score = self.detect_overfitting(observations)

    # Identify weaknesses
    weaknesses = self.identify_weaknesses(category_metrics, overfitting_results)

    # Root cause analysis
    root_causes = self.analyze_root_causes(
        observations, category_metrics, overfitting_results, weaknesses
    )

    # Improvement strategy planning
    improvement_plans = self.plan_improvements(weaknesses, root_causes)

    return CapabilityReport(
        generated_at=datetime.now(timezone.utc).isoformat(),
        total_observations=len(observations),
        overall_skill_recall=round(overall_recall, 4),
        overall_skill_precision=round(overall_precision, 4),
        overall_skill_f1=round(overall_f1, 4),
        overall_execution_mode_accuracy=round(overall_exec_accuracy, 4),
        overall_task_success_rate=round(overall_success_rate, 4),
        category_metrics=category_metrics,
        overfitting_results=overfitting_results,
        overfitting_score=overfitting_score,
        weaknesses=weaknesses,
        root_causes=root_causes,
        improvement_plans=improvement_plans,
        raw_observations=observations,
    )
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
# 4. Metrics Reporter
|
||
# ═══════════════════════════════════════════════════════════════════════════
|
||
|
||
|
||
class MetricsReporter:
    """Generate human-readable and machine-readable reports."""

    # Display-label lookup tables. They are constant, so they live on the
    # class instead of being rebuilt for every row of the text report.
    # Unknown keys fall back to the raw value (or a neutral icon).
    _CATEGORY_LABELS = {
        "routing": "路由",
        "execution": "执行",
        "quality": "质量",
        "team": "团队",
        "consistency": "一致性",
    }
    _SUBCATEGORY_LABELS = {
        "keyword_match": "关键词匹配",
        "explicit_prefix": "显式前缀",
        "greeting": "问候语",
        "identity": "身份识别",
        "disambiguation": "歧义消解",
        "fallback": "回退处理",
        "complexity_low": "低复杂度",
        "complexity_high": "高复杂度",
        "intent_variant": "意图变体",
        "direct_mode": "直接模式",
        "react_mode": "ReAct模式",
        "quality_gate": "质量门控",
        "output_std": "输出标准化",
        "explicit_team": "显式团队",
        "deterministic": "确定性",
        "overfitting": "过拟合",
    }
    _SEVERITY_ICONS = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}
    _SEVERITY_LABELS = {
        "critical": "严重",
        "high": "高",
        "medium": "中",
        "low": "低",
    }
    _CAUSE_TYPE_LABELS = {
        "keyword_gap": "关键词覆盖不足",
        "intent_ambiguous": "意图歧义",
        "complexity_misjudge": "复杂度估算偏差",
        "fallback_missing": "回退机制不足",
        "overfit_pattern": "路由过拟合",
        "quality_threshold": "质量门控阈值过低",
        "config_error": "配置/服务端错误",
        "tool_missing": "工具缺失",
    }
    _PRIORITY_ICONS = {"P0": "🔴", "P1": "🟠", "P2": "🟡", "P3": "🟢"}
    _EFFORT_LABELS = {"small": "小", "medium": "中", "large": "大"}

    @staticmethod
    def to_json(report: CapabilityReport, path: str) -> None:
        """Save report as JSON.

        Args:
            report: Report to serialise via ``report.model_dump()``.
            path: Destination file path; written as UTF-8 with non-ASCII
                characters kept verbatim.
        """
        with open(path, "w", encoding="utf-8") as f:
            json.dump(report.model_dump(), f, ensure_ascii=False, indent=2)

    @staticmethod
    def to_text(report: CapabilityReport) -> str:
        """Generate plain-text summary report in Chinese.

        Sections are emitted in a fixed order; every section other than
        the header and the overall metrics is skipped when the report has
        no data for it.

        Args:
            report: The report to render.

        Returns:
            The report text, lines joined with ``"\\n"``.
        """
        cls = MetricsReporter
        lines: list[str] = []

        lines.append("=" * 72)
        lines.append(" AgentKit 智能化能力分析报告")
        lines.append(f" 生成时间: {report.generated_at}")
        lines.append("=" * 72)
        lines.append("")

        # Overall metrics
        lines.append("── 总体指标 ──────────────────────────────────────────────")
        lines.append(f" 观测总数: {report.total_observations}")
        lines.append(f" 技能路由召回率: {report.overall_skill_recall:.2%}")
        lines.append(f" 技能路由精确率: {report.overall_skill_precision:.2%}")
        lines.append(f" 技能路由F1: {report.overall_skill_f1:.2%}")
        lines.append(f" 执行模式准确率: {report.overall_execution_mode_accuracy:.2%}")
        lines.append(f" 任务成功率: {report.overall_task_success_rate:.2%}")
        lines.append(f" 过拟合分数: {report.overfitting_score:.2%}")
        lines.append("")

        # Per-category breakdown
        lines.append("── 分类明细 ──────────────────────────────────────────────")
        for cm in report.category_metrics:
            cat_label = cls._CATEGORY_LABELS.get(cm.category, cm.category)
            subcat_label = cls._SUBCATEGORY_LABELS.get(cm.subcategory, cm.subcategory)
            lines.append(f" [{cat_label}/{subcat_label}]")
            lines.append(
                f" 样本数={cm.total} 召回率={cm.skill_recall:.2%} "
                f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}"
            )
            lines.append(
                f" 执行模式准确率={cm.execution_mode_accuracy:.2%} "
                f"成功率={cm.task_success_rate:.2%} "
                f"平均耗时={cm.avg_response_time_ms:.0f}ms"
            )
        # NOTE(review): indentation was lost in the recovered source; the
        # section separator is assumed to follow the loop — confirm.
        lines.append("")

        # Overfitting analysis
        if report.overfitting_results:
            lines.append("── 过拟合分析 ────────────────────────────────────────────")
            for r in report.overfitting_results:
                status = "⚠ 过拟合" if r.is_overfitted else "✓ 正常"
                orig_label = "✓" if r.original_correct else "✗"
                lines.append(
                    f" [{status}] {r.benchmark_id}: "
                    f"原始输入={orig_label}, "
                    f"改写一致性={r.consistency_rate:.0%}"
                )
            lines.append("")

        # Semantic router analysis
        semantic_cats = [cm for cm in report.category_metrics if cm.category == "semantic_router"]
        if semantic_cats:
            lines.append("── 语义路由分析 ──────────────────────────────────────────")
            for cm in semantic_cats:
                lines.append(
                    f" [{cm.subcategory}] 样本数={cm.total} "
                    f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}"
                )
            lines.append("")

        # Team routing analysis
        team_cats = [cm for cm in report.category_metrics if cm.category == "team"]
        if team_cats:
            lines.append("── 团队路由分析 ──────────────────────────────────────────")
            for cm in team_cats:
                lines.append(
                    f" [{cm.subcategory}] 样本数={cm.total} "
                    f"成功率={cm.task_success_rate:.2%} "
                    f"执行模式准确率={cm.execution_mode_accuracy:.2%}"
                )
            lines.append("")

        # Alignment guard analysis
        alignment_obs = [o for o in report.raw_observations if o.category == "alignment"]
        if alignment_obs:
            analyzer = MetricsAnalyzer()
            alignment_metrics = analyzer.analyze_alignment(report.raw_observations)
            lines.append("── 对齐守卫分析 ──────────────────────────────────────────")
            lines.append(f" 测试总数: {alignment_metrics['total_alignment_tests']}")
            lines.append(f" 约束违规总数: {alignment_metrics['violation_count']}")
            lines.append(f" 违规率: {alignment_metrics['violation_rate']:.2%}")
            lines.append(
                f" 否定约束通过率: {alignment_metrics['neg_constraint_pass_rate']:.2%}"
            )
            lines.append(
                f" 肯定约束通过率: {alignment_metrics['pos_constraint_pass_rate']:.2%}"
            )
            lines.append(f" 级联告警次数: {alignment_metrics['cascade_alert_count']}")
            lines.append(f" 级联告警率: {alignment_metrics['cascade_alert_rate']:.2%}")
            lines.append("")

        # Weakness analysis
        if report.weaknesses:
            lines.append("── 智能化短板识别 ────────────────────────────────────────")
            for w in report.weaknesses:
                icon = cls._SEVERITY_ICONS.get(w.severity, "⚪")
                severity_label = cls._SEVERITY_LABELS.get(w.severity, w.severity)
                lines.append(f" {icon} [{severity_label}] {w.description}")
                lines.append(f" 证据: {w.evidence}")
                lines.append(f" 建议: {w.suggestion}")
                # NOTE(review): assumed to be a per-item separator (inside
                # the loop); the recovered source lost indentation — confirm.
                lines.append("")
        else:
            lines.append("── 未检测到显著短板 ────────────────────────────────────")
            lines.append("")

        # Root cause analysis
        if report.root_causes:
            lines.append("── 根因分析 ──────────────────────────────────────────────")
            for rc in report.root_causes:
                type_label = cls._CAUSE_TYPE_LABELS.get(rc.cause_type, rc.cause_type)
                # Ten-cell bar: one filled cell per full 10% of confidence.
                filled = int(rc.confidence * 10)
                conf_bar = "█" * filled + "░" * (10 - filled)
                lines.append(f" ▸ [{type_label}] 置信度: {conf_bar} {rc.confidence:.0%}")
                lines.append(f" 原因: {rc.cause_description}")
                if rc.detail:
                    lines.append(f" 详情: {rc.detail}")
                if rc.affected_cases:
                    # Show at most five case ids, with an ellipsis if truncated.
                    lines.append(
                        f" 受影响用例: {', '.join(rc.affected_cases[:5])}"
                        f"{'...' if len(rc.affected_cases) > 5 else ''}"
                    )
                # NOTE(review): assumed per-item separator — confirm.
                lines.append("")

        # Improvement strategy
        if report.improvement_plans:
            lines.append("── 改进策略规划 ──────────────────────────────────────────")
            for i, plan in enumerate(report.improvement_plans, 1):
                lines.append(f" ┌─ 策略 {i}: {plan.weakness_description}")
                lines.append(f" │ 总体策略: {plan.overall_strategy}")
                lines.append(" │")
                for action in plan.actions:
                    priority_icon = cls._PRIORITY_ICONS.get(action.priority, "⚪")
                    effort_label = cls._EFFORT_LABELS.get(action.effort, action.effort)
                    lines.append(f" │ {priority_icon} [{action.priority}] {action.title}")
                    lines.append(f" │ 目标模块: {action.target_module}")
                    lines.append(f" │ 具体操作: {action.description}")
                    lines.append(f" │ 预期影响: {action.expected_impact}")
                    lines.append(f" │ 工作量: {effort_label}")
                    lines.append(f" │ 验证方式: {action.verification}")
                    lines.append(" │")
                # NOTE(review): box footer and blank line assumed to close
                # each plan (inside the plan loop) — confirm.
                lines.append(f" └{'─' * 60}")
                lines.append("")

        # L3 Output Quality Evaluation
        if report.output_quality_evaluations:
            lines.append("── L3 输出质量评估 ──────────────────────────────────────────")
            evaluated = [e for e in report.output_quality_evaluations if e.evaluated]
            if evaluated:
                avg_score = sum(e.quality_score for e in evaluated) / len(evaluated)
                lines.append(f" 评估样本数: {len(evaluated)}")
                lines.append(f" 平均质量评分: {avg_score:.2f}/5.0")
                score_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0}
                for e in evaluated:
                    # Truncate to an integer score and clamp into 1..5.
                    bucket = max(1, min(5, int(e.quality_score)))
                    score_dist[bucket] += 1
                lines.append(
                    f" 评分分布: 1分:{score_dist[1]} 2分:{score_dist[2]} "
                    f"3分:{score_dist[3]} 4分:{score_dist[4]} 5分:{score_dist[5]}"
                )
                # Show some examples
                lines.append("")
                lines.append(" 样例:")
                for e in evaluated[:5]:
                    lines.append(
                        f" [{e.benchmark_id}] 评分={e.quality_score:.0f} "
                        f"期望={e.expected_skill} 实际={e.actual_skill}"
                    )
                    if e.reasoning:
                        lines.append(f" 理由: {e.reasoning}")
            else:
                lines.append(" 无有效评估结果")
            lines.append("")

        # L5 Adaptive Capability (reuse overfitting consistency data)
        if report.overfitting_results:
            lines.append("── L5 自适应能力 ──────────────────────────────────────────")
            # Non-empty here because overfitting_results is non-empty.
            consistency_rates = [r.consistency_rate for r in report.overfitting_results]
            avg_consistency = sum(consistency_rates) / len(consistency_rates)
            lines.append(f" 测试组数: {len(consistency_rates)}")
            lines.append(f" 平均自适应率: {avg_consistency:.2%}")
            high_adapt = sum(1 for r in consistency_rates if r >= 0.8)
            lines.append(f" 高自适应(>=80%): {high_adapt}/{len(consistency_rates)}")
            lines.append("")

        lines.append("=" * 72)
        return "\n".join(lines)

    @staticmethod
    def save_report(report: CapabilityReport, output_dir: str) -> dict[str, str]:
        """Save both JSON and text reports. Returns paths to saved files.

        Args:
            report: The report to persist.
            output_dir: Target directory; created if it does not exist.

        Returns:
            ``{"json": <path>, "text": <path>}`` for the two files written
            (``capability_report.json`` and ``capability_report.txt``).
        """
        os.makedirs(output_dir, exist_ok=True)

        json_path = os.path.join(output_dir, "capability_report.json")
        text_path = os.path.join(output_dir, "capability_report.txt")

        MetricsReporter.to_json(report, json_path)
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(MetricsReporter.to_text(report))

        return {"json": json_path, "text": text_path}
|