fischer-agentkit/src/agentkit/evolution/genetic.py

530 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""GEPA - Genetic-Pareto Prompt Evolution
基于遗传算法的 Prompt 进化框架,支持:
- 种群管理Population
- 交叉算子Crossover
- 变异算子Mutation
- Pareto 多目标选择
- 精英保留Elitism
- 代际进化
参考GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning (2025)
"""
from __future__ import annotations
import copy
import logging
import random
import uuid
from dataclasses import dataclass, field
from typing import Any
from agentkit.evolution.prompt_optimizer import Module, Signature
logger = logging.getLogger(__name__)
@dataclass
class FitnessScore:
"""多目标适应度评分"""
accuracy: float = 0.0 # 0-1, 任务成功率
latency_ms: float = 0.0 # 越低越好
cost_tokens: float = 0.0 # 越低越好
custom: float = 0.0 # 自定义指标
@property
def normalized(self) -> dict[str, float]:
"""归一化到 [0, 1]latency 和 cost 越低越好所以取反"""
return {
"accuracy": self.accuracy,
"latency": 1.0 - min(self.latency_ms / 10000.0, 1.0), # 10s 为上限
"cost": 1.0 - min(self.cost_tokens / 10000.0, 1.0), # 10k tokens 为上限
"custom": self.custom,
}
def dominates(self, other: FitnessScore) -> bool:
"""Pareto 支配判断self 在所有维度 >= other 且至少一个维度 > other"""
n_self = self.normalized
n_other = other.normalized
all_geq = all(v >= n_other[k] for k, v in n_self.items())
any_gt = any(v > n_other[k] for k, v in n_self.items())
return all_geq and any_gt
@dataclass
class PromptChromosome:
"""Prompt 染色体 — 一个完整的 Prompt 变体
由三段可独立进化的基因组成:
- instructions: 指令段
- demos: few-shot 示例
- constraints: 约束条件
"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
instructions: str = ""
demos: list[dict[str, Any]] = field(default_factory=list)
constraints: list[str] = field(default_factory=list)
fitness: FitnessScore = field(default_factory=FitnessScore)
generation: int = 0
parent_ids: list[str] = field(default_factory=list)
def to_module(self, name: str = "") -> Module:
"""转换为 Module 格式"""
return Module(
name=name or f"chromosome_{self.id}",
signature=Signature(
input_fields={},
output_fields={},
instruction=self.instructions,
),
demos=self.demos,
)
@classmethod
def from_module(cls, module: Module) -> PromptChromosome:
"""从 Module 创建染色体"""
# Extract constraints from instruction (lines starting with -)
constraints = []
instructions_lines = []
if module.signature.instruction:
for line in module.signature.instruction.split("\n"):
stripped = line.strip()
if stripped.startswith("- ") and any(
kw in stripped.lower()
for kw in ["must", "should", "never", "avoid", "do not", "always"]
):
constraints.append(stripped[2:])
else:
instructions_lines.append(line)
return cls(
instructions="\n".join(instructions_lines),
demos=list(module.demos),
constraints=constraints,
)
class CrossoverOperator:
"""交叉算子
从两个父代 Prompt 生成子代,支持:
- instructions 交叉:交换指令段落
- demos 交叉:交换 few-shot 示例
- constraints 交叉:交换约束条件
"""
def crossover(
self,
parent_a: PromptChromosome,
parent_b: PromptChromosome,
crossover_rate: float = 0.5,
) -> PromptChromosome:
"""执行交叉操作
Args:
parent_a: 父代 A
parent_b: 父代 B
crossover_rate: 每个基因段的交叉概率
Returns:
子代染色体
"""
child_instructions = self._crossover_text(
parent_a.instructions, parent_b.instructions, crossover_rate
)
child_demos = self._crossover_demos(
parent_a.demos, parent_b.demos, crossover_rate
)
child_constraints = self._crossover_constraints(
parent_a.constraints, parent_b.constraints, crossover_rate
)
return PromptChromosome(
instructions=child_instructions,
demos=child_demos,
constraints=child_constraints,
generation=max(parent_a.generation, parent_b.generation) + 1,
parent_ids=[parent_a.id, parent_b.id],
)
def _crossover_text(
self, text_a: str, text_b: str, rate: float
) -> str:
"""文本段落交叉:按段落交换"""
if not text_a or not text_b:
return text_a if random.random() < 0.5 else text_b
paragraphs_a = [p.strip() for p in text_a.split("\n\n") if p.strip()]
paragraphs_b = [p.strip() for p in text_b.split("\n\n") if p.strip()]
if not paragraphs_a or not paragraphs_b:
return text_a if random.random() < 0.5 else text_b
# Interleave paragraphs from both parents
result = []
max_len = max(len(paragraphs_a), len(paragraphs_b))
for i in range(max_len):
if random.random() < rate:
# Take from B
if i < len(paragraphs_b):
result.append(paragraphs_b[i])
elif i < len(paragraphs_a):
result.append(paragraphs_a[i])
else:
# Take from A
if i < len(paragraphs_a):
result.append(paragraphs_a[i])
elif i < len(paragraphs_b):
result.append(paragraphs_b[i])
return "\n\n".join(result)
def _crossover_demos(
self,
demos_a: list[dict],
demos_b: list[dict],
rate: float,
) -> list[dict]:
"""Demo 交叉:混合两个父代的示例"""
if not demos_a:
return list(demos_b) if random.random() < 0.5 else []
if not demos_b:
return list(demos_a) if random.random() < 0.5 else []
# Take some from each parent
result = []
used_inputs: set[str] = set()
for demo in demos_a + demos_b:
demo_key = str(demo.get("input", ""))[:50]
if demo_key not in used_inputs and random.random() < (1 - rate):
result.append(copy.deepcopy(demo))
used_inputs.add(demo_key)
return result[:5] # Limit to 5 demos
def _crossover_constraints(
self,
constraints_a: list[str],
constraints_b: list[str],
rate: float,
) -> list[str]:
"""约束交叉:合并两个父代的约束"""
all_constraints = set(constraints_a) | set(constraints_b)
result = []
for c in all_constraints:
if random.random() < (1 - rate * 0.5):
result.append(c)
return result
class MutationOperator:
"""变异算子
基于 LLM 反思的结构化变异:
- 指令变异LLM 重写指令段落
- Demo 变异:替换/重排 few-shot 示例
- 约束变异:增删约束条件
"""
def __init__(self, llm_gateway: Any = None):
self._llm_gateway = llm_gateway
async def mutate(
self,
chromosome: PromptChromosome,
mutation_rate: float = 0.3,
) -> PromptChromosome:
"""执行变异操作
Args:
chromosome: 待变异的染色体
mutation_rate: 变异概率
Returns:
变异后的新染色体
"""
new_instructions = chromosome.instructions
new_demos = list(chromosome.demos)
new_constraints = list(chromosome.constraints)
# Instructions mutation
if random.random() < mutation_rate:
new_instructions = await self._mutate_instructions(
chromosome.instructions
)
# Demo mutation
if random.random() < mutation_rate and new_demos:
new_demos = self._mutate_demos(new_demos)
# Constraint mutation
if random.random() < mutation_rate:
new_constraints = self._mutate_constraints(new_constraints)
return PromptChromosome(
instructions=new_instructions,
demos=new_demos,
constraints=new_constraints,
generation=chromosome.generation,
parent_ids=[chromosome.id],
)
async def _mutate_instructions(self, instructions: str) -> str:
"""指令变异"""
if self._llm_gateway:
try:
response = await self._llm_gateway.chat(
messages=[
{
"role": "system",
"content": (
"You are a prompt mutation assistant. Slightly modify the "
"given instruction to improve clarity and effectiveness. "
"Keep the core intent unchanged. Output ONLY the modified instruction."
),
},
{"role": "user", "content": instructions},
],
model="default",
)
return response.content.strip() or instructions
except Exception as e:
logger.warning(f"LLM instruction mutation failed: {e}")
# Fallback: simple text mutation (shuffle paragraphs)
paragraphs = [p.strip() for p in instructions.split("\n\n") if p.strip()]
if len(paragraphs) > 1:
random.shuffle(paragraphs)
return "\n\n".join(paragraphs)
def _mutate_demos(self, demos: list[dict]) -> list[dict]:
"""Demo 变异:重排或随机删除一个"""
mutated = list(demos)
if random.random() < 0.5 and len(mutated) > 1:
# Shuffle
random.shuffle(mutated)
elif len(mutated) > 2:
# Remove a random demo
idx = random.randint(0, len(mutated) - 1)
mutated.pop(idx)
return mutated
def _mutate_constraints(self, constraints: list[str]) -> list[str]:
"""约束变异:随机增删约束"""
mutated = list(constraints)
if random.random() < 0.5 and mutated:
# Remove a random constraint
idx = random.randint(0, len(mutated) - 1)
mutated.pop(idx)
else:
# Add a generic constraint
generic_constraints = [
"Always verify the output before responding",
"Keep responses concise and focused",
"Prioritize accuracy over completeness",
"Consider edge cases in your analysis",
]
new_constraint = random.choice(generic_constraints)
if new_constraint not in mutated:
mutated.append(new_constraint)
return mutated
class GEPAPopulation:
"""GEPA 种群管理
维护一组 PromptChromosome支持
- 初始化(从种子 Prompt 或随机生成)
- 添加/淘汰个体
- Pareto 前沿维护
- 精英保留
- 代际进化
"""
def __init__(
self,
population_size: int = 10,
elite_size: int = 2,
tournament_size: int = 3,
):
self._population_size = population_size
self._elite_size = min(elite_size, population_size)
self._tournament_size = tournament_size
self._individuals: list[PromptChromosome] = []
self._generation = 0
@property
def generation(self) -> int:
return self._generation
@property
def individuals(self) -> list[PromptChromosome]:
return list(self._individuals)
@property
def size(self) -> int:
return len(self._individuals)
def initialize(self, seed: PromptChromosome | None = None) -> None:
"""初始化种群
Args:
seed: 种子染色体,所有个体基于种子变异生成
"""
if seed is None:
seed = PromptChromosome(instructions="You are a helpful assistant.")
self._individuals = [seed]
# Generate variants from seed
for i in range(self._population_size - 1):
variant = PromptChromosome(
id=str(uuid.uuid4())[:8],
instructions=seed.instructions,
demos=list(seed.demos),
constraints=list(seed.constraints),
generation=0,
)
self._individuals.append(variant)
self._generation = 0
def add(self, chromosome: PromptChromosome) -> None:
"""添加个体到种群"""
self._individuals.append(chromosome)
def get_elite(self) -> list[PromptChromosome]:
"""获取精英个体(适应度最高的 top-k"""
sorted_individuals = sorted(
self._individuals,
key=lambda c: c.fitness.accuracy,
reverse=True,
)
return sorted_individuals[: self._elite_size]
def get_pareto_front(self) -> list[PromptChromosome]:
"""获取 Pareto 前沿(不被任何其他个体支配的个体)"""
front: list[PromptChromosome] = []
for individual in self._individuals:
dominated = False
for other in self._individuals:
if other.id != individual.id and other.fitness.dominates(individual.fitness):
dominated = True
break
if not dominated:
front.append(individual)
return front
def tournament_select(self) -> PromptChromosome:
"""锦标赛选择:随机选 k 个个体,返回适应度最高的"""
if not self._individuals:
raise ValueError("Population is empty")
candidates = random.sample(
self._individuals,
min(self._tournament_size, len(self._individuals)),
)
return max(candidates, key=lambda c: c.fitness.accuracy)
def evolve(
self,
crossover: CrossoverOperator,
mutation: MutationOperator,
crossover_rate: float = 0.7,
mutation_rate: float = 0.3,
) -> list[PromptChromosome]:
"""执行一代进化
1. 保留精英
2. 锦标赛选择父代
3. 交叉生成子代
4. 变异子代
5. 替换种群(保留精英 + 新子代)
Returns:
新一代个体列表
"""
import asyncio
self._generation += 1
# 1. Preserve elite
elite = self.get_elite()
new_generation = list(elite)
# 2-4. Generate offspring
offspring_tasks = []
while len(new_generation) + len(offspring_tasks) < self._population_size:
parent_a = self.tournament_select()
parent_b = self.tournament_select()
if random.random() < crossover_rate:
child = crossover.crossover(parent_a, parent_b)
else:
child = copy.deepcopy(parent_a)
offspring_tasks.append((child, mutation_rate))
# Execute mutations (sync for simplicity, async for LLM mutations)
for child, m_rate in offspring_tasks:
try:
# Try async mutation
loop = asyncio.get_event_loop()
if loop.is_running():
# We're in an async context — use sync fallback
mutated = PromptChromosome(
instructions=child.instructions,
demos=child.demos,
constraints=child.constraints,
generation=self._generation,
parent_ids=child.parent_ids,
)
else:
mutated = loop.run_until_complete(mutation.mutate(child, m_rate))
except RuntimeError:
mutated = PromptChromosome(
instructions=child.instructions,
demos=child.demos,
constraints=child.constraints,
generation=self._generation,
parent_ids=child.parent_ids,
)
new_generation.append(mutated)
# 5. Replace population
self._individuals = new_generation[: self._population_size]
logger.info(
f"Generation {self._generation}: "
f"population={len(self._individuals)}, "
f"elite={len(elite)}, "
f"best_accuracy={max(c.fitness.accuracy for c in self._individuals):.2f}"
)
return list(self._individuals)
def get_best(self) -> PromptChromosome:
"""获取适应度最高的个体"""
if not self._individuals:
raise ValueError("Population is empty")
return max(self._individuals, key=lambda c: c.fitness.accuracy)
def get_statistics(self) -> dict[str, Any]:
"""获取种群统计信息"""
if not self._individuals:
return {"generation": self._generation, "size": 0}
accuracies = [c.fitness.accuracy for c in self._individuals]
return {
"generation": self._generation,
"size": len(self._individuals),
"best_accuracy": max(accuracies),
"avg_accuracy": sum(accuracies) / len(accuracies),
"worst_accuracy": min(accuracies),
"pareto_front_size": len(self.get_pareto_front()),
}