@dataclass
class FitnessWeights:
    """Relative importance of accuracy, latency and cost in the weighted score.

    The weights are rescaled in ``__post_init__`` so that they sum to 1
    whenever the supplied values deviate from that by more than 0.01; totals
    already within that tolerance are kept verbatim.

    Raises:
        ValueError: If the three weights do not add up to a positive, finite
            number. Such a total cannot be normalised: zero would divide by
            zero and a negative total would flip the sign of every weight.
    """

    accuracy: float = 0.6
    latency: float = 0.2
    cost: float = 0.2

    def __post_init__(self) -> None:
        total = self.accuracy + self.latency + self.cost
        # ``not (total > 0.0)`` is also true for NaN, which must be rejected.
        if not (total > 0.0 and math.isfinite(total)):
            raise ValueError(
                "FitnessWeights must sum to a positive finite value, got "
                f"accuracy={self.accuracy}, latency={self.latency}, "
                f"cost={self.cost}"
            )
        if abs(total - 1.0) > 0.01:
            # Normalise so the weights sum to 1.
            self.accuracy /= total
            self.latency /= total
            self.cost /= total


class MultiObjectiveFitness:
    """Multi-objective fitness evaluator.

    Combines several metric dimensions into one weighted fitness value and
    offers the two building blocks of NSGA-II selection: non-dominated
    (Pareto) ranking and crowding distance.

    Usage:
        evaluator = MultiObjectiveFitness(
            weights=FitnessWeights(accuracy=0.6, latency=0.2, cost=0.2)
        )
        score = evaluator.evaluate(accuracy=0.9, latency_ms=500, cost_tokens=2000)
        weighted = evaluator.weighted_score(score)
    """

    # Keys read from ``FitnessScore.normalized`` by this class.
    _DIMENSIONS = ("accuracy", "latency", "cost")

    def __init__(
        self,
        weights: FitnessWeights | None = None,
        max_latency_ms: float = 10000.0,
        max_cost_tokens: float = 10000.0,
    ):
        """Create an evaluator.

        Args:
            weights: Objective weights; defaults to ``FitnessWeights()``.
            max_latency_ms: Stored on the instance.
            max_cost_tokens: Stored on the instance.
        """
        self._weights = weights or FitnessWeights()
        # NOTE(review): these two limits are stored but never read in this
        # class; normalisation is delegated entirely to
        # ``FitnessScore.normalized``. Confirm whether they should be
        # forwarded to the score.
        self._max_latency_ms = max_latency_ms
        self._max_cost_tokens = max_cost_tokens

    def evaluate(
        self,
        accuracy: float = 0.0,
        latency_ms: float = 0.0,
        cost_tokens: float = 0.0,
        custom: float = 0.0,
    ) -> FitnessScore:
        """Bundle raw measurements into a ``FitnessScore``.

        ``accuracy`` is clamped to ``[0.0, 1.0]``; the other values are
        passed through unchanged.
        """
        return FitnessScore(
            accuracy=min(max(accuracy, 0.0), 1.0),
            latency_ms=latency_ms,
            cost_tokens=cost_tokens,
            custom=custom,
        )

    def weighted_score(self, score: FitnessScore) -> float:
        """Return the weighted sum of the score's normalised dimensions."""
        normalized = score.normalized
        return (
            normalized["accuracy"] * self._weights.accuracy
            + normalized["latency"] * self._weights.latency
            + normalized["cost"] * self._weights.cost
        )

    def pareto_rank(self, scores: list[FitnessScore]) -> list[int]:
        """Compute the Pareto rank of every score.

        Uses non-dominated sorting as in NSGA-II, with dominance decided by
        ``FitnessScore.dominates``.

        Returns:
            One rank per input score, in input order: 0 for the Pareto
            front, 1 for the next layer, and so on. Empty input gives ``[]``.
        """
        n = len(scores)
        if n == 0:
            return []

        ranks = [0] * n
        # How many individuals dominate individual i.
        domination_count = [0] * n
        # Which individuals are dominated by individual i.
        dominated_set: list[list[int]] = [[] for _ in range(n)]

        # Build the domination relation; each unordered pair is visited once.
        for i in range(n):
            for j in range(i + 1, n):
                if scores[i].dominates(scores[j]):
                    dominated_set[i].append(j)
                    domination_count[j] += 1
                elif scores[j].dominates(scores[i]):
                    dominated_set[j].append(i)
                    domination_count[i] += 1

        # Peel off fronts: whoever is no longer dominated by a remaining
        # individual forms the next layer.
        current_front = [i for i in range(n) if domination_count[i] == 0]
        rank = 0
        while current_front:
            next_front = []
            for idx in current_front:
                ranks[idx] = rank
                for dominated_idx in dominated_set[idx]:
                    domination_count[dominated_idx] -= 1
                    if domination_count[dominated_idx] == 0:
                        next_front.append(dominated_idx)
            current_front = next_front
            rank += 1

        return ranks

    def crowding_distance(self, scores: list[FitnessScore]) -> list[float]:
        """Compute the crowding distance of each score.

        This is a diversity measure intended for scores of one Pareto rank.

        Returns:
            One distance per input score, in input order. With two or fewer
            scores every distance is infinite; otherwise the extreme points
            of each dimension are infinite and interior points accumulate
            the range-normalised gap between their neighbours.
        """
        n = len(scores)
        if n <= 2:
            return [float("inf")] * n

        # Read ``normalized`` once per score instead of on every comparison.
        normalized = [score.normalized for score in scores]
        distances = [0.0] * n

        for dim in self._DIMENSIONS:
            order = sorted(range(n), key=lambda i: normalized[i][dim])
            vals = [normalized[i][dim] for i in order]

            # Boundary points always get infinite distance.
            distances[order[0]] = float("inf")
            distances[order[-1]] = float("inf")

            val_range = vals[-1] - vals[0]
            if val_range == 0:
                # All values equal: this dimension cannot separate anything.
                continue

            for k in range(1, n - 1):
                distances[order[k]] += (vals[k + 1] - vals[k - 1]) / val_range

        return distances


@dataclass
class ExtendedStrategyConfig:
    """Extended strategy configuration explored by ``ExtendedStrategyTuner``."""

    temperature: float = 0.5
    max_iterations: int = 5
    top_k: int = 5
    retrieval_mode: str = "enhanced"  # "standard" or "enhanced"
    timeout_seconds: int = 300
    tool_weights: dict[str, float] = field(default_factory=dict)


class ExtendedStrategyTuner:
    """Multi-dimensional strategy tuner.

    Tunes ``temperature``, ``max_iterations``, ``top_k`` and
    ``retrieval_mode`` from recorded (config, metric) pairs:

    - numeric parameters are clamped to configurable ranges;
    - new values are drawn by adding Gaussian noise to the best recorded
      configuration, with the noise shrinking as history grows.
    """

    # Built-in ranges; caller-supplied ranges override these per key.
    _DEFAULT_PARAM_RANGES = {
        "temperature": (0.0, 2.0),
        "max_iterations": (1, 10),
        "top_k": (1, 20),
    }
    # Modes that take part in the majority vote, in priority order.
    _RETRIEVAL_MODES = ("enhanced", "standard")

    def __init__(
        self,
        param_ranges: dict[str, tuple[float, float]] | None = None,
    ):
        """Create a tuner.

        Args:
            param_ranges: Optional ``name -> (min, max)`` overrides. Keys not
                supplied keep their built-in range, so a partial mapping no
                longer drops the limits of the other parameters.
        """
        self._param_ranges = {**self._DEFAULT_PARAM_RANGES, **(param_ranges or {})}
        self._history: list[dict[str, Any]] = []

    def record(self, config: ExtendedStrategyConfig, metric: float) -> None:
        """Record a configuration together with the metric it achieved."""
        self._history.append({
            "config": config,
            "metric": metric,
        })

    async def suggest(
        self, current: ExtendedStrategyConfig
    ) -> ExtendedStrategyConfig:
        """Suggest a new strategy from the recorded history.

        With fewer than three records ``current`` is returned unchanged.
        Otherwise the configuration with the highest recorded metric is
        perturbed with Gaussian noise (integer parameters are rounded to the
        nearest integer) and the retrieval mode is chosen by majority vote
        among the top performers. ``timeout_seconds`` is taken from
        ``current``; ``tool_weights`` is copied from the best configuration.
        """
        if len(self._history) < 3:
            return current

        best_config = max(self._history, key=lambda entry: entry["metric"])["config"]

        return ExtendedStrategyConfig(
            temperature=self._optimize_param(
                "temperature", best_config.temperature, noise_std=0.1
            ),
            max_iterations=self._optimize_int_param(
                "max_iterations", best_config.max_iterations, noise_std=1.0
            ),
            top_k=self._optimize_int_param(
                "top_k", best_config.top_k, noise_std=2.0
            ),
            retrieval_mode=self._suggest_retrieval_mode(best_config.retrieval_mode),
            timeout_seconds=current.timeout_seconds,
            tool_weights=dict(best_config.tool_weights),
        )

    def _optimize_param(
        self,
        param_name: str,
        best_value: float,
        noise_std: float,
    ) -> float:
        """Perturb ``best_value`` with decaying Gaussian noise and clamp it.

        The noise standard deviation is ``noise_std / (1 + len(history) / 10)``.
        Parameters without a configured range are clamped to ``(0.0, 1.0)``.
        """
        decay = 1.0 / (1.0 + len(self._history) / 10.0)
        perturbation = random.gauss(0, noise_std * decay)
        min_val, max_val = self._param_ranges.get(param_name, (0.0, 1.0))
        return max(min_val, min(max_val, best_value + perturbation))

    def _optimize_int_param(
        self,
        param_name: str,
        best_value: float,
        noise_std: float,
    ) -> int:
        """Integer variant of ``_optimize_param``.

        Rounds to the nearest integer instead of truncating: ``int()``
        truncation turned every small negative perturbation into a full
        decrement and so pushed integer parameters steadily downward.
        """
        value = round(self._optimize_param(param_name, best_value, noise_std))
        min_val, max_val = self._param_ranges.get(param_name, (0.0, 1.0))
        lowest, highest = math.ceil(min_val), math.floor(max_val)
        if lowest > highest:
            # The range contains no integer; keep the rounded value.
            return value
        # Rounding may step just outside a fractional bound; pull it back.
        return max(lowest, min(highest, value))

    def _suggest_retrieval_mode(self, current_mode: str) -> str:
        """Pick the retrieval mode used by most of the top five records.

        A mode wins only with a strict majority (more than 50%) of the top
        performers; otherwise, or with fewer than five records,
        ``current_mode`` is kept. Both modes use the same threshold.
        """
        if len(self._history) < 5:
            return current_mode

        top = sorted(
            self._history, key=lambda entry: entry["metric"], reverse=True
        )[:5]
        majority = len(top) // 2 + 1
        for mode in self._RETRIEVAL_MODES:
            votes = sum(
                1 for entry in top if entry["config"].retrieval_mode == mode
            )
            if votes >= majority:
                return mode
        return current_mode

    @property
    def history_size(self) -> int:
        """Number of recorded (config, metric) pairs."""
        return len(self._history)
class TestFitnessWeights:
    """FitnessWeights unit tests."""

    def test_default_weights(self):
        w = FitnessWeights()
        assert w.accuracy == pytest.approx(0.6)
        assert w.latency == pytest.approx(0.2)
        assert w.cost == pytest.approx(0.2)

    def test_custom_weights(self):
        w = FitnessWeights(accuracy=0.5, latency=0.3, cost=0.2)
        assert w.accuracy == pytest.approx(0.5)

    def test_auto_normalization(self):
        w = FitnessWeights(accuracy=1.0, latency=1.0, cost=1.0)
        assert w.accuracy == pytest.approx(1 / 3)
        assert w.latency == pytest.approx(1 / 3)
        assert w.cost == pytest.approx(1 / 3)


class TestMultiObjectiveFitness:
    """MultiObjectiveFitness unit tests."""

    def setup_method(self):
        self.evaluator = MultiObjectiveFitness()

    def test_evaluate(self):
        score = self.evaluator.evaluate(accuracy=0.9, latency_ms=500, cost_tokens=2000)
        assert score.accuracy == 0.9
        assert score.latency_ms == 500
        assert score.cost_tokens == 2000

    def test_evaluate_clamps_accuracy(self):
        score = self.evaluator.evaluate(accuracy=1.5)
        assert score.accuracy == 1.0
        score = self.evaluator.evaluate(accuracy=-0.1)
        assert score.accuracy == 0.0

    def test_weighted_score(self):
        score = self.evaluator.evaluate(accuracy=1.0, latency_ms=0, cost_tokens=0)
        weighted = self.evaluator.weighted_score(score)
        # Perfect on all dimensions; compare approximately because the value
        # is a sum of float products.
        assert weighted == pytest.approx(1.0)

    def test_weighted_score_zero(self):
        score = self.evaluator.evaluate(accuracy=0.0, latency_ms=10000, cost_tokens=10000)
        weighted = self.evaluator.weighted_score(score)
        # Worst on all dimensions.
        assert weighted == pytest.approx(0.0)

    def test_pareto_rank_simple(self):
        scores = [
            FitnessScore(accuracy=0.9, latency_ms=100),   # Dominates all
            FitnessScore(accuracy=0.5, latency_ms=500),   # Dominated by 0
            FitnessScore(accuracy=0.3, latency_ms=1000),  # Dominated by 0, 1
        ]
        ranks = self.evaluator.pareto_rank(scores)
        assert ranks[0] == 0  # Front
        assert ranks[1] >= 1
        assert ranks[2] >= ranks[1]

    def test_pareto_rank_empty(self):
        ranks = self.evaluator.pareto_rank([])
        assert ranks == []

    def test_pareto_rank_non_dominated(self):
        scores = [
            FitnessScore(accuracy=0.9, latency_ms=500),  # High accuracy, slow
            FitnessScore(accuracy=0.5, latency_ms=100),  # Low accuracy, fast
        ]
        ranks = self.evaluator.pareto_rank(scores)
        # Neither dominates the other, so both are on the front.
        assert ranks[0] == 0
        assert ranks[1] == 0

    def test_crowding_distance(self):
        scores = [
            FitnessScore(accuracy=0.9, latency_ms=100),
            FitnessScore(accuracy=0.7, latency_ms=300),
            FitnessScore(accuracy=0.5, latency_ms=500),
        ]
        distances = self.evaluator.crowding_distance(scores)
        assert len(distances) == 3
        assert distances[0] == float("inf")  # Boundary
        assert distances[2] == float("inf")  # Boundary
        assert distances[1] > 0  # Interior point

    def test_crowding_distance_small(self):
        scores = [FitnessScore(accuracy=0.5)]
        distances = self.evaluator.crowding_distance(scores)
        assert distances[0] == float("inf")

    def test_custom_weights_evaluator(self):
        evaluator = MultiObjectiveFitness(
            weights=FitnessWeights(accuracy=1.0, latency=0.0, cost=0.0)
        )
        score = evaluator.evaluate(accuracy=0.8, latency_ms=5000, cost_tokens=5000)
        weighted = evaluator.weighted_score(score)
        # Only accuracy matters.
        assert weighted == pytest.approx(0.8)


class TestExtendedStrategyTuner:
    """ExtendedStrategyTuner unit tests."""

    def setup_method(self):
        self.tuner = ExtendedStrategyTuner()

    def test_record_and_suggest(self):
        config = ExtendedStrategyConfig(temperature=0.5, max_iterations=5, top_k=5)
        self.tuner.record(config, 0.7)
        self.tuner.record(config, 0.8)
        self.tuner.record(config, 0.9)
        # Every recorded run must end up in the history.
        assert self.tuner.history_size == 3

    @pytest.mark.asyncio
    async def test_suggest_with_history(self):
        config = ExtendedStrategyConfig(temperature=0.7, max_iterations=5, top_k=5)
        for i in range(5):
            self.tuner.record(config, 0.5 + i * 0.1)

        suggested = await self.tuner.suggest(config)
        assert isinstance(suggested, ExtendedStrategyConfig)
        assert 0.0 <= suggested.temperature <= 2.0
        assert 1 <= suggested.max_iterations <= 10
        assert 1 <= suggested.top_k <= 20

    @pytest.mark.asyncio
    async def test_suggest_without_history(self):
        config = ExtendedStrategyConfig()
        suggested = await self.tuner.suggest(config)
        # Should return the current config unchanged.
        assert suggested.temperature == config.temperature
        assert suggested.max_iterations == config.max_iterations

    @pytest.mark.asyncio
    async def test_retrieval_mode_suggestion(self):
        config = ExtendedStrategyConfig(retrieval_mode="standard")
        enhanced_config = ExtendedStrategyConfig(retrieval_mode="enhanced")

        # Record mostly enhanced results.
        for _ in range(4):
            self.tuner.record(enhanced_config, 0.9)
        self.tuner.record(config, 0.5)

        suggested = await self.tuner.suggest(config)
        assert suggested.retrieval_mode == "enhanced"

    def test_history_size(self):
        assert self.tuner.history_size == 0
        self.tuner.record(ExtendedStrategyConfig(), 0.5)
        assert self.tuner.history_size == 1


class TestExtendedStrategyConfig:
    """ExtendedStrategyConfig unit tests."""

    def test_default_values(self):
        config = ExtendedStrategyConfig()
        assert config.temperature == 0.5
        assert config.max_iterations == 5
        assert config.top_k == 5
        assert config.retrieval_mode == "enhanced"
        assert config.tool_weights == {}

    def test_custom_values(self):
        config = ExtendedStrategyConfig(
            temperature=0.8,
            max_iterations=10,
            top_k=15,
            retrieval_mode="standard",
            tool_weights={"search": 0.7, "analyze": 0.3},
        )
        assert config.temperature == 0.8
        assert config.max_iterations == 10
        assert config.top_k == 15
        assert config.retrieval_mode == "standard"
        assert config.tool_weights["search"] == 0.7