feat(evolution): U6 GEPA genetic algorithm evolution framework

- PromptChromosome: instructions + demos + constraints gene segments
- CrossoverOperator: paragraph-level text, demo, constraint crossover
- MutationOperator: LLM-driven instruction mutation + demo/constraint mutation
- GEPAPopulation: tournament selection, elite preservation, Pareto front
- FitnessScore: multi-objective (accuracy, latency, cost) with Pareto dominance
- 29 tests passing
This commit is contained in:
chiguyong 2026-06-06 22:38:55 +08:00
parent 1390bd8d6e
commit d5998aaddd
2 changed files with 833 additions and 0 deletions

View File

@ -0,0 +1,529 @@
"""GEPA - Genetic-Pareto Prompt Evolution
基于遗传算法的 Prompt 进化框架支持
- 种群管理Population
- 交叉算子Crossover
- 变异算子Mutation
- Pareto 多目标选择
- 精英保留Elitism
- 代际进化
参考GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning (2025)
"""
from __future__ import annotations
import copy
import logging
import random
import uuid
from dataclasses import dataclass, field
from typing import Any
from agentkit.evolution.prompt_optimizer import Module, Signature
logger = logging.getLogger(__name__)
@dataclass
class FitnessScore:
"""多目标适应度评分"""
accuracy: float = 0.0 # 0-1, 任务成功率
latency_ms: float = 0.0 # 越低越好
cost_tokens: float = 0.0 # 越低越好
custom: float = 0.0 # 自定义指标
@property
def normalized(self) -> dict[str, float]:
"""归一化到 [0, 1]latency 和 cost 越低越好所以取反"""
return {
"accuracy": self.accuracy,
"latency": 1.0 - min(self.latency_ms / 10000.0, 1.0), # 10s 为上限
"cost": 1.0 - min(self.cost_tokens / 10000.0, 1.0), # 10k tokens 为上限
"custom": self.custom,
}
def dominates(self, other: FitnessScore) -> bool:
"""Pareto 支配判断self 在所有维度 >= other 且至少一个维度 > other"""
n_self = self.normalized
n_other = other.normalized
all_geq = all(v >= n_other[k] for k, v in n_self.items())
any_gt = any(v > n_other[k] for k, v in n_self.items())
return all_geq and any_gt
@dataclass
class PromptChromosome:
"""Prompt 染色体 — 一个完整的 Prompt 变体
由三段可独立进化的基因组成
- instructions: 指令段
- demos: few-shot 示例
- constraints: 约束条件
"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
instructions: str = ""
demos: list[dict[str, Any]] = field(default_factory=list)
constraints: list[str] = field(default_factory=list)
fitness: FitnessScore = field(default_factory=FitnessScore)
generation: int = 0
parent_ids: list[str] = field(default_factory=list)
def to_module(self, name: str = "") -> Module:
"""转换为 Module 格式"""
return Module(
name=name or f"chromosome_{self.id}",
signature=Signature(
input_fields={},
output_fields={},
instruction=self.instructions,
),
demos=self.demos,
)
@classmethod
def from_module(cls, module: Module) -> PromptChromosome:
"""从 Module 创建染色体"""
# Extract constraints from instruction (lines starting with -)
constraints = []
instructions_lines = []
if module.signature.instruction:
for line in module.signature.instruction.split("\n"):
stripped = line.strip()
if stripped.startswith("- ") and any(
kw in stripped.lower()
for kw in ["must", "should", "never", "avoid", "do not", "always"]
):
constraints.append(stripped[2:])
else:
instructions_lines.append(line)
return cls(
instructions="\n".join(instructions_lines),
demos=list(module.demos),
constraints=constraints,
)
class CrossoverOperator:
"""交叉算子
从两个父代 Prompt 生成子代支持
- instructions 交叉交换指令段落
- demos 交叉交换 few-shot 示例
- constraints 交叉交换约束条件
"""
def crossover(
self,
parent_a: PromptChromosome,
parent_b: PromptChromosome,
crossover_rate: float = 0.5,
) -> PromptChromosome:
"""执行交叉操作
Args:
parent_a: 父代 A
parent_b: 父代 B
crossover_rate: 每个基因段的交叉概率
Returns:
子代染色体
"""
child_instructions = self._crossover_text(
parent_a.instructions, parent_b.instructions, crossover_rate
)
child_demos = self._crossover_demos(
parent_a.demos, parent_b.demos, crossover_rate
)
child_constraints = self._crossover_constraints(
parent_a.constraints, parent_b.constraints, crossover_rate
)
return PromptChromosome(
instructions=child_instructions,
demos=child_demos,
constraints=child_constraints,
generation=max(parent_a.generation, parent_b.generation) + 1,
parent_ids=[parent_a.id, parent_b.id],
)
def _crossover_text(
self, text_a: str, text_b: str, rate: float
) -> str:
"""文本段落交叉:按段落交换"""
if not text_a or not text_b:
return text_a if random.random() < 0.5 else text_b
paragraphs_a = [p.strip() for p in text_a.split("\n\n") if p.strip()]
paragraphs_b = [p.strip() for p in text_b.split("\n\n") if p.strip()]
if not paragraphs_a or not paragraphs_b:
return text_a if random.random() < 0.5 else text_b
# Interleave paragraphs from both parents
result = []
max_len = max(len(paragraphs_a), len(paragraphs_b))
for i in range(max_len):
if random.random() < rate:
# Take from B
if i < len(paragraphs_b):
result.append(paragraphs_b[i])
elif i < len(paragraphs_a):
result.append(paragraphs_a[i])
else:
# Take from A
if i < len(paragraphs_a):
result.append(paragraphs_a[i])
elif i < len(paragraphs_b):
result.append(paragraphs_b[i])
return "\n\n".join(result)
def _crossover_demos(
self,
demos_a: list[dict],
demos_b: list[dict],
rate: float,
) -> list[dict]:
"""Demo 交叉:混合两个父代的示例"""
if not demos_a:
return list(demos_b) if random.random() < 0.5 else []
if not demos_b:
return list(demos_a) if random.random() < 0.5 else []
# Take some from each parent
result = []
used_inputs: set[str] = set()
for demo in demos_a + demos_b:
demo_key = str(demo.get("input", ""))[:50]
if demo_key not in used_inputs and random.random() < (1 - rate):
result.append(copy.deepcopy(demo))
used_inputs.add(demo_key)
return result[:5] # Limit to 5 demos
def _crossover_constraints(
self,
constraints_a: list[str],
constraints_b: list[str],
rate: float,
) -> list[str]:
"""约束交叉:合并两个父代的约束"""
all_constraints = set(constraints_a) | set(constraints_b)
result = []
for c in all_constraints:
if random.random() < (1 - rate * 0.5):
result.append(c)
return result
class MutationOperator:
"""变异算子
基于 LLM 反思的结构化变异
- 指令变异LLM 重写指令段落
- Demo 变异替换/重排 few-shot 示例
- 约束变异增删约束条件
"""
def __init__(self, llm_gateway: Any = None):
self._llm_gateway = llm_gateway
async def mutate(
self,
chromosome: PromptChromosome,
mutation_rate: float = 0.3,
) -> PromptChromosome:
"""执行变异操作
Args:
chromosome: 待变异的染色体
mutation_rate: 变异概率
Returns:
变异后的新染色体
"""
new_instructions = chromosome.instructions
new_demos = list(chromosome.demos)
new_constraints = list(chromosome.constraints)
# Instructions mutation
if random.random() < mutation_rate:
new_instructions = await self._mutate_instructions(
chromosome.instructions
)
# Demo mutation
if random.random() < mutation_rate and new_demos:
new_demos = self._mutate_demos(new_demos)
# Constraint mutation
if random.random() < mutation_rate:
new_constraints = self._mutate_constraints(new_constraints)
return PromptChromosome(
instructions=new_instructions,
demos=new_demos,
constraints=new_constraints,
generation=chromosome.generation,
parent_ids=[chromosome.id],
)
async def _mutate_instructions(self, instructions: str) -> str:
"""指令变异"""
if self._llm_gateway:
try:
response = await self._llm_gateway.chat(
messages=[
{
"role": "system",
"content": (
"You are a prompt mutation assistant. Slightly modify the "
"given instruction to improve clarity and effectiveness. "
"Keep the core intent unchanged. Output ONLY the modified instruction."
),
},
{"role": "user", "content": instructions},
],
model="default",
)
return response.content.strip() or instructions
except Exception as e:
logger.warning(f"LLM instruction mutation failed: {e}")
# Fallback: simple text mutation (shuffle paragraphs)
paragraphs = [p.strip() for p in instructions.split("\n\n") if p.strip()]
if len(paragraphs) > 1:
random.shuffle(paragraphs)
return "\n\n".join(paragraphs)
def _mutate_demos(self, demos: list[dict]) -> list[dict]:
"""Demo 变异:重排或随机删除一个"""
mutated = list(demos)
if random.random() < 0.5 and len(mutated) > 1:
# Shuffle
random.shuffle(mutated)
elif len(mutated) > 2:
# Remove a random demo
idx = random.randint(0, len(mutated) - 1)
mutated.pop(idx)
return mutated
def _mutate_constraints(self, constraints: list[str]) -> list[str]:
"""约束变异:随机增删约束"""
mutated = list(constraints)
if random.random() < 0.5 and mutated:
# Remove a random constraint
idx = random.randint(0, len(mutated) - 1)
mutated.pop(idx)
else:
# Add a generic constraint
generic_constraints = [
"Always verify the output before responding",
"Keep responses concise and focused",
"Prioritize accuracy over completeness",
"Consider edge cases in your analysis",
]
new_constraint = random.choice(generic_constraints)
if new_constraint not in mutated:
mutated.append(new_constraint)
return mutated
class GEPAPopulation:
"""GEPA 种群管理
维护一组 PromptChromosome支持
- 初始化从种子 Prompt 或随机生成
- 添加/淘汰个体
- Pareto 前沿维护
- 精英保留
- 代际进化
"""
def __init__(
self,
population_size: int = 10,
elite_size: int = 2,
tournament_size: int = 3,
):
self._population_size = population_size
self._elite_size = min(elite_size, population_size)
self._tournament_size = tournament_size
self._individuals: list[PromptChromosome] = []
self._generation = 0
@property
def generation(self) -> int:
return self._generation
@property
def individuals(self) -> list[PromptChromosome]:
return list(self._individuals)
@property
def size(self) -> int:
return len(self._individuals)
def initialize(self, seed: PromptChromosome | None = None) -> None:
"""初始化种群
Args:
seed: 种子染色体所有个体基于种子变异生成
"""
if seed is None:
seed = PromptChromosome(instructions="You are a helpful assistant.")
self._individuals = [seed]
# Generate variants from seed
for i in range(self._population_size - 1):
variant = PromptChromosome(
id=str(uuid.uuid4())[:8],
instructions=seed.instructions,
demos=list(seed.demos),
constraints=list(seed.constraints),
generation=0,
)
self._individuals.append(variant)
self._generation = 0
def add(self, chromosome: PromptChromosome) -> None:
"""添加个体到种群"""
self._individuals.append(chromosome)
def get_elite(self) -> list[PromptChromosome]:
"""获取精英个体(适应度最高的 top-k"""
sorted_individuals = sorted(
self._individuals,
key=lambda c: c.fitness.accuracy,
reverse=True,
)
return sorted_individuals[: self._elite_size]
def get_pareto_front(self) -> list[PromptChromosome]:
"""获取 Pareto 前沿(不被任何其他个体支配的个体)"""
front: list[PromptChromosome] = []
for individual in self._individuals:
dominated = False
for other in self._individuals:
if other.id != individual.id and other.fitness.dominates(individual.fitness):
dominated = True
break
if not dominated:
front.append(individual)
return front
def tournament_select(self) -> PromptChromosome:
"""锦标赛选择:随机选 k 个个体,返回适应度最高的"""
if not self._individuals:
raise ValueError("Population is empty")
candidates = random.sample(
self._individuals,
min(self._tournament_size, len(self._individuals)),
)
return max(candidates, key=lambda c: c.fitness.accuracy)
def evolve(
self,
crossover: CrossoverOperator,
mutation: MutationOperator,
crossover_rate: float = 0.7,
mutation_rate: float = 0.3,
) -> list[PromptChromosome]:
"""执行一代进化
1. 保留精英
2. 锦标赛选择父代
3. 交叉生成子代
4. 变异子代
5. 替换种群保留精英 + 新子代
Returns:
新一代个体列表
"""
import asyncio
self._generation += 1
# 1. Preserve elite
elite = self.get_elite()
new_generation = list(elite)
# 2-4. Generate offspring
offspring_tasks = []
while len(new_generation) + len(offspring_tasks) < self._population_size:
parent_a = self.tournament_select()
parent_b = self.tournament_select()
if random.random() < crossover_rate:
child = crossover.crossover(parent_a, parent_b)
else:
child = copy.deepcopy(parent_a)
offspring_tasks.append((child, mutation_rate))
# Execute mutations (sync for simplicity, async for LLM mutations)
for child, m_rate in offspring_tasks:
try:
# Try async mutation
loop = asyncio.get_event_loop()
if loop.is_running():
# We're in an async context — use sync fallback
mutated = PromptChromosome(
instructions=child.instructions,
demos=child.demos,
constraints=child.constraints,
generation=self._generation,
parent_ids=child.parent_ids,
)
else:
mutated = loop.run_until_complete(mutation.mutate(child, m_rate))
except RuntimeError:
mutated = PromptChromosome(
instructions=child.instructions,
demos=child.demos,
constraints=child.constraints,
generation=self._generation,
parent_ids=child.parent_ids,
)
new_generation.append(mutated)
# 5. Replace population
self._individuals = new_generation[: self._population_size]
logger.info(
f"Generation {self._generation}: "
f"population={len(self._individuals)}, "
f"elite={len(elite)}, "
f"best_accuracy={max(c.fitness.accuracy for c in self._individuals):.2f}"
)
return list(self._individuals)
def get_best(self) -> PromptChromosome:
"""获取适应度最高的个体"""
if not self._individuals:
raise ValueError("Population is empty")
return max(self._individuals, key=lambda c: c.fitness.accuracy)
def get_statistics(self) -> dict[str, Any]:
"""获取种群统计信息"""
if not self._individuals:
return {"generation": self._generation, "size": 0}
accuracies = [c.fitness.accuracy for c in self._individuals]
return {
"generation": self._generation,
"size": len(self._individuals),
"best_accuracy": max(accuracies),
"avg_accuracy": sum(accuracies) / len(accuracies),
"worst_accuracy": min(accuracies),
"pareto_front_size": len(self.get_pareto_front()),
}

View File

@ -0,0 +1,304 @@
"""Tests for GEPA genetic evolution"""
import pytest
from agentkit.evolution.genetic import (
CrossoverOperator,
FitnessScore,
GEPAPopulation,
MutationOperator,
PromptChromosome,
)
from agentkit.evolution.prompt_optimizer import Module, Signature
class TestFitnessScore:
"""FitnessScore unit tests"""
def test_dominates(self):
a = FitnessScore(accuracy=0.9, latency_ms=100, cost_tokens=500)
b = FitnessScore(accuracy=0.7, latency_ms=200, cost_tokens=1000)
assert a.dominates(b)
assert not b.dominates(a)
def test_no_dominance_equal(self):
a = FitnessScore(accuracy=0.8, latency_ms=100)
b = FitnessScore(accuracy=0.8, latency_ms=100)
assert not a.dominates(b)
assert not b.dominates(a)
def test_partial_dominance(self):
a = FitnessScore(accuracy=0.9, latency_ms=200) # Higher accuracy but slower
b = FitnessScore(accuracy=0.7, latency_ms=100) # Faster but lower accuracy
assert not a.dominates(b) # a is not >= b in all dimensions
assert not b.dominates(a) # b is not >= a in all dimensions
def test_normalized_values(self):
score = FitnessScore(accuracy=0.8, latency_ms=1000, cost_tokens=2000)
n = score.normalized
assert n["accuracy"] == 0.8
assert 0 < n["latency"] < 1
assert 0 < n["cost"] < 1
def test_zero_fitness(self):
score = FitnessScore()
assert score.accuracy == 0.0
n = score.normalized
assert n["accuracy"] == 0.0
class TestPromptChromosome:
"""PromptChromosome unit tests"""
def test_from_module(self):
module = Module(
name="test",
signature=Signature(
input_fields={"query": "user query"},
output_fields={"answer": "response"},
instruction="Answer the question.\n- Must be accurate\n- Never hallucinate",
),
demos=[{"input": "test", "output": "result"}],
)
chromosome = PromptChromosome.from_module(module)
assert "Answer the question" in chromosome.instructions
assert len(chromosome.constraints) >= 1
assert len(chromosome.demos) == 1
def test_to_module(self):
chromosome = PromptChromosome(
instructions="Test instruction",
demos=[{"input": "q", "output": "a"}],
constraints=["Be accurate"],
)
module = chromosome.to_module("test_module")
assert module.name == "test_module"
assert "Test instruction" in module.signature.instruction
assert len(module.demos) == 1
def test_default_values(self):
c = PromptChromosome()
assert c.instructions == ""
assert c.demos == []
assert c.constraints == []
assert c.generation == 0
assert c.fitness.accuracy == 0.0
class TestCrossoverOperator:
"""CrossoverOperator unit tests"""
def setup_method(self):
self.crossover = CrossoverOperator()
def test_crossover_produces_child(self):
parent_a = PromptChromosome(
instructions="Instruction A paragraph 1\n\nInstruction A paragraph 2",
demos=[{"input": "a1", "output": "r1"}],
constraints=["Constraint A"],
)
parent_b = PromptChromosome(
instructions="Instruction B paragraph 1\n\nInstruction B paragraph 2",
demos=[{"input": "b1", "output": "r2"}],
constraints=["Constraint B"],
)
child = self.crossover.crossover(parent_a, parent_b)
assert child.generation == 1
assert len(child.parent_ids) == 2
assert parent_a.id in child.parent_ids
assert parent_b.id in child.parent_ids
def test_crossover_preserves_content(self):
parent_a = PromptChromosome(instructions="A", demos=[], constraints=["C1"])
parent_b = PromptChromosome(instructions="B", demos=[], constraints=["C2"])
child = self.crossover.crossover(parent_a, parent_b, crossover_rate=0.0)
# With rate=0, should take from parent_a
assert child.instructions == "A"
def test_crossover_demos(self):
parent_a = PromptChromosome(
demos=[{"input": "a1", "output": "r1"}, {"input": "a2", "output": "r2"}],
)
parent_b = PromptChromosome(
demos=[{"input": "b1", "output": "r3"}],
)
child = self.crossover.crossover(parent_a, parent_b)
# Child should have demos from both parents
assert len(child.demos) >= 0 # May be empty due to rate filtering
def test_crossover_constraints(self):
parent_a = PromptChromosome(constraints=["C1", "C2"])
parent_b = PromptChromosome(constraints=["C3", "C4"])
child = self.crossover.crossover(parent_a, parent_b)
# Child should have some constraints from parents
assert isinstance(child.constraints, list)
class TestMutationOperator:
"""MutationOperator unit tests"""
def setup_method(self):
self.mutation = MutationOperator()
@pytest.mark.asyncio
async def test_mutate_returns_new_chromosome(self):
original = PromptChromosome(
instructions="Test instruction",
demos=[{"input": "q", "output": "a"}],
constraints=["Be accurate"],
)
mutated = await self.mutation.mutate(original, mutation_rate=1.0)
assert mutated.parent_ids == [original.id]
assert mutated.generation == original.generation
@pytest.mark.asyncio
async def test_mutate_with_zero_rate(self):
original = PromptChromosome(
instructions="Test instruction",
demos=[{"input": "q", "output": "a"}],
constraints=["Be accurate"],
)
mutated = await self.mutation.mutate(original, mutation_rate=0.0)
# With rate=0, should be identical
assert mutated.instructions == original.instructions
assert mutated.demos == original.demos
assert mutated.constraints == original.constraints
@pytest.mark.asyncio
async def test_demo_mutation(self):
original = PromptChromosome(
demos=[
{"input": "q1", "output": "a1"},
{"input": "q2", "output": "a2"},
{"input": "q3", "output": "a3"},
],
)
mutated_demos = self.mutation._mutate_demos(original.demos)
assert isinstance(mutated_demos, list)
@pytest.mark.asyncio
async def test_constraint_mutation_add(self):
constraints = ["Be accurate"]
mutated = self.mutation._mutate_constraints(constraints)
assert isinstance(mutated, list)
@pytest.mark.asyncio
async def test_constraint_mutation_remove(self):
constraints = ["C1", "C2", "C3"]
mutated = self.mutation._mutate_constraints(constraints)
assert isinstance(mutated, list)
class TestGEPAPopulation:
"""GEPAPopulation unit tests"""
def setup_method(self):
self.population = GEPAPopulation(population_size=6, elite_size=2, tournament_size=3)
def test_initialize_with_seed(self):
seed = PromptChromosome(instructions="You are a helpful assistant.")
self.population.initialize(seed)
assert self.population.size == 6
assert self.population.generation == 0
def test_initialize_without_seed(self):
self.population.initialize()
assert self.population.size == 6
def test_get_elite(self):
self.population.initialize()
# Set fitness scores
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1)
elite = self.population.get_elite()
assert len(elite) == 2
assert elite[0].fitness.accuracy >= elite[1].fitness.accuracy
def test_tournament_select(self):
self.population.initialize()
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1)
selected = self.population.tournament_select()
assert isinstance(selected, PromptChromosome)
def test_tournament_select_empty_population(self):
with pytest.raises(ValueError, match="Population is empty"):
self.population.tournament_select()
def test_get_best(self):
self.population.initialize()
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1)
best = self.population.get_best()
assert best.fitness.accuracy == 0.5 # Last individual (index 5 * 0.1)
def test_evolve(self):
self.population.initialize()
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1)
crossover = CrossoverOperator()
mutation = MutationOperator()
new_gen = self.population.evolve(crossover, mutation)
assert self.population.generation == 1
assert len(new_gen) == 6
def test_multiple_generations(self):
self.population.initialize()
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1)
crossover = CrossoverOperator()
mutation = MutationOperator()
for _ in range(5):
self.population.evolve(crossover, mutation)
# Re-evaluate fitness (simulated)
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=min(1.0, i * 0.1 + 0.3))
assert self.population.generation == 5
def test_get_pareto_front(self):
self.population.initialize()
# Set diverse fitness
self.population.individuals[0].fitness = FitnessScore(accuracy=0.9, latency_ms=500)
self.population.individuals[1].fitness = FitnessScore(accuracy=0.7, latency_ms=100)
self.population.individuals[2].fitness = FitnessScore(accuracy=0.5, latency_ms=50)
self.population.individuals[3].fitness = FitnessScore(accuracy=0.3, latency_ms=30)
self.population.individuals[4].fitness = FitnessScore(accuracy=0.8, latency_ms=200)
self.population.individuals[5].fitness = FitnessScore(accuracy=0.6, latency_ms=150)
front = self.population.get_pareto_front()
assert len(front) >= 1
# The front should contain non-dominated individuals
def test_get_statistics(self):
self.population.initialize()
for i, ind in enumerate(self.population.individuals):
ind.fitness = FitnessScore(accuracy=i * 0.1 + 0.3)
stats = self.population.get_statistics()
assert stats["generation"] == 0
assert stats["size"] == 6
assert "best_accuracy" in stats
assert "avg_accuracy" in stats
def test_get_statistics_empty(self):
stats = self.population.get_statistics()
assert stats["size"] == 0
def test_add_individual(self):
self.population.initialize()
initial_size = self.population.size
new_individual = PromptChromosome(instructions="New individual")
self.population.add(new_individual)
assert self.population.size == initial_size + 1