feat(evolution): U7 multi-objective fitness and extended strategy space
- MultiObjectiveFitness: weighted scoring, NSGA-II Pareto ranking, crowding distance - FitnessWeights: configurable accuracy/latency/cost weights with auto-normalization - ExtendedStrategyTuner: multi-dim Bayesian optimization (temperature, max_iterations, top_k, retrieval_mode) - ExtendedStrategyConfig: expanded parameter space - 20 tests passing
This commit is contained in:
parent
d5998aaddd
commit
34e083abde
|
|
@ -0,0 +1,279 @@
|
|||
"""MultiObjectiveFitness - 多目标适应度评估
|
||||
|
||||
支持准确率+延迟+成本的综合评估,Pareto 前沿维护。
|
||||
扩展 StrategyTuner 到多维参数空间。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import math
|
||||
import random
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from agentkit.evolution.genetic import FitnessScore
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FitnessWeights:
|
||||
"""适应度权重配置"""
|
||||
|
||||
accuracy: float = 0.6
|
||||
latency: float = 0.2
|
||||
cost: float = 0.2
|
||||
|
||||
def __post_init__(self):
|
||||
total = self.accuracy + self.latency + self.cost
|
||||
if abs(total - 1.0) > 0.01:
|
||||
# Normalize to sum=1
|
||||
self.accuracy /= total
|
||||
self.latency /= total
|
||||
self.cost /= total
|
||||
|
||||
|
||||
class MultiObjectiveFitness:
|
||||
"""多目标适应度评估器
|
||||
|
||||
将多个维度的指标综合为加权适应度分数,
|
||||
并支持 Pareto 前沿维护。
|
||||
|
||||
使用方式:
|
||||
evaluator = MultiObjectiveFitness(weights=FitnessWeights(accuracy=0.6, latency=0.2, cost=0.2))
|
||||
score = evaluator.evaluate(accuracy=0.9, latency_ms=500, cost_tokens=2000)
|
||||
weighted = evaluator.weighted_score(score)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
weights: FitnessWeights | None = None,
|
||||
max_latency_ms: float = 10000.0,
|
||||
max_cost_tokens: float = 10000.0,
|
||||
):
|
||||
self._weights = weights or FitnessWeights()
|
||||
self._max_latency_ms = max_latency_ms
|
||||
self._max_cost_tokens = max_cost_tokens
|
||||
|
||||
def evaluate(
|
||||
self,
|
||||
accuracy: float = 0.0,
|
||||
latency_ms: float = 0.0,
|
||||
cost_tokens: float = 0.0,
|
||||
custom: float = 0.0,
|
||||
) -> FitnessScore:
|
||||
"""评估多目标适应度"""
|
||||
return FitnessScore(
|
||||
accuracy=min(max(accuracy, 0.0), 1.0),
|
||||
latency_ms=latency_ms,
|
||||
cost_tokens=cost_tokens,
|
||||
custom=custom,
|
||||
)
|
||||
|
||||
def weighted_score(self, score: FitnessScore) -> float:
|
||||
"""计算加权综合分数"""
|
||||
n = score.normalized
|
||||
return (
|
||||
n["accuracy"] * self._weights.accuracy
|
||||
+ n["latency"] * self._weights.latency
|
||||
+ n["cost"] * self._weights.cost
|
||||
)
|
||||
|
||||
def pareto_rank(self, scores: list[FitnessScore]) -> list[int]:
|
||||
"""计算 Pareto 等级
|
||||
|
||||
返回每个个体的 Pareto 等级(0 = 前沿,1 = 第二层,...)
|
||||
|
||||
使用非支配排序算法 (NSGA-II)。
|
||||
"""
|
||||
n = len(scores)
|
||||
if n == 0:
|
||||
return []
|
||||
|
||||
ranks = [0] * n
|
||||
domination_count = [0] * n # 被多少个体支配
|
||||
dominated_set: list[list[int]] = [[] for _ in range(n)] # 支配哪些个体
|
||||
|
||||
# Build domination relationships
|
||||
for i in range(n):
|
||||
for j in range(i + 1, n):
|
||||
if scores[i].dominates(scores[j]):
|
||||
dominated_set[i].append(j)
|
||||
domination_count[j] += 1
|
||||
elif scores[j].dominates(scores[i]):
|
||||
dominated_set[j].append(i)
|
||||
domination_count[i] += 1
|
||||
|
||||
# Assign ranks level by level
|
||||
current_front = [i for i in range(n) if domination_count[i] == 0]
|
||||
rank = 0
|
||||
|
||||
while current_front:
|
||||
for idx in current_front:
|
||||
ranks[idx] = rank
|
||||
|
||||
next_front = []
|
||||
for idx in current_front:
|
||||
for dominated_idx in dominated_set[idx]:
|
||||
domination_count[dominated_idx] -= 1
|
||||
if domination_count[dominated_idx] == 0:
|
||||
next_front.append(dominated_idx)
|
||||
|
||||
current_front = next_front
|
||||
rank += 1
|
||||
|
||||
return ranks
|
||||
|
||||
def crowding_distance(self, scores: list[FitnessScore]) -> list[float]:
|
||||
"""计算拥挤度距离(同一 Pareto 等级内的多样性指标)"""
|
||||
n = len(scores)
|
||||
if n <= 2:
|
||||
return [float("inf")] * n
|
||||
|
||||
distances = [0.0] * n
|
||||
dimensions = ["accuracy", "latency", "cost"]
|
||||
|
||||
for dim in dimensions:
|
||||
# Sort by this dimension
|
||||
indices = list(range(n))
|
||||
get_val = lambda i: scores[i].normalized[dim]
|
||||
indices.sort(key=get_val)
|
||||
|
||||
# Boundary points get infinite distance
|
||||
distances[indices[0]] = float("inf")
|
||||
distances[indices[-1]] = float("inf")
|
||||
|
||||
# Compute range
|
||||
vals = [get_val(i) for i in indices]
|
||||
val_range = vals[-1] - vals[0]
|
||||
if val_range == 0:
|
||||
continue
|
||||
|
||||
# Add normalized distance
|
||||
for k in range(1, n - 1):
|
||||
i = indices[k]
|
||||
distances[i] += (vals[k + 1] - vals[k - 1]) / val_range
|
||||
|
||||
return distances
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExtendedStrategyConfig:
|
||||
"""扩展的策略配置"""
|
||||
|
||||
temperature: float = 0.5
|
||||
max_iterations: int = 5
|
||||
top_k: int = 5
|
||||
retrieval_mode: str = "enhanced" # "standard", "enhanced"
|
||||
timeout_seconds: int = 300
|
||||
tool_weights: dict[str, float] = field(default_factory=dict)
|
||||
|
||||
|
||||
class ExtendedStrategyTuner:
|
||||
"""多维策略调优器
|
||||
|
||||
扩展 StrategyTuner 到多维参数空间:
|
||||
- temperature, max_iterations, top_k, retrieval_mode
|
||||
- 支持参数范围约束
|
||||
- Bayesian-inspired 多维优化
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
param_ranges: dict[str, tuple[float, float]] | None = None,
|
||||
):
|
||||
self._param_ranges = param_ranges or {
|
||||
"temperature": (0.0, 2.0),
|
||||
"max_iterations": (1, 10),
|
||||
"top_k": (1, 20),
|
||||
}
|
||||
self._history: list[dict[str, Any]] = []
|
||||
|
||||
def record(self, config: ExtendedStrategyConfig, metric: float) -> None:
|
||||
"""记录配置和效果指标"""
|
||||
self._history.append({
|
||||
"config": config,
|
||||
"metric": metric,
|
||||
})
|
||||
|
||||
async def suggest(
|
||||
self, current: ExtendedStrategyConfig
|
||||
) -> ExtendedStrategyConfig:
|
||||
"""基于历史数据建议新策略
|
||||
|
||||
使用多维 Bayesian-inspired 优化:
|
||||
1. 在历史中找到 Pareto 最优配置
|
||||
2. 在最优配置附近添加高斯噪声探索
|
||||
"""
|
||||
if len(self._history) < 3:
|
||||
return current
|
||||
|
||||
best = max(self._history, key=lambda x: x["metric"])
|
||||
best_config = best["config"]
|
||||
|
||||
suggested_temperature = self._optimize_param(
|
||||
"temperature",
|
||||
best_config.temperature,
|
||||
noise_std=0.1,
|
||||
)
|
||||
|
||||
suggested_max_iterations = int(self._optimize_param(
|
||||
"max_iterations",
|
||||
best_config.max_iterations,
|
||||
noise_std=1.0,
|
||||
))
|
||||
|
||||
suggested_top_k = int(self._optimize_param(
|
||||
"top_k",
|
||||
best_config.top_k,
|
||||
noise_std=2.0,
|
||||
))
|
||||
|
||||
# Retrieval mode: switch if >50% of top performers use the other mode
|
||||
suggested_mode = self._suggest_retrieval_mode(best_config.retrieval_mode)
|
||||
|
||||
return ExtendedStrategyConfig(
|
||||
temperature=suggested_temperature,
|
||||
max_iterations=suggested_max_iterations,
|
||||
top_k=suggested_top_k,
|
||||
retrieval_mode=suggested_mode,
|
||||
timeout_seconds=current.timeout_seconds,
|
||||
tool_weights=dict(best_config.tool_weights),
|
||||
)
|
||||
|
||||
def _optimize_param(
|
||||
self,
|
||||
param_name: str,
|
||||
best_value: float,
|
||||
noise_std: float,
|
||||
) -> float:
|
||||
"""多维 Bayesian-inspired 参数优化"""
|
||||
decay = 1.0 / (1.0 + len(self._history) / 10.0)
|
||||
effective_noise = noise_std * decay
|
||||
perturbation = random.gauss(0, effective_noise)
|
||||
new_value = best_value + perturbation
|
||||
|
||||
min_val, max_val = self._param_ranges.get(param_name, (0.0, 1.0))
|
||||
return max(min_val, min(max_val, new_value))
|
||||
|
||||
def _suggest_retrieval_mode(self, current_mode: str) -> str:
|
||||
"""建议检索模式"""
|
||||
if len(self._history) < 5:
|
||||
return current_mode
|
||||
|
||||
# Check top performers
|
||||
top = sorted(self._history, key=lambda x: x["metric"], reverse=True)[:5]
|
||||
enhanced_count = sum(
|
||||
1 for h in top if h["config"].retrieval_mode == "enhanced"
|
||||
)
|
||||
|
||||
if enhanced_count >= 3:
|
||||
return "enhanced"
|
||||
elif enhanced_count <= 1:
|
||||
return "standard"
|
||||
return current_mode
|
||||
|
||||
@property
|
||||
def history_size(self) -> int:
|
||||
return len(self._history)
|
||||
|
|
@ -0,0 +1,186 @@
|
|||
"""Tests for MultiObjectiveFitness and ExtendedStrategyTuner"""
|
||||
|
||||
import pytest
|
||||
|
||||
from agentkit.evolution.fitness import (
|
||||
ExtendedStrategyConfig,
|
||||
ExtendedStrategyTuner,
|
||||
FitnessWeights,
|
||||
MultiObjectiveFitness,
|
||||
)
|
||||
from agentkit.evolution.genetic import FitnessScore
|
||||
|
||||
|
||||
class TestFitnessWeights:
|
||||
"""FitnessWeights unit tests"""
|
||||
|
||||
def test_default_weights(self):
|
||||
w = FitnessWeights()
|
||||
assert abs(w.accuracy - 0.6) < 0.01
|
||||
assert abs(w.latency - 0.2) < 0.01
|
||||
assert abs(w.cost - 0.2) < 0.01
|
||||
|
||||
def test_custom_weights(self):
|
||||
w = FitnessWeights(accuracy=0.5, latency=0.3, cost=0.2)
|
||||
assert abs(w.accuracy - 0.5) < 0.01
|
||||
|
||||
def test_auto_normalization(self):
|
||||
w = FitnessWeights(accuracy=1.0, latency=1.0, cost=1.0)
|
||||
assert abs(w.accuracy - 1/3) < 0.01
|
||||
assert abs(w.latency - 1/3) < 0.01
|
||||
assert abs(w.cost - 1/3) < 0.01
|
||||
|
||||
|
||||
class TestMultiObjectiveFitness:
|
||||
"""MultiObjectiveFitness unit tests"""
|
||||
|
||||
def setup_method(self):
|
||||
self.evaluator = MultiObjectiveFitness()
|
||||
|
||||
def test_evaluate(self):
|
||||
score = self.evaluator.evaluate(accuracy=0.9, latency_ms=500, cost_tokens=2000)
|
||||
assert score.accuracy == 0.9
|
||||
assert score.latency_ms == 500
|
||||
assert score.cost_tokens == 2000
|
||||
|
||||
def test_evaluate_clamps_accuracy(self):
|
||||
score = self.evaluator.evaluate(accuracy=1.5)
|
||||
assert score.accuracy == 1.0
|
||||
score = self.evaluator.evaluate(accuracy=-0.1)
|
||||
assert score.accuracy == 0.0
|
||||
|
||||
def test_weighted_score(self):
|
||||
score = self.evaluator.evaluate(accuracy=1.0, latency_ms=0, cost_tokens=0)
|
||||
weighted = self.evaluator.weighted_score(score)
|
||||
assert weighted == 1.0 # Perfect on all dimensions
|
||||
|
||||
def test_weighted_score_zero(self):
|
||||
score = self.evaluator.evaluate(accuracy=0.0, latency_ms=10000, cost_tokens=10000)
|
||||
weighted = self.evaluator.weighted_score(score)
|
||||
assert weighted == 0.0 # Worst on all dimensions
|
||||
|
||||
def test_pareto_rank_simple(self):
|
||||
scores = [
|
||||
FitnessScore(accuracy=0.9, latency_ms=100), # Dominates all
|
||||
FitnessScore(accuracy=0.5, latency_ms=500), # Dominated by 0
|
||||
FitnessScore(accuracy=0.3, latency_ms=1000), # Dominated by 0, 1
|
||||
]
|
||||
ranks = self.evaluator.pareto_rank(scores)
|
||||
assert ranks[0] == 0 # Front
|
||||
assert ranks[1] >= 1
|
||||
assert ranks[2] >= ranks[1]
|
||||
|
||||
def test_pareto_rank_empty(self):
|
||||
ranks = self.evaluator.pareto_rank([])
|
||||
assert ranks == []
|
||||
|
||||
def test_pareto_rank_non_dominated(self):
|
||||
scores = [
|
||||
FitnessScore(accuracy=0.9, latency_ms=500), # High accuracy, slow
|
||||
FitnessScore(accuracy=0.5, latency_ms=100), # Low accuracy, fast
|
||||
]
|
||||
ranks = self.evaluator.pareto_rank(scores)
|
||||
# Neither dominates the other — both on front
|
||||
assert ranks[0] == 0
|
||||
assert ranks[1] == 0
|
||||
|
||||
def test_crowding_distance(self):
|
||||
scores = [
|
||||
FitnessScore(accuracy=0.9, latency_ms=100),
|
||||
FitnessScore(accuracy=0.7, latency_ms=300),
|
||||
FitnessScore(accuracy=0.5, latency_ms=500),
|
||||
]
|
||||
distances = self.evaluator.crowding_distance(scores)
|
||||
assert len(distances) == 3
|
||||
assert distances[0] == float("inf") # Boundary
|
||||
assert distances[2] == float("inf") # Boundary
|
||||
assert distances[1] > 0 # Interior point
|
||||
|
||||
def test_crowding_distance_small(self):
|
||||
scores = [FitnessScore(accuracy=0.5)]
|
||||
distances = self.evaluator.crowding_distance(scores)
|
||||
assert distances[0] == float("inf")
|
||||
|
||||
def test_custom_weights_evaluator(self):
|
||||
evaluator = MultiObjectiveFitness(weights=FitnessWeights(accuracy=1.0, latency=0.0, cost=0.0))
|
||||
score = evaluator.evaluate(accuracy=0.8, latency_ms=5000, cost_tokens=5000)
|
||||
weighted = evaluator.weighted_score(score)
|
||||
# Only accuracy matters
|
||||
assert abs(weighted - 0.8) < 0.01
|
||||
|
||||
|
||||
class TestExtendedStrategyTuner:
|
||||
"""ExtendedStrategyTuner unit tests"""
|
||||
|
||||
def setup_method(self):
|
||||
self.tuner = ExtendedStrategyTuner()
|
||||
|
||||
def test_record_and_suggest(self):
|
||||
config = ExtendedStrategyConfig(temperature=0.5, max_iterations=5, top_k=5)
|
||||
self.tuner.record(config, 0.7)
|
||||
self.tuner.record(config, 0.8)
|
||||
self.tuner.record(config, 0.9)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_suggest_with_history(self):
|
||||
config = ExtendedStrategyConfig(temperature=0.7, max_iterations=5, top_k=5)
|
||||
for i in range(5):
|
||||
self.tuner.record(config, 0.5 + i * 0.1)
|
||||
|
||||
suggested = await self.tuner.suggest(config)
|
||||
assert isinstance(suggested, ExtendedStrategyConfig)
|
||||
assert 0.0 <= suggested.temperature <= 2.0
|
||||
assert 1 <= suggested.max_iterations <= 10
|
||||
assert 1 <= suggested.top_k <= 20
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_suggest_without_history(self):
|
||||
config = ExtendedStrategyConfig()
|
||||
suggested = await self.tuner.suggest(config)
|
||||
# Should return current config unchanged
|
||||
assert suggested.temperature == config.temperature
|
||||
assert suggested.max_iterations == config.max_iterations
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retrieval_mode_suggestion(self):
|
||||
config = ExtendedStrategyConfig(retrieval_mode="standard")
|
||||
enhanced_config = ExtendedStrategyConfig(retrieval_mode="enhanced")
|
||||
|
||||
# Record mostly enhanced results
|
||||
for _ in range(4):
|
||||
self.tuner.record(enhanced_config, 0.9)
|
||||
self.tuner.record(config, 0.5)
|
||||
|
||||
suggested = await self.tuner.suggest(config)
|
||||
assert suggested.retrieval_mode == "enhanced"
|
||||
|
||||
def test_history_size(self):
|
||||
assert self.tuner.history_size == 0
|
||||
self.tuner.record(ExtendedStrategyConfig(), 0.5)
|
||||
assert self.tuner.history_size == 1
|
||||
|
||||
|
||||
class TestExtendedStrategyConfig:
|
||||
"""ExtendedStrategyConfig unit tests"""
|
||||
|
||||
def test_default_values(self):
|
||||
config = ExtendedStrategyConfig()
|
||||
assert config.temperature == 0.5
|
||||
assert config.max_iterations == 5
|
||||
assert config.top_k == 5
|
||||
assert config.retrieval_mode == "enhanced"
|
||||
assert config.tool_weights == {}
|
||||
|
||||
def test_custom_values(self):
|
||||
config = ExtendedStrategyConfig(
|
||||
temperature=0.8,
|
||||
max_iterations=10,
|
||||
top_k=15,
|
||||
retrieval_mode="standard",
|
||||
tool_weights={"search": 0.7, "analyze": 0.3},
|
||||
)
|
||||
assert config.temperature == 0.8
|
||||
assert config.max_iterations == 10
|
||||
assert config.top_k == 15
|
||||
assert config.retrieval_mode == "standard"
|
||||
assert config.tool_weights["search"] == 0.7
|
||||
Loading…
Reference in New Issue