"""Agent Capability Metrics — Collection, Analysis, and Reporting. Core components: 1. CapabilityMetrics: data model for a single test observation 2. MetricsCollector: session-scoped collector that gathers all observations 3. MetricsAnalyzer: computes recall/precision/F1, overfitting scores, weakness analysis 4. MetricsReporter: generates human-readable and machine-readable reports Design: - Collector is a pytest fixture (session-scoped), injected into capability tests - Each test records what actually happened vs what was expected - After all tests, analyzer computes aggregate metrics - Reporter outputs JSON + plain-text summary """ import json import os import time from collections import defaultdict from datetime import datetime, timezone from typing import Any from pydantic import BaseModel, ConfigDict from tests.e2e.benchmark_dataset import BenchmarkCase # ═══════════════════════════════════════════════════════════════════════════ # 1. Data Models # ═══════════════════════════════════════════════════════════════════════════ class CapabilityObservation(BaseModel): """A single test observation: what was expected vs what actually happened.""" model_config = ConfigDict() # Identity benchmark_id: str test_name: str timestamp: str # Input input_query: str is_paraphrase: bool = False # True if this is a paraphrase test (overfitting detection) # Expected (ground truth) expected_skill: str | None = None expected_execution_mode: str = "direct" expected_complexity: str = "low" # Actual (observed) actual_skill: str | None = None actual_execution_mode: str | None = None actual_status_code: int = 0 actual_response_keys: list[str] = [] actual_complexity_score: float | None = None actual_match_method: str | None = None actual_match_confidence: float | None = None # Judgments skill_correct: bool | None = None # None = couldn't determine execution_mode_correct: bool | None = None complexity_correct: bool | None = None task_succeeded: bool = False # HTTP 200 + valid response # Metadata category: str = "" subcategory: str = "" response_time_ms: float = 0.0 error_message: str | None = None # Alignment & Cascade fields (U5) alignment_violations: int = 0 # Number of constraint violations detected cascade_alert: bool = False # Whether a cascade alert was triggered # L3 Output Quality fields output_quality_score: float | None = None # 1-5 LLM-as-Judge score output_quality_reasoning: str | None = None # Judge's reasoning class OutputQualityObservation(BaseModel): """L3 output quality evaluation result.""" model_config = ConfigDict() benchmark_id: str input_query: str expected_skill: str | None = None actual_skill: str | None = None quality_score: float = 0.0 # 1-5 reasoning: str = "" evaluated: bool = False class CategoryMetrics(BaseModel): """Aggregate metrics for a specific category/subcategory.""" model_config = ConfigDict() category: str subcategory: str total: int = 0 skill_correct: int = 0 skill_recall: float = 0.0 skill_precision: float = 0.0 skill_f1: float = 0.0 execution_mode_correct: int = 0 execution_mode_accuracy: float = 0.0 complexity_correct: int = 0 complexity_accuracy: float = 0.0 task_success_rate: float = 0.0 avg_response_time_ms: float = 0.0 class OverfittingResult(BaseModel): """Overfitting detection result for a single benchmark case.""" model_config = ConfigDict() benchmark_id: str original_correct: bool paraphrase_results: list[bool] # True = correct for each paraphrase consistency_rate: float = 0.0 # % of paraphrases that match original result is_overfitted: bool = False # True if original correct but paraphrases mostly wrong class WeaknessItem(BaseModel): """A single identified weakness.""" model_config = ConfigDict() dimension: str # routing / execution / quality / team / consistency subcategory: str severity: str # critical / high / medium / low description: str evidence: str suggestion: str class RootCause(BaseModel): """Root cause analysis for a weakness.""" model_config = ConfigDict() cause_type: str # keyword_gap / complexity_misjudge / intent_ambiguous / fallback_missing / overfit_pattern / tool_missing / config_error / quality_threshold cause_description: str confidence: float = 0.0 # 0.0~1.0, how confident we are about this root cause affected_cases: list[str] = [] # benchmark IDs affected by this cause detail: str = "" # additional technical detail class ImprovementAction(BaseModel): """A single actionable improvement step.""" model_config = ConfigDict() action_id: str title: str description: str target_module: str # which code module to modify priority: str # P0 / P1 / P2 / P3 expected_impact: str # what improvement to expect effort: str # small / medium / large related_causes: list[str] = [] # cause_types this action addresses verification: str = "" # how to verify the fix works class ImprovementPlan(BaseModel): """Improvement plan for a specific weakness.""" model_config = ConfigDict() weakness_description: str root_causes: list[RootCause] actions: list[ImprovementAction] overall_strategy: str class CapabilityReport(BaseModel): """Full capability analysis report.""" model_config = ConfigDict() generated_at: str total_observations: int overall_skill_recall: float overall_skill_precision: float overall_skill_f1: float overall_execution_mode_accuracy: float overall_task_success_rate: float category_metrics: list[CategoryMetrics] overfitting_results: list[OverfittingResult] overfitting_score: float # 0.0 = no overfitting, 1.0 = fully overfitted weaknesses: list[WeaknessItem] root_causes: list[RootCause] improvement_plans: list[ImprovementPlan] raw_observations: list[CapabilityObservation] output_quality_evaluations: list[OutputQualityObservation] = [] # ═══════════════════════════════════════════════════════════════════════════ # 2. Metrics Collector # ═══════════════════════════════════════════════════════════════════════════ class MetricsCollector: """Collects capability observations during E2E test execution. Usage in tests: collector.record(observation) collector.record_benchmark_result(benchmark, actual_skill, ...) """ def __init__(self) -> None: self._observations: list[CapabilityObservation] = [] self._start_times: dict[str, float] = {} def start_timer(self, benchmark_id: str) -> None: self._start_times[benchmark_id] = time.monotonic() def stop_timer(self, benchmark_id: str) -> float: start = self._start_times.pop(benchmark_id, None) if start is None: return 0.0 return (time.monotonic() - start) * 1000 # ms def record(self, observation: CapabilityObservation) -> None: self._observations.append(observation) def record_benchmark_result( self, benchmark: BenchmarkCase, *, test_name: str, actual_skill: str | None = None, actual_execution_mode: str | None = None, actual_status_code: int = 0, actual_response_keys: list[str] | None = None, task_succeeded: bool = False, is_paraphrase: bool = False, error_message: str | None = None, ) -> CapabilityObservation: """Record a benchmark test result with automatic correctness judgment.""" response_time = self.stop_timer(benchmark.id) # Judge skill correctness skill_correct: bool | None = None if benchmark.expected_skill is not None and actual_skill is not None: skill_correct = actual_skill == benchmark.expected_skill elif benchmark.expected_skill is None: # Expected no specific skill, so any non-error is acceptable skill_correct = actual_skill is None or task_succeeded # Judge execution mode correctness execution_mode_correct: bool | None = None if actual_execution_mode is not None: # Normalize both sides for comparison: # actual: "skill_react" / "rewoo" / "direct_chat" etc. # expected: "react" / "rewoo" / "direct" etc. _MODE_EQUIVALENCE: dict[str, str] = { "skill_react": "react", "direct_chat": "direct", "team_collab": "team_collab", } actual_norm = _MODE_EQUIVALENCE.get(actual_execution_mode, actual_execution_mode) execution_mode_correct = actual_norm == benchmark.expected_execution_mode # Judge complexity correctness (approximate: based on execution mode match) complexity_correct: bool | None = None if execution_mode_correct is not None: complexity_correct = execution_mode_correct obs = CapabilityObservation( benchmark_id=benchmark.id, test_name=test_name, timestamp=datetime.now(timezone.utc).isoformat(), input_query=benchmark.input, is_paraphrase=is_paraphrase, expected_skill=benchmark.expected_skill, expected_execution_mode=benchmark.expected_execution_mode, expected_complexity=benchmark.expected_complexity, actual_skill=actual_skill, actual_execution_mode=actual_execution_mode, actual_status_code=actual_status_code, actual_response_keys=actual_response_keys or [], skill_correct=skill_correct, execution_mode_correct=execution_mode_correct, complexity_correct=complexity_correct, task_succeeded=task_succeeded, category=benchmark.category, subcategory=benchmark.subcategory, response_time_ms=response_time, error_message=error_message, ) self._observations.append(obs) return obs @property def observations(self) -> list[CapabilityObservation]: return self._observations def get_observations_by_category(self, category: str) -> list[CapabilityObservation]: return [o for o in self._observations if o.category == category] def get_observations_by_subcategory(self, subcategory: str) -> list[CapabilityObservation]: return [o for o in self._observations if o.subcategory == subcategory] def get_original_observations(self) -> list[CapabilityObservation]: """Get non-paraphrase observations.""" return [o for o in self._observations if not o.is_paraphrase] def get_paraphrase_observations(self) -> list[CapabilityObservation]: """Get paraphrase observations only.""" return [o for o in self._observations if o.is_paraphrase] def evaluate_output_quality( self, llm_gateway: Any ) -> list[OutputQualityObservation]: """L3 Output Quality Evaluation using LLM-as-Judge. Evaluates only keyword_match and semantic_match categories. Returns list of OutputQualityObservation with quality scores. """ results: list[OutputQualityObservation] = [] eval_categories = {"routing", "semantic_router"} for obs in self._observations: if obs.category not in eval_categories: continue if obs.actual_skill is None: continue if not obs.task_succeeded: continue prompt = ( f"评估以下Agent路由-执行结果的质量(1-5分)。\n\n" f"用户输入: {obs.input_query}\n" f"期望技能: {obs.expected_skill}\n" f"实际路由技能: {obs.actual_skill}\n" f"执行模式: {obs.actual_execution_mode}\n\n" f"评分标准:\n" f"1分: 完全错误的路由,输出与用户意图无关\n" f"2分: 路由有偏差,输出部分相关但缺少关键内容\n" f"3分: 路由基本正确,输出相关但不完整\n" f"4分: 路由正确,输出完整且相关\n" f"5分: 路由精准,输出完全匹配用户意图且质量优秀\n\n" f"请只输出JSON: {{\"score\": <1-5>, \"reasoning\": \"<一句话理由>\"}}" ) try: import asyncio response = asyncio.run( llm_gateway.chat( messages=[{"role": "user", "content": prompt}], model="default", temperature=0.0, max_tokens=200, ) ) content = response.get("content", "") if isinstance(response, dict) else str(response) # Parse JSON from response import re json_match = re.search(r'\{[^}]+\}', content) if json_match: import json as _json parsed = _json.loads(json_match.group()) score = float(parsed.get("score", 0)) reasoning = parsed.get("reasoning", "") else: score = 0.0 reasoning = f"Parse failed: {content[:100]}" results.append( OutputQualityObservation( benchmark_id=obs.benchmark_id, input_query=obs.input_query, expected_skill=obs.expected_skill, actual_skill=obs.actual_skill, quality_score=max(1.0, min(5.0, score)), reasoning=reasoning, evaluated=True, ) ) except Exception as e: results.append( OutputQualityObservation( benchmark_id=obs.benchmark_id, input_query=obs.input_query, expected_skill=obs.expected_skill, actual_skill=obs.actual_skill, quality_score=0.0, reasoning=f"Evaluation error: {e}", evaluated=False, ) ) return results # ═══════════════════════════════════════════════════════════════════════════ # 3. Metrics Analyzer # ═══════════════════════════════════════════════════════════════════════════ class MetricsAnalyzer: """Analyzes collected metrics to compute recall/precision/F1, overfitting, weaknesses.""" @staticmethod def _safe_div(numerator: float, denominator: float) -> float: return numerator / denominator if denominator > 0 else 0.0 @staticmethod def compute_prf(tp: int, fp: int, fn: int) -> tuple[float, float, float]: """Compute precision, recall, F1 from counts.""" precision = MetricsAnalyzer._safe_div(tp, tp + fp) recall = MetricsAnalyzer._safe_div(tp, tp + fn) f1 = MetricsAnalyzer._safe_div(2 * precision * recall, precision + recall) return precision, recall, f1 def analyze_category( self, observations: list[CapabilityObservation], category: str, subcategory: str ) -> CategoryMetrics: """Compute aggregate metrics for a category/subcategory.""" filtered = [ o for o in observations if o.category == category and (not subcategory or o.subcategory == subcategory) ] if not filtered: return CategoryMetrics(category=category, subcategory=subcategory) total = len(filtered) skill_correct_count = sum(1 for o in filtered if o.skill_correct is True) exec_correct_count = sum(1 for o in filtered if o.execution_mode_correct is True) complexity_correct_count = sum(1 for o in filtered if o.complexity_correct is True) task_success_count = sum(1 for o in filtered if o.task_succeeded) avg_response_time = sum(o.response_time_ms for o in filtered) / total # For skill routing: compute per-skill PRF # TP = correctly routed to expected skill # FP = routed to wrong skill # FN = expected skill but not routed to it tp = skill_correct_count fp = sum(1 for o in filtered if o.skill_correct is False and o.actual_skill is not None) fn = sum(1 for o in filtered if o.skill_correct is False and o.expected_skill is not None) precision, recall, f1 = self.compute_prf(tp, fp, fn) return CategoryMetrics( category=category, subcategory=subcategory, total=total, skill_correct=skill_correct_count, skill_recall=round(recall, 4), skill_precision=round(precision, 4), skill_f1=round(f1, 4), execution_mode_correct=exec_correct_count, execution_mode_accuracy=round(self._safe_div(exec_correct_count, total), 4), complexity_correct=complexity_correct_count, complexity_accuracy=round(self._safe_div(complexity_correct_count, total), 4), task_success_rate=round(self._safe_div(task_success_count, total), 4), avg_response_time_ms=round(avg_response_time, 2), ) def detect_overfitting( self, observations: list[CapabilityObservation] ) -> tuple[list[OverfittingResult], float]: """Detect overfitting by comparing original vs paraphrase results. Returns (overfitting_results, overall_overfitting_score). overfitting_score = 0.0 means no overfitting (paraphrases work as well as originals). overfitting_score = 1.0 means complete overfitting (originals correct, paraphrases all wrong). """ originals = {o.benchmark_id: o for o in observations if not o.is_paraphrase} paraphrases: dict[str, list[CapabilityObservation]] = defaultdict(list) for o in observations: if o.is_paraphrase: paraphrases[o.benchmark_id].append(o) results: list[OverfittingResult] = [] total_inconsistency = 0.0 total_comparisons = 0 for bid, orig in originals.items(): paras = paraphrases.get(bid, []) if not paras: continue orig_correct = orig.skill_correct is True para_corrects = [p.skill_correct is True for p in paras] # Consistency: how many paraphrases match the original result matches = sum(1 for pc in para_corrects if pc == orig_correct) consistency_rate = self._safe_div(matches, len(para_corrects)) # Overfitted: original correct but paraphrases mostly wrong is_overfitted = orig_correct and consistency_rate < 0.5 results.append( OverfittingResult( benchmark_id=bid, original_correct=orig_correct, paraphrase_results=para_corrects, consistency_rate=round(consistency_rate, 4), is_overfitted=is_overfitted, ) ) if orig_correct: # Only count inconsistency when original was correct total_inconsistency += 1.0 - consistency_rate total_comparisons += 1 overfitting_score = self._safe_div(total_inconsistency, total_comparisons) return results, round(overfitting_score, 4) def identify_weaknesses( self, category_metrics: list[CategoryMetrics], overfitting_results: list[OverfittingResult], ) -> list[WeaknessItem]: """Identify intelligence weaknesses based on metrics analysis.""" weaknesses: list[WeaknessItem] = [] for cm in category_metrics: # Low skill F1 if cm.skill_f1 < 0.5 and cm.total >= 2: weaknesses.append( WeaknessItem( dimension=cm.category, subcategory=cm.subcategory, severity="critical" if cm.skill_f1 < 0.3 else "high", description=f"技能路由F1过低 ({cm.skill_f1:.2f}),子类别: {cm.subcategory}", evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}", suggestion="改进该子类别的关键词匹配或意图分类逻辑", ) ) elif cm.skill_f1 < 0.8 and cm.total >= 2: weaknesses.append( WeaknessItem( dimension=cm.category, subcategory=cm.subcategory, severity="medium", description=f"技能路由F1偏低 ({cm.skill_f1:.2f}),子类别: {cm.subcategory}", evidence=f"召回率={cm.skill_recall:.2%}, 精确率={cm.skill_precision:.2%}, 样本数={cm.total}", suggestion="微调路由阈值或增加更多意图示例", ) ) # Low execution mode accuracy if cm.execution_mode_accuracy < 0.6 and cm.total >= 2: weaknesses.append( WeaknessItem( dimension=cm.category, subcategory=cm.subcategory, severity="high" if cm.execution_mode_accuracy < 0.4 else "medium", description=f"执行模式准确率过低 ({cm.execution_mode_accuracy:.2%}),子类别: {cm.subcategory}", evidence=f"正确数={cm.execution_mode_correct}/{cm.total}", suggestion="检查复杂度估算和模式选择逻辑", ) ) # Low task success rate if cm.task_success_rate < 0.8 and cm.total >= 2: weaknesses.append( WeaknessItem( dimension=cm.category, subcategory=cm.subcategory, severity="critical" if cm.task_success_rate < 0.5 else "high", description=f"任务成功率过低 ({cm.task_success_rate:.2%}),子类别: {cm.subcategory}", evidence=f"成功数={int(cm.task_success_rate * cm.total)}/{cm.total}", suggestion="排查该子类别的任务执行失败原因", ) ) # Overfitting weaknesses overfitted_cases = [r for r in overfitting_results if r.is_overfitted] if overfitted_cases: weaknesses.append( WeaknessItem( dimension="routing", subcategory="overfitting", severity="high", description=f"检测到 {len(overfitted_cases)} 个用例存在过拟合", evidence=f"过拟合用例: {', '.join(r.benchmark_id for r in overfitted_cases)}", suggestion="增加更多样化的训练样本和同义改写,提升泛化能力", ) ) # Sort by severity severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} weaknesses.sort(key=lambda w: severity_order.get(w.severity, 99)) return weaknesses # ═════════════════════════════════════════════════════════════════════ # Root Cause Analysis Engine # ═════════════════════════════════════════════════════════════════════ def analyze_root_causes( self, observations: list[CapabilityObservation], category_metrics: list[CategoryMetrics], overfitting_results: list[OverfittingResult], weaknesses: list[WeaknessItem], ) -> list[RootCause]: """Perform root cause analysis based on observation data. Strategy: 1. For each weakness, examine the raw observations to find patterns 2. Cross-reference paraphrase vs original results for overfitting clues 3. Analyze error messages for common failure modes 4. Check recall vs precision imbalance to distinguish cause types """ root_causes: list[RootCause] = [] originals = [o for o in observations if not o.is_paraphrase] paraphrases = [o for o in observations if o.is_paraphrase] # --- Cause 1: Keyword gap (low recall = keywords not matching) --- low_recall_cases = [ o for o in originals if o.skill_correct is False and o.expected_skill is not None and o.actual_skill is None ] if low_recall_cases: affected = [o.benchmark_id for o in low_recall_cases] # Check if paraphrases also fail → confirms keyword gap para_also_fail = sum( 1 for p in paraphrases if p.benchmark_id in affected and p.skill_correct is False ) confidence = min(1.0, 0.5 + 0.1 * para_also_fail) if paraphrases else 0.6 root_causes.append( RootCause( cause_type="keyword_gap", cause_description="关键词覆盖不足:用户输入无法匹配到目标技能的关键词", confidence=round(confidence, 2), affected_cases=affected[:10], detail=( f"共 {len(low_recall_cases)} 个原始输入未能路由到期望技能。" f"改写输入中也有 {para_also_fail} 个失败," f"说明关键词库对同义表达的覆盖不足。" f"受影响子类别: {', '.join(set(o.subcategory for o in low_recall_cases))}" ), ) ) # --- Cause 2: Precision gap (wrong skill routed = intent ambiguous) --- wrong_route_cases = [ o for o in originals if o.skill_correct is False and o.actual_skill is not None and o.expected_skill is not None ] if wrong_route_cases: affected = [o.benchmark_id for o in wrong_route_cases] # Check which skills are being confused confusion_pairs: dict[tuple[str, str], int] = defaultdict(int) for o in wrong_route_cases: confusion_pairs[(o.expected_skill, o.actual_skill)] += 1 top_confusions = sorted(confusion_pairs.items(), key=lambda x: -x[1])[:5] confusion_detail = "; ".join( f"{exp}→{act}({cnt}次)" for (exp, act), cnt in top_confusions ) root_causes.append( RootCause( cause_type="intent_ambiguous", cause_description="意图歧义:不同技能的关键词/意图描述重叠,导致路由混淆", confidence=0.7, affected_cases=affected[:10], detail=f"技能混淆对: {confusion_detail}", ) ) # --- Cause 3: Complexity misjudge (execution mode wrong) --- exec_wrong_cases = [o for o in originals if o.execution_mode_correct is False] if exec_wrong_cases: affected = [o.benchmark_id for o in exec_wrong_cases] # Analyze direction of misjudgment over_simplified = sum( 1 for o in exec_wrong_cases if o.expected_complexity in ("high", "medium") and o.actual_execution_mode == "direct" ) over_complicated = sum( 1 for o in exec_wrong_cases if o.expected_complexity == "low" and o.actual_execution_mode in ("react", "rewoo", "reflexion") ) direction = "" if over_simplified > over_complicated: direction = "倾向低估复杂度(将复杂任务误判为简单直接调用)" elif over_complicated > over_simplified: direction = "倾向高估复杂度(将简单任务误判为需要多步推理)" else: direction = "复杂度误判方向不明确,双向均有偏差" root_causes.append( RootCause( cause_type="complexity_misjudge", cause_description=f"复杂度估算偏差:{direction}", confidence=0.75, affected_cases=affected[:10], detail=( f"共 {len(exec_wrong_cases)} 个执行模式判断错误。" f"低估复杂度 {over_simplified} 次,高估复杂度 {over_complicated} 次。" f"受影响子类别: {', '.join(set(o.subcategory for o in exec_wrong_cases))}" ), ) ) # --- Cause 4: Fallback missing (no skill matched, task failed) --- fallback_fail_cases = [ o for o in originals if o.expected_skill is None and not o.task_succeeded ] if fallback_fail_cases: affected = [o.benchmark_id for o in fallback_fail_cases] root_causes.append( RootCause( cause_type="fallback_missing", cause_description="回退机制不足:无匹配技能时,直接聊天模式未能正常处理", confidence=0.65, affected_cases=affected[:10], detail=( f"共 {len(fallback_fail_cases)} 个无技能匹配的任务执行失败。" f"错误信息: {'; '.join(set(o.error_message or 'N/A' for o in fallback_fail_cases[:5]))}" ), ) ) # --- Cause 5: Overfit pattern (paraphrases fail while original succeeds) --- overfitted = [r for r in overfitting_results if r.is_overfitted] if overfitted: affected = [r.benchmark_id for r in overfitted] # Analyze what kind of paraphrases fail para_fail_details: list[str] = [] for r in overfitted: fail_count = sum(1 for ok in r.paraphrase_results if not ok) para_fail_details.append( f"{r.benchmark_id}({fail_count}/{len(r.paraphrase_results)}改写失败)" ) root_causes.append( RootCause( cause_type="overfit_pattern", cause_description="路由过拟合:对特定表述形式过度敏感,同义改写后路由失败", confidence=0.85, affected_cases=affected, detail=( f"共 {len(overfitted)} 个用例存在过拟合。" f"详情: {'; '.join(para_fail_details)}。" f"说明路由逻辑对输入的具体措辞过于敏感,缺乏语义层面的泛化能力。" ), ) ) # --- Cause 6: Quality threshold (task succeeded but output poor) --- success_but_wrong = [o for o in originals if o.task_succeeded and o.skill_correct is False] if len(success_but_wrong) >= 2: affected = [o.benchmark_id for o in success_but_wrong] root_causes.append( RootCause( cause_type="quality_threshold", cause_description="质量门控阈值过低:任务虽成功完成但输出了错误结果", confidence=0.6, affected_cases=affected[:10], detail=( f"共 {len(success_but_wrong)} 个任务虽然HTTP成功但路由到了错误技能。" f"质量门控未能拦截这些错误路由的结果。" ), ) ) # --- Cause 7: Config error (HTTP errors) --- error_cases = [o for o in originals if o.error_message and not o.task_succeeded] if error_cases: # Group by error pattern error_patterns: dict[str, int] = defaultdict(int) for o in error_cases: # Simplify error message to pattern msg = (o.error_message or "")[:80] error_patterns[msg] += 1 top_errors = sorted(error_patterns.items(), key=lambda x: -x[1])[:3] error_detail = "; ".join(f"{msg}({cnt}次)" for msg, cnt in top_errors) root_causes.append( RootCause( cause_type="config_error", cause_description="配置或服务端错误:请求处理过程中出现异常", confidence=0.5, affected_cases=[o.benchmark_id for o in error_cases[:10]], detail=f"常见错误: {error_detail}", ) ) # Sort by confidence root_causes.sort(key=lambda rc: -rc.confidence) return root_causes # ═════════════════════════════════════════════════════════════════════ # Improvement Strategy Planner # ═════════════════════════════════════════════════════════════════════ def plan_improvements( self, weaknesses: list[WeaknessItem], root_causes: list[RootCause], ) -> list[ImprovementPlan]: """Generate improvement plans based on weaknesses and root causes.""" plans: list[ImprovementPlan] = [] action_counter = 0 # Map root causes by type for quick lookup causes_by_type: dict[str, list[RootCause]] = defaultdict(list) for rc in root_causes: causes_by_type[rc.cause_type].append(rc) # --- Plan for keyword_gap --- if "keyword_gap" in causes_by_type: cause = causes_by_type["keyword_gap"][0] actions: list[ImprovementAction] = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="扩展技能关键词同义词库", description=( "为每个技能的 intent.keywords 添加更多同义词、近义词和用户常见表述。" "重点补充中文变体、口语化表达和行业术语。" ), target_module="configs/skills/*.yaml → intent.keywords", priority="P0", expected_impact=f"预计提升召回率 15~30%,影响 {len(cause.affected_cases)} 个用例", effort="small", related_causes=["keyword_gap"], verification="重新运行E2E回测,验证受影响用例的召回率提升", ) ) action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="引入语义相似度匹配(Layer 1.5)", description=( "在 CostAwareRouter 的 Layer 1.5 SemanticRouter 中," "使用向量嵌入计算用户输入与技能描述的语义相似度," "弥补关键词精确匹配的不足。" ), target_module="src/agentkit/chat/skill_routing.py", priority="P1", expected_impact="预计提升召回率 20~40%,显著改善同义改写场景", effort="large", related_causes=["keyword_gap", "overfit_pattern"], verification="运行过拟合检测回测,验证改写一致性提升至 >80%", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["keyword_gap"], actions=actions, overall_strategy=( "短期:扩充关键词库(低成本高收益);" "中期:引入语义匹配层(高成本高收益);" "长期:基于用户真实查询日志持续优化关键词库" ), ) ) # --- Plan for intent_ambiguous --- if "intent_ambiguous" in causes_by_type: cause = causes_by_type["intent_ambiguous"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="为易混淆技能添加互斥关键词", description=( "在技能配置中为容易混淆的技能对添加互斥关键词(disambiguation_keywords)," "当用户输入同时匹配多个技能时,优先选择包含互斥关键词的技能。" ), target_module="configs/skills/*.yaml → intent.disambiguation_keywords", priority="P1", expected_impact="预计提升精确率 10~25%,减少技能混淆", effort="small", related_causes=["intent_ambiguous"], verification="运行歧义消解回测,验证路由精确率提升", ) ) action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="实现LLM二次分类消歧", description=( "当 Layer 0/1 路由到多个候选技能时," "调用 LLM quick_classify 进行二次意图判断," "选择最匹配的技能。" ), target_module="src/agentkit/chat/skill_routing.py → Layer 1", priority="P2", expected_impact="预计提升精确率 15~30%,但增加 ~500ms 延迟和 ~100 tokens", effort="medium", related_causes=["intent_ambiguous"], verification="运行歧义消解回测,对比延迟和精确率变化", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["intent_ambiguous"], actions=actions, overall_strategy=( "短期:添加互斥关键词消歧;" "中期:启用LLM二次分类;" "长期:训练专用意图分类模型替代规则匹配" ), ) ) # --- Plan for complexity_misjudge --- if "complexity_misjudge" in causes_by_type: cause = causes_by_type["complexity_misjudge"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="优化复杂度估算启发式规则", description=( "调整 HeuristicClassifier 的复杂度评分权重:" "增加任务动词(分析/研究/设计)的权重," "降低简单问答动词(是什么/多少)的权重。" ), target_module="src/agentkit/chat/skill_routing.py → HeuristicClassifier", priority="P1", expected_impact="预计提升执行模式准确率 10~20%", effort="small", related_causes=["complexity_misjudge"], verification="运行执行模式回测,验证准确率提升", ) ) action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="引入任务复杂度校准数据集", description=( "收集标注了复杂度等级的真实用户查询," "构建校准数据集,定期评估和调整复杂度阈值。" ), target_module="tests/e2e/benchmark_dataset.py", priority="P2", expected_impact="持续提升复杂度判断准确性", effort="medium", related_causes=["complexity_misjudge"], verification="每次调整后运行回测,对比前后F1变化", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["complexity_misjudge"], actions=actions, overall_strategy=( "短期:调整启发式规则权重;" "中期:构建复杂度校准数据集;" "长期:训练复杂度评估模型替代规则" ), ) ) # --- Plan for fallback_missing --- if "fallback_missing" in causes_by_type: cause = causes_by_type["fallback_missing"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="增强DIRECT_CHAT回退路径", description=( "当无技能匹配时,确保DIRECT_CHAT模式能正常处理请求:" "1) 检查默认Agent是否正确初始化;" "2) 确保无技能时不会触发空指针异常;" "3) 添加友好的降级提示。" ), target_module="src/agentkit/chat/skill_routing.py → _fallback_direct_chat", priority="P0", expected_impact="确保100%的请求都有回退处理,消除任务失败", effort="small", related_causes=["fallback_missing"], verification="运行回退场景回测,验证所有无匹配请求均成功", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["fallback_missing"], actions=actions, overall_strategy=( "短期:修复回退路径确保基本可用;" "中期:优化回退模式的回答质量;" "长期:基于用户反馈自动发现新技能需求" ), ) ) # --- Plan for overfit_pattern --- if "overfit_pattern" in causes_by_type: cause = causes_by_type["overfit_pattern"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="添加意图描述和示例(intent.description + examples)", description=( "为每个技能添加 intent.description(自然语言描述)和 intent.examples(示例查询)," "使路由器能理解语义层面的意图,而不仅依赖关键词精确匹配。" ), target_module="configs/skills/*.yaml → intent.description / intent.examples", priority="P0", expected_impact="预计提升改写一致性 20~40%", effort="small", related_causes=["overfit_pattern", "keyword_gap"], verification="运行过拟合检测回测,验证改写一致性提升", ) ) action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="实现意图泛化测试CI", description=( "在CI中集成意图泛化回测:每次修改路由逻辑或技能配置后," "自动运行包含改写的回测用例,确保不引入新的过拟合。" ), target_module=".github/workflows/ + tests/e2e/", priority="P2", expected_impact="防止过拟合回归,持续监控泛化能力", effort="medium", related_causes=["overfit_pattern"], verification="CI流水线中自动运行回测并检查过拟合分数", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["overfit_pattern"], actions=actions, overall_strategy=( "短期:补充意图描述和示例;" "中期:引入语义匹配(同keyword_gap方案);" "长期:建立意图泛化CI防线" ), ) ) # --- Plan for quality_threshold --- if "quality_threshold" in causes_by_type: cause = causes_by_type["quality_threshold"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="增强质量门控的技能匹配验证", description=( "在QualityGate中增加技能匹配验证:" "检查输出是否与路由到的技能的能力范围一致," "如果不一致则触发重试或降级。" ), target_module="src/agentkit/quality/gate.py", priority="P1", expected_impact="减少错误路由导致的低质量输出", effort="medium", related_causes=["quality_threshold"], verification="运行质量门控回测,验证错误路由拦截率", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["quality_threshold"], actions=actions, overall_strategy=( "短期:增加技能匹配验证;" "中期:引入输出质量评分模型;" "长期:实现自动质量回归检测" ), ) ) # --- Plan for config_error --- if "config_error" in causes_by_type: cause = causes_by_type["config_error"][0] actions = [] action_counter += 1 actions.append( ImprovementAction( action_id=f"ACT-{action_counter:03d}", title="修复服务端配置和异常处理", description=( "根据错误信息排查服务端配置问题:" "1) 检查API路由注册是否完整;" "2) 增加输入校验和错误提示;" "3) 确保所有异常都有友好的错误响应。" ), target_module="src/agentkit/server/routes/", priority="P0", expected_impact="消除服务端错误,提升任务成功率", effort="small", related_causes=["config_error"], verification="重新运行E2E回测,验证HTTP错误率降低", ) ) plans.append( ImprovementPlan( weakness_description=cause.cause_description, root_causes=causes_by_type["config_error"], actions=actions, overall_strategy=( "短期:修复已知配置错误;" "中期:增加输入校验和错误处理;" "长期:建立配置变更的自动化验证" ), ) ) return plans def analyze_alignment(self, observations: list[CapabilityObservation]) -> dict[str, Any]: """Analyze alignment guard and cascade detector metrics. Returns a dict with: - total_alignment_tests: number of observations in alignment category - violation_count: total constraint violations - violation_rate: ratio of tests with at least one violation - cascade_alert_count: number of cascade alerts triggered - cascade_alert_rate: ratio of tests that triggered cascade - neg_constraint_pass_rate: pass rate for negative constraints - pos_constraint_pass_rate: pass rate for positive constraints """ alignment_obs = [o for o in observations if o.category == "alignment"] if not alignment_obs: return { "total_alignment_tests": 0, "violation_count": 0, "violation_rate": 0.0, "cascade_alert_count": 0, "cascade_alert_rate": 0.0, "neg_constraint_pass_rate": 0.0, "pos_constraint_pass_rate": 0.0, } total = len(alignment_obs) with_violations = sum(1 for o in alignment_obs if o.alignment_violations > 0) total_violations = sum(o.alignment_violations for o in alignment_obs) with_cascade = sum(1 for o in alignment_obs if o.cascade_alert) # Separate by subcategory for neg/pos constraint pass rates neg_obs = [o for o in alignment_obs if o.subcategory == "negative_constraint"] pos_obs = [o for o in alignment_obs if o.subcategory == "positive_constraint"] neg_pass_rate = self._safe_div( sum(1 for o in neg_obs if o.alignment_violations == 0), len(neg_obs), ) pos_pass_rate = self._safe_div( sum(1 for o in pos_obs if o.alignment_violations == 0), len(pos_obs), ) return { "total_alignment_tests": total, "violation_count": total_violations, "violation_rate": round(self._safe_div(with_violations, total), 4), "cascade_alert_count": with_cascade, "cascade_alert_rate": round(self._safe_div(with_cascade, total), 4), "neg_constraint_pass_rate": round(neg_pass_rate, 4), "pos_constraint_pass_rate": round(pos_pass_rate, 4), } def generate_report(self, collector: MetricsCollector) -> CapabilityReport: """Generate a full capability analysis report from collected observations.""" observations = collector.observations originals = collector.get_original_observations() # Compute overall metrics total = len(originals) if total > 0: tp = sum(1 for o in originals if o.skill_correct is True) fp = sum( 1 for o in originals if o.skill_correct is False and o.actual_skill is not None ) fn = sum( 1 for o in originals if o.skill_correct is False and o.expected_skill is not None ) overall_precision, overall_recall, overall_f1 = self.compute_prf(tp, fp, fn) exec_correct = sum(1 for o in originals if o.execution_mode_correct is True) overall_exec_accuracy = self._safe_div(exec_correct, total) task_success = sum(1 for o in originals if o.task_succeeded) overall_success_rate = self._safe_div(task_success, total) else: overall_precision = overall_recall = overall_f1 = 0.0 overall_exec_accuracy = overall_success_rate = 0.0 # Compute per-category metrics categories: set[tuple[str, str]] = {(o.category, o.subcategory) for o in originals} category_metrics = [ self.analyze_category(observations, cat, subcat) for cat, subcat in sorted(categories) ] # Detect overfitting overfitting_results, overfitting_score = self.detect_overfitting(observations) # Identify weaknesses weaknesses = self.identify_weaknesses(category_metrics, overfitting_results) # Root cause analysis root_causes = self.analyze_root_causes( observations, category_metrics, overfitting_results, weaknesses ) # Improvement strategy planning improvement_plans = self.plan_improvements(weaknesses, root_causes) return CapabilityReport( generated_at=datetime.now(timezone.utc).isoformat(), total_observations=len(observations), overall_skill_recall=round(overall_recall, 4), overall_skill_precision=round(overall_precision, 4), overall_skill_f1=round(overall_f1, 4), overall_execution_mode_accuracy=round(overall_exec_accuracy, 4), overall_task_success_rate=round(overall_success_rate, 4), category_metrics=category_metrics, overfitting_results=overfitting_results, overfitting_score=overfitting_score, weaknesses=weaknesses, root_causes=root_causes, improvement_plans=improvement_plans, raw_observations=observations, ) # ═══════════════════════════════════════════════════════════════════════════ # 4. Metrics Reporter # ═══════════════════════════════════════════════════════════════════════════ class MetricsReporter: """Generate human-readable and machine-readable reports.""" @staticmethod def to_json(report: CapabilityReport, path: str) -> None: """Save report as JSON.""" with open(path, "w", encoding="utf-8") as f: json.dump(report.model_dump(), f, ensure_ascii=False, indent=2) @staticmethod def to_text(report: CapabilityReport) -> str: """Generate plain-text summary report in Chinese.""" lines: list[str] = [] lines.append("=" * 72) lines.append(" AgentKit 智能化能力分析报告") lines.append(f" 生成时间: {report.generated_at}") lines.append("=" * 72) lines.append("") # Overall metrics lines.append("── 总体指标 ──────────────────────────────────────────────") lines.append(f" 观测总数: {report.total_observations}") lines.append(f" 技能路由召回率: {report.overall_skill_recall:.2%}") lines.append(f" 技能路由精确率: {report.overall_skill_precision:.2%}") lines.append(f" 技能路由F1: {report.overall_skill_f1:.2%}") lines.append(f" 执行模式准确率: {report.overall_execution_mode_accuracy:.2%}") lines.append(f" 任务成功率: {report.overall_task_success_rate:.2%}") lines.append(f" 过拟合分数: {report.overfitting_score:.2%}") lines.append("") # Per-category breakdown lines.append("── 分类明细 ──────────────────────────────────────────────") for cm in report.category_metrics: cat_label = { "routing": "路由", "execution": "执行", "quality": "质量", "team": "团队", "consistency": "一致性", }.get(cm.category, cm.category) subcat_label = { "keyword_match": "关键词匹配", "explicit_prefix": "显式前缀", "greeting": "问候语", "identity": "身份识别", "disambiguation": "歧义消解", "fallback": "回退处理", "complexity_low": "低复杂度", "complexity_high": "高复杂度", "intent_variant": "意图变体", "direct_mode": "直接模式", "react_mode": "ReAct模式", "quality_gate": "质量门控", "output_std": "输出标准化", "explicit_team": "显式团队", "deterministic": "确定性", "overfitting": "过拟合", }.get(cm.subcategory, cm.subcategory) lines.append(f" [{cat_label}/{subcat_label}]") lines.append( f" 样本数={cm.total} 召回率={cm.skill_recall:.2%} " f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}" ) lines.append( f" 执行模式准确率={cm.execution_mode_accuracy:.2%} " f"成功率={cm.task_success_rate:.2%} " f"平均耗时={cm.avg_response_time_ms:.0f}ms" ) lines.append("") # Overfitting analysis if report.overfitting_results: lines.append("── 过拟合分析 ────────────────────────────────────────────") for r in report.overfitting_results: status = "⚠ 过拟合" if r.is_overfitted else "✓ 正常" orig_label = "✓" if r.original_correct else "✗" lines.append( f" [{status}] {r.benchmark_id}: " f"原始输入={orig_label}, " f"改写一致性={r.consistency_rate:.0%}" ) lines.append("") # Semantic router analysis semantic_cats = [cm for cm in report.category_metrics if cm.category == "semantic_router"] if semantic_cats: lines.append("── 语义路由分析 ──────────────────────────────────────────") for cm in semantic_cats: lines.append( f" [{cm.subcategory}] 样本数={cm.total} " f"精确率={cm.skill_precision:.2%} F1={cm.skill_f1:.2%}" ) lines.append("") # Team routing analysis team_cats = [cm for cm in report.category_metrics if cm.category == "team"] if team_cats: lines.append("── 团队路由分析 ──────────────────────────────────────────") for cm in team_cats: lines.append( f" [{cm.subcategory}] 样本数={cm.total} " f"成功率={cm.task_success_rate:.2%} " f"执行模式准确率={cm.execution_mode_accuracy:.2%}" ) lines.append("") # Alignment guard analysis alignment_obs = [o for o in report.raw_observations if o.category == "alignment"] if alignment_obs: analyzer = MetricsAnalyzer() alignment_metrics = analyzer.analyze_alignment(report.raw_observations) lines.append("── 对齐守卫分析 ──────────────────────────────────────────") lines.append(f" 测试总数: {alignment_metrics['total_alignment_tests']}") lines.append(f" 约束违规总数: {alignment_metrics['violation_count']}") lines.append(f" 违规率: {alignment_metrics['violation_rate']:.2%}") lines.append( f" 否定约束通过率: {alignment_metrics['neg_constraint_pass_rate']:.2%}" ) lines.append( f" 肯定约束通过率: {alignment_metrics['pos_constraint_pass_rate']:.2%}" ) lines.append(f" 级联告警次数: {alignment_metrics['cascade_alert_count']}") lines.append(f" 级联告警率: {alignment_metrics['cascade_alert_rate']:.2%}") lines.append("") # Weakness analysis if report.weaknesses: lines.append("── 智能化短板识别 ────────────────────────────────────────") for w in report.weaknesses: icon = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}.get( w.severity, "⚪" ) severity_label = { "critical": "严重", "high": "高", "medium": "中", "low": "低", }.get(w.severity, w.severity) lines.append(f" {icon} [{severity_label}] {w.description}") lines.append(f" 证据: {w.evidence}") lines.append(f" 建议: {w.suggestion}") lines.append("") else: lines.append("── 未检测到显著短板 ────────────────────────────────────") lines.append("") # Root cause analysis if report.root_causes: lines.append("── 根因分析 ──────────────────────────────────────────────") cause_type_labels = { "keyword_gap": "关键词覆盖不足", "intent_ambiguous": "意图歧义", "complexity_misjudge": "复杂度估算偏差", "fallback_missing": "回退机制不足", "overfit_pattern": "路由过拟合", "quality_threshold": "质量门控阈值过低", "config_error": "配置/服务端错误", "tool_missing": "工具缺失", } for rc in report.root_causes: type_label = cause_type_labels.get(rc.cause_type, rc.cause_type) conf_bar = "█" * int(rc.confidence * 10) + "░" * (10 - int(rc.confidence * 10)) lines.append(f" ▸ [{type_label}] 置信度: {conf_bar} {rc.confidence:.0%}") lines.append(f" 原因: {rc.cause_description}") if rc.detail: lines.append(f" 详情: {rc.detail}") if rc.affected_cases: lines.append( f" 受影响用例: {', '.join(rc.affected_cases[:5])}" f"{'...' if len(rc.affected_cases) > 5 else ''}" ) lines.append("") # Improvement strategy if report.improvement_plans: lines.append("── 改进策略规划 ──────────────────────────────────────────") for i, plan in enumerate(report.improvement_plans, 1): lines.append(f" ┌─ 策略 {i}: {plan.weakness_description}") lines.append(f" │ 总体策略: {plan.overall_strategy}") lines.append(" │") for action in plan.actions: priority_icon = {"P0": "🔴", "P1": "🟠", "P2": "🟡", "P3": "🟢"}.get( action.priority, "⚪" ) effort_label = {"small": "小", "medium": "中", "large": "大"}.get( action.effort, action.effort ) lines.append(f" │ {priority_icon} [{action.priority}] {action.title}") lines.append(f" │ 目标模块: {action.target_module}") lines.append(f" │ 具体操作: {action.description}") lines.append(f" │ 预期影响: {action.expected_impact}") lines.append(f" │ 工作量: {effort_label}") lines.append(f" │ 验证方式: {action.verification}") lines.append(" │") lines.append(f" └{'─' * 60}") lines.append("") # L3 Output Quality Evaluation if report.output_quality_evaluations: lines.append("── L3 输出质量评估 ──────────────────────────────────────────") evaluated = [e for e in report.output_quality_evaluations if e.evaluated] if evaluated: avg_score = sum(e.quality_score for e in evaluated) / len(evaluated) lines.append(f" 评估样本数: {len(evaluated)}") lines.append(f" 平均质量评分: {avg_score:.2f}/5.0") score_dist = {1: 0, 2: 0, 3: 0, 4: 0, 5: 0} for e in evaluated: bucket = max(1, min(5, int(e.quality_score))) score_dist[bucket] += 1 lines.append(f" 评分分布: 1分:{score_dist[1]} 2分:{score_dist[2]} 3分:{score_dist[3]} 4分:{score_dist[4]} 5分:{score_dist[5]}") # Show some examples lines.append("") lines.append(" 样例:") for e in evaluated[:5]: lines.append(f" [{e.benchmark_id}] 评分={e.quality_score:.0f} 期望={e.expected_skill} 实际={e.actual_skill}") if e.reasoning: lines.append(f" 理由: {e.reasoning}") else: lines.append(" 无有效评估结果") lines.append("") # L5 Adaptive Capability (reuse overfitting consistency data) if report.overfitting_results: lines.append("── L5 自适应能力 ──────────────────────────────────────────") consistency_rates = [r.consistency_rate for r in report.overfitting_results] if consistency_rates: avg_consistency = sum(consistency_rates) / len(consistency_rates) lines.append(f" 测试组数: {len(consistency_rates)}") lines.append(f" 平均自适应率: {avg_consistency:.2%}") high_adapt = sum(1 for r in consistency_rates if r >= 0.8) lines.append(f" 高自适应(>=80%): {high_adapt}/{len(consistency_rates)}") lines.append("") lines.append("=" * 72) return "\n".join(lines) @staticmethod def save_report(report: CapabilityReport, output_dir: str) -> dict[str, str]: """Save both JSON and text reports. Returns paths to saved files.""" os.makedirs(output_dir, exist_ok=True) json_path = os.path.join(output_dir, "capability_report.json") text_path = os.path.join(output_dir, "capability_report.txt") MetricsReporter.to_json(report, json_path) with open(text_path, "w", encoding="utf-8") as f: f.write(MetricsReporter.to_text(report)) return {"json": json_path, "text": text_path}