diff --git a/src/agentkit/evolution/genetic.py b/src/agentkit/evolution/genetic.py new file mode 100644 index 0000000..38d9e4e --- /dev/null +++ b/src/agentkit/evolution/genetic.py @@ -0,0 +1,529 @@ +"""GEPA - Genetic-Pareto Prompt Evolution + +基于遗传算法的 Prompt 进化框架,支持: +- 种群管理(Population) +- 交叉算子(Crossover) +- 变异算子(Mutation) +- Pareto 多目标选择 +- 精英保留(Elitism) +- 代际进化 + +参考:GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning (2025) +""" + +from __future__ import annotations + +import copy +import logging +import random +import uuid +from dataclasses import dataclass, field +from typing import Any + +from agentkit.evolution.prompt_optimizer import Module, Signature + +logger = logging.getLogger(__name__) + + +@dataclass +class FitnessScore: + """多目标适应度评分""" + + accuracy: float = 0.0 # 0-1, 任务成功率 + latency_ms: float = 0.0 # 越低越好 + cost_tokens: float = 0.0 # 越低越好 + custom: float = 0.0 # 自定义指标 + + @property + def normalized(self) -> dict[str, float]: + """归一化到 [0, 1],latency 和 cost 越低越好所以取反""" + return { + "accuracy": self.accuracy, + "latency": 1.0 - min(self.latency_ms / 10000.0, 1.0), # 10s 为上限 + "cost": 1.0 - min(self.cost_tokens / 10000.0, 1.0), # 10k tokens 为上限 + "custom": self.custom, + } + + def dominates(self, other: FitnessScore) -> bool: + """Pareto 支配判断:self 在所有维度 >= other 且至少一个维度 > other""" + n_self = self.normalized + n_other = other.normalized + all_geq = all(v >= n_other[k] for k, v in n_self.items()) + any_gt = any(v > n_other[k] for k, v in n_self.items()) + return all_geq and any_gt + + +@dataclass +class PromptChromosome: + """Prompt 染色体 — 一个完整的 Prompt 变体 + + 由三段可独立进化的基因组成: + - instructions: 指令段 + - demos: few-shot 示例 + - constraints: 约束条件 + """ + + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + instructions: str = "" + demos: list[dict[str, Any]] = field(default_factory=list) + constraints: list[str] = field(default_factory=list) + fitness: FitnessScore = field(default_factory=FitnessScore) + generation: int = 0 + parent_ids: list[str] = field(default_factory=list) + + def to_module(self, name: str = "") -> Module: + """转换为 Module 格式""" + return Module( + name=name or f"chromosome_{self.id}", + signature=Signature( + input_fields={}, + output_fields={}, + instruction=self.instructions, + ), + demos=self.demos, + ) + + @classmethod + def from_module(cls, module: Module) -> PromptChromosome: + """从 Module 创建染色体""" + # Extract constraints from instruction (lines starting with -) + constraints = [] + instructions_lines = [] + if module.signature.instruction: + for line in module.signature.instruction.split("\n"): + stripped = line.strip() + if stripped.startswith("- ") and any( + kw in stripped.lower() + for kw in ["must", "should", "never", "avoid", "do not", "always"] + ): + constraints.append(stripped[2:]) + else: + instructions_lines.append(line) + + return cls( + instructions="\n".join(instructions_lines), + demos=list(module.demos), + constraints=constraints, + ) + + +class CrossoverOperator: + """交叉算子 + + 从两个父代 Prompt 生成子代,支持: + - instructions 交叉:交换指令段落 + - demos 交叉:交换 few-shot 示例 + - constraints 交叉:交换约束条件 + """ + + def crossover( + self, + parent_a: PromptChromosome, + parent_b: PromptChromosome, + crossover_rate: float = 0.5, + ) -> PromptChromosome: + """执行交叉操作 + + Args: + parent_a: 父代 A + parent_b: 父代 B + crossover_rate: 每个基因段的交叉概率 + + Returns: + 子代染色体 + """ + child_instructions = self._crossover_text( + parent_a.instructions, parent_b.instructions, crossover_rate + ) + child_demos = self._crossover_demos( + parent_a.demos, parent_b.demos, crossover_rate + ) + child_constraints = self._crossover_constraints( + parent_a.constraints, parent_b.constraints, crossover_rate + ) + + return PromptChromosome( + instructions=child_instructions, + demos=child_demos, + constraints=child_constraints, + generation=max(parent_a.generation, parent_b.generation) + 1, + parent_ids=[parent_a.id, parent_b.id], + ) + + def _crossover_text( + self, text_a: str, text_b: str, rate: float + ) -> str: + """文本段落交叉:按段落交换""" + if not text_a or not text_b: + return text_a if random.random() < 0.5 else text_b + + paragraphs_a = [p.strip() for p in text_a.split("\n\n") if p.strip()] + paragraphs_b = [p.strip() for p in text_b.split("\n\n") if p.strip()] + + if not paragraphs_a or not paragraphs_b: + return text_a if random.random() < 0.5 else text_b + + # Interleave paragraphs from both parents + result = [] + max_len = max(len(paragraphs_a), len(paragraphs_b)) + for i in range(max_len): + if random.random() < rate: + # Take from B + if i < len(paragraphs_b): + result.append(paragraphs_b[i]) + elif i < len(paragraphs_a): + result.append(paragraphs_a[i]) + else: + # Take from A + if i < len(paragraphs_a): + result.append(paragraphs_a[i]) + elif i < len(paragraphs_b): + result.append(paragraphs_b[i]) + + return "\n\n".join(result) + + def _crossover_demos( + self, + demos_a: list[dict], + demos_b: list[dict], + rate: float, + ) -> list[dict]: + """Demo 交叉:混合两个父代的示例""" + if not demos_a: + return list(demos_b) if random.random() < 0.5 else [] + if not demos_b: + return list(demos_a) if random.random() < 0.5 else [] + + # Take some from each parent + result = [] + used_inputs: set[str] = set() + + for demo in demos_a + demos_b: + demo_key = str(demo.get("input", ""))[:50] + if demo_key not in used_inputs and random.random() < (1 - rate): + result.append(copy.deepcopy(demo)) + used_inputs.add(demo_key) + + return result[:5] # Limit to 5 demos + + def _crossover_constraints( + self, + constraints_a: list[str], + constraints_b: list[str], + rate: float, + ) -> list[str]: + """约束交叉:合并两个父代的约束""" + all_constraints = set(constraints_a) | set(constraints_b) + result = [] + for c in all_constraints: + if random.random() < (1 - rate * 0.5): + result.append(c) + return result + + +class MutationOperator: + """变异算子 + + 基于 LLM 反思的结构化变异: + - 指令变异:LLM 重写指令段落 + - Demo 变异:替换/重排 few-shot 示例 + - 约束变异:增删约束条件 + """ + + def __init__(self, llm_gateway: Any = None): + self._llm_gateway = llm_gateway + + async def mutate( + self, + chromosome: PromptChromosome, + mutation_rate: float = 0.3, + ) -> PromptChromosome: + """执行变异操作 + + Args: + chromosome: 待变异的染色体 + mutation_rate: 变异概率 + + Returns: + 变异后的新染色体 + """ + new_instructions = chromosome.instructions + new_demos = list(chromosome.demos) + new_constraints = list(chromosome.constraints) + + # Instructions mutation + if random.random() < mutation_rate: + new_instructions = await self._mutate_instructions( + chromosome.instructions + ) + + # Demo mutation + if random.random() < mutation_rate and new_demos: + new_demos = self._mutate_demos(new_demos) + + # Constraint mutation + if random.random() < mutation_rate: + new_constraints = self._mutate_constraints(new_constraints) + + return PromptChromosome( + instructions=new_instructions, + demos=new_demos, + constraints=new_constraints, + generation=chromosome.generation, + parent_ids=[chromosome.id], + ) + + async def _mutate_instructions(self, instructions: str) -> str: + """指令变异""" + if self._llm_gateway: + try: + response = await self._llm_gateway.chat( + messages=[ + { + "role": "system", + "content": ( + "You are a prompt mutation assistant. Slightly modify the " + "given instruction to improve clarity and effectiveness. " + "Keep the core intent unchanged. Output ONLY the modified instruction." + ), + }, + {"role": "user", "content": instructions}, + ], + model="default", + ) + return response.content.strip() or instructions + except Exception as e: + logger.warning(f"LLM instruction mutation failed: {e}") + + # Fallback: simple text mutation (shuffle paragraphs) + paragraphs = [p.strip() for p in instructions.split("\n\n") if p.strip()] + if len(paragraphs) > 1: + random.shuffle(paragraphs) + return "\n\n".join(paragraphs) + + def _mutate_demos(self, demos: list[dict]) -> list[dict]: + """Demo 变异:重排或随机删除一个""" + mutated = list(demos) + if random.random() < 0.5 and len(mutated) > 1: + # Shuffle + random.shuffle(mutated) + elif len(mutated) > 2: + # Remove a random demo + idx = random.randint(0, len(mutated) - 1) + mutated.pop(idx) + return mutated + + def _mutate_constraints(self, constraints: list[str]) -> list[str]: + """约束变异:随机增删约束""" + mutated = list(constraints) + if random.random() < 0.5 and mutated: + # Remove a random constraint + idx = random.randint(0, len(mutated) - 1) + mutated.pop(idx) + else: + # Add a generic constraint + generic_constraints = [ + "Always verify the output before responding", + "Keep responses concise and focused", + "Prioritize accuracy over completeness", + "Consider edge cases in your analysis", + ] + new_constraint = random.choice(generic_constraints) + if new_constraint not in mutated: + mutated.append(new_constraint) + return mutated + + +class GEPAPopulation: + """GEPA 种群管理 + + 维护一组 PromptChromosome,支持: + - 初始化(从种子 Prompt 或随机生成) + - 添加/淘汰个体 + - Pareto 前沿维护 + - 精英保留 + - 代际进化 + """ + + def __init__( + self, + population_size: int = 10, + elite_size: int = 2, + tournament_size: int = 3, + ): + self._population_size = population_size + self._elite_size = min(elite_size, population_size) + self._tournament_size = tournament_size + self._individuals: list[PromptChromosome] = [] + self._generation = 0 + + @property + def generation(self) -> int: + return self._generation + + @property + def individuals(self) -> list[PromptChromosome]: + return list(self._individuals) + + @property + def size(self) -> int: + return len(self._individuals) + + def initialize(self, seed: PromptChromosome | None = None) -> None: + """初始化种群 + + Args: + seed: 种子染色体,所有个体基于种子变异生成 + """ + if seed is None: + seed = PromptChromosome(instructions="You are a helpful assistant.") + + self._individuals = [seed] + # Generate variants from seed + for i in range(self._population_size - 1): + variant = PromptChromosome( + id=str(uuid.uuid4())[:8], + instructions=seed.instructions, + demos=list(seed.demos), + constraints=list(seed.constraints), + generation=0, + ) + self._individuals.append(variant) + + self._generation = 0 + + def add(self, chromosome: PromptChromosome) -> None: + """添加个体到种群""" + self._individuals.append(chromosome) + + def get_elite(self) -> list[PromptChromosome]: + """获取精英个体(适应度最高的 top-k)""" + sorted_individuals = sorted( + self._individuals, + key=lambda c: c.fitness.accuracy, + reverse=True, + ) + return sorted_individuals[: self._elite_size] + + def get_pareto_front(self) -> list[PromptChromosome]: + """获取 Pareto 前沿(不被任何其他个体支配的个体)""" + front: list[PromptChromosome] = [] + for individual in self._individuals: + dominated = False + for other in self._individuals: + if other.id != individual.id and other.fitness.dominates(individual.fitness): + dominated = True + break + if not dominated: + front.append(individual) + return front + + def tournament_select(self) -> PromptChromosome: + """锦标赛选择:随机选 k 个个体,返回适应度最高的""" + if not self._individuals: + raise ValueError("Population is empty") + + candidates = random.sample( + self._individuals, + min(self._tournament_size, len(self._individuals)), + ) + return max(candidates, key=lambda c: c.fitness.accuracy) + + def evolve( + self, + crossover: CrossoverOperator, + mutation: MutationOperator, + crossover_rate: float = 0.7, + mutation_rate: float = 0.3, + ) -> list[PromptChromosome]: + """执行一代进化 + + 1. 保留精英 + 2. 锦标赛选择父代 + 3. 交叉生成子代 + 4. 变异子代 + 5. 替换种群(保留精英 + 新子代) + + Returns: + 新一代个体列表 + """ + import asyncio + + self._generation += 1 + + # 1. Preserve elite + elite = self.get_elite() + new_generation = list(elite) + + # 2-4. Generate offspring + offspring_tasks = [] + while len(new_generation) + len(offspring_tasks) < self._population_size: + parent_a = self.tournament_select() + parent_b = self.tournament_select() + + if random.random() < crossover_rate: + child = crossover.crossover(parent_a, parent_b) + else: + child = copy.deepcopy(parent_a) + + offspring_tasks.append((child, mutation_rate)) + + # Execute mutations (sync for simplicity, async for LLM mutations) + for child, m_rate in offspring_tasks: + try: + # Try async mutation + loop = asyncio.get_event_loop() + if loop.is_running(): + # We're in an async context — use sync fallback + mutated = PromptChromosome( + instructions=child.instructions, + demos=child.demos, + constraints=child.constraints, + generation=self._generation, + parent_ids=child.parent_ids, + ) + else: + mutated = loop.run_until_complete(mutation.mutate(child, m_rate)) + except RuntimeError: + mutated = PromptChromosome( + instructions=child.instructions, + demos=child.demos, + constraints=child.constraints, + generation=self._generation, + parent_ids=child.parent_ids, + ) + + new_generation.append(mutated) + + # 5. Replace population + self._individuals = new_generation[: self._population_size] + + logger.info( + f"Generation {self._generation}: " + f"population={len(self._individuals)}, " + f"elite={len(elite)}, " + f"best_accuracy={max(c.fitness.accuracy for c in self._individuals):.2f}" + ) + + return list(self._individuals) + + def get_best(self) -> PromptChromosome: + """获取适应度最高的个体""" + if not self._individuals: + raise ValueError("Population is empty") + return max(self._individuals, key=lambda c: c.fitness.accuracy) + + def get_statistics(self) -> dict[str, Any]: + """获取种群统计信息""" + if not self._individuals: + return {"generation": self._generation, "size": 0} + + accuracies = [c.fitness.accuracy for c in self._individuals] + return { + "generation": self._generation, + "size": len(self._individuals), + "best_accuracy": max(accuracies), + "avg_accuracy": sum(accuracies) / len(accuracies), + "worst_accuracy": min(accuracies), + "pareto_front_size": len(self.get_pareto_front()), + } diff --git a/tests/unit/test_genetic_evolution.py b/tests/unit/test_genetic_evolution.py new file mode 100644 index 0000000..c043474 --- /dev/null +++ b/tests/unit/test_genetic_evolution.py @@ -0,0 +1,304 @@ +"""Tests for GEPA genetic evolution""" + +import pytest + +from agentkit.evolution.genetic import ( + CrossoverOperator, + FitnessScore, + GEPAPopulation, + MutationOperator, + PromptChromosome, +) +from agentkit.evolution.prompt_optimizer import Module, Signature + + +class TestFitnessScore: + """FitnessScore unit tests""" + + def test_dominates(self): + a = FitnessScore(accuracy=0.9, latency_ms=100, cost_tokens=500) + b = FitnessScore(accuracy=0.7, latency_ms=200, cost_tokens=1000) + assert a.dominates(b) + assert not b.dominates(a) + + def test_no_dominance_equal(self): + a = FitnessScore(accuracy=0.8, latency_ms=100) + b = FitnessScore(accuracy=0.8, latency_ms=100) + assert not a.dominates(b) + assert not b.dominates(a) + + def test_partial_dominance(self): + a = FitnessScore(accuracy=0.9, latency_ms=200) # Higher accuracy but slower + b = FitnessScore(accuracy=0.7, latency_ms=100) # Faster but lower accuracy + assert not a.dominates(b) # a is not >= b in all dimensions + assert not b.dominates(a) # b is not >= a in all dimensions + + def test_normalized_values(self): + score = FitnessScore(accuracy=0.8, latency_ms=1000, cost_tokens=2000) + n = score.normalized + assert n["accuracy"] == 0.8 + assert 0 < n["latency"] < 1 + assert 0 < n["cost"] < 1 + + def test_zero_fitness(self): + score = FitnessScore() + assert score.accuracy == 0.0 + n = score.normalized + assert n["accuracy"] == 0.0 + + +class TestPromptChromosome: + """PromptChromosome unit tests""" + + def test_from_module(self): + module = Module( + name="test", + signature=Signature( + input_fields={"query": "user query"}, + output_fields={"answer": "response"}, + instruction="Answer the question.\n- Must be accurate\n- Never hallucinate", + ), + demos=[{"input": "test", "output": "result"}], + ) + chromosome = PromptChromosome.from_module(module) + assert "Answer the question" in chromosome.instructions + assert len(chromosome.constraints) >= 1 + assert len(chromosome.demos) == 1 + + def test_to_module(self): + chromosome = PromptChromosome( + instructions="Test instruction", + demos=[{"input": "q", "output": "a"}], + constraints=["Be accurate"], + ) + module = chromosome.to_module("test_module") + assert module.name == "test_module" + assert "Test instruction" in module.signature.instruction + assert len(module.demos) == 1 + + def test_default_values(self): + c = PromptChromosome() + assert c.instructions == "" + assert c.demos == [] + assert c.constraints == [] + assert c.generation == 0 + assert c.fitness.accuracy == 0.0 + + +class TestCrossoverOperator: + """CrossoverOperator unit tests""" + + def setup_method(self): + self.crossover = CrossoverOperator() + + def test_crossover_produces_child(self): + parent_a = PromptChromosome( + instructions="Instruction A paragraph 1\n\nInstruction A paragraph 2", + demos=[{"input": "a1", "output": "r1"}], + constraints=["Constraint A"], + ) + parent_b = PromptChromosome( + instructions="Instruction B paragraph 1\n\nInstruction B paragraph 2", + demos=[{"input": "b1", "output": "r2"}], + constraints=["Constraint B"], + ) + + child = self.crossover.crossover(parent_a, parent_b) + assert child.generation == 1 + assert len(child.parent_ids) == 2 + assert parent_a.id in child.parent_ids + assert parent_b.id in child.parent_ids + + def test_crossover_preserves_content(self): + parent_a = PromptChromosome(instructions="A", demos=[], constraints=["C1"]) + parent_b = PromptChromosome(instructions="B", demos=[], constraints=["C2"]) + + child = self.crossover.crossover(parent_a, parent_b, crossover_rate=0.0) + # With rate=0, should take from parent_a + assert child.instructions == "A" + + def test_crossover_demos(self): + parent_a = PromptChromosome( + demos=[{"input": "a1", "output": "r1"}, {"input": "a2", "output": "r2"}], + ) + parent_b = PromptChromosome( + demos=[{"input": "b1", "output": "r3"}], + ) + + child = self.crossover.crossover(parent_a, parent_b) + # Child should have demos from both parents + assert len(child.demos) >= 0 # May be empty due to rate filtering + + def test_crossover_constraints(self): + parent_a = PromptChromosome(constraints=["C1", "C2"]) + parent_b = PromptChromosome(constraints=["C3", "C4"]) + + child = self.crossover.crossover(parent_a, parent_b) + # Child should have some constraints from parents + assert isinstance(child.constraints, list) + + +class TestMutationOperator: + """MutationOperator unit tests""" + + def setup_method(self): + self.mutation = MutationOperator() + + @pytest.mark.asyncio + async def test_mutate_returns_new_chromosome(self): + original = PromptChromosome( + instructions="Test instruction", + demos=[{"input": "q", "output": "a"}], + constraints=["Be accurate"], + ) + mutated = await self.mutation.mutate(original, mutation_rate=1.0) + assert mutated.parent_ids == [original.id] + assert mutated.generation == original.generation + + @pytest.mark.asyncio + async def test_mutate_with_zero_rate(self): + original = PromptChromosome( + instructions="Test instruction", + demos=[{"input": "q", "output": "a"}], + constraints=["Be accurate"], + ) + mutated = await self.mutation.mutate(original, mutation_rate=0.0) + # With rate=0, should be identical + assert mutated.instructions == original.instructions + assert mutated.demos == original.demos + assert mutated.constraints == original.constraints + + @pytest.mark.asyncio + async def test_demo_mutation(self): + original = PromptChromosome( + demos=[ + {"input": "q1", "output": "a1"}, + {"input": "q2", "output": "a2"}, + {"input": "q3", "output": "a3"}, + ], + ) + mutated_demos = self.mutation._mutate_demos(original.demos) + assert isinstance(mutated_demos, list) + + @pytest.mark.asyncio + async def test_constraint_mutation_add(self): + constraints = ["Be accurate"] + mutated = self.mutation._mutate_constraints(constraints) + assert isinstance(mutated, list) + + @pytest.mark.asyncio + async def test_constraint_mutation_remove(self): + constraints = ["C1", "C2", "C3"] + mutated = self.mutation._mutate_constraints(constraints) + assert isinstance(mutated, list) + + +class TestGEPAPopulation: + """GEPAPopulation unit tests""" + + def setup_method(self): + self.population = GEPAPopulation(population_size=6, elite_size=2, tournament_size=3) + + def test_initialize_with_seed(self): + seed = PromptChromosome(instructions="You are a helpful assistant.") + self.population.initialize(seed) + assert self.population.size == 6 + assert self.population.generation == 0 + + def test_initialize_without_seed(self): + self.population.initialize() + assert self.population.size == 6 + + def test_get_elite(self): + self.population.initialize() + # Set fitness scores + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1) + + elite = self.population.get_elite() + assert len(elite) == 2 + assert elite[0].fitness.accuracy >= elite[1].fitness.accuracy + + def test_tournament_select(self): + self.population.initialize() + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1) + + selected = self.population.tournament_select() + assert isinstance(selected, PromptChromosome) + + def test_tournament_select_empty_population(self): + with pytest.raises(ValueError, match="Population is empty"): + self.population.tournament_select() + + def test_get_best(self): + self.population.initialize() + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1) + + best = self.population.get_best() + assert best.fitness.accuracy == 0.5 # Last individual (index 5 * 0.1) + + def test_evolve(self): + self.population.initialize() + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1) + + crossover = CrossoverOperator() + mutation = MutationOperator() + + new_gen = self.population.evolve(crossover, mutation) + assert self.population.generation == 1 + assert len(new_gen) == 6 + + def test_multiple_generations(self): + self.population.initialize() + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1) + + crossover = CrossoverOperator() + mutation = MutationOperator() + + for _ in range(5): + self.population.evolve(crossover, mutation) + # Re-evaluate fitness (simulated) + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=min(1.0, i * 0.1 + 0.3)) + + assert self.population.generation == 5 + + def test_get_pareto_front(self): + self.population.initialize() + # Set diverse fitness + self.population.individuals[0].fitness = FitnessScore(accuracy=0.9, latency_ms=500) + self.population.individuals[1].fitness = FitnessScore(accuracy=0.7, latency_ms=100) + self.population.individuals[2].fitness = FitnessScore(accuracy=0.5, latency_ms=50) + self.population.individuals[3].fitness = FitnessScore(accuracy=0.3, latency_ms=30) + self.population.individuals[4].fitness = FitnessScore(accuracy=0.8, latency_ms=200) + self.population.individuals[5].fitness = FitnessScore(accuracy=0.6, latency_ms=150) + + front = self.population.get_pareto_front() + assert len(front) >= 1 + # The front should contain non-dominated individuals + + def test_get_statistics(self): + self.population.initialize() + for i, ind in enumerate(self.population.individuals): + ind.fitness = FitnessScore(accuracy=i * 0.1 + 0.3) + + stats = self.population.get_statistics() + assert stats["generation"] == 0 + assert stats["size"] == 6 + assert "best_accuracy" in stats + assert "avg_accuracy" in stats + + def test_get_statistics_empty(self): + stats = self.population.get_statistics() + assert stats["size"] == 0 + + def test_add_individual(self): + self.population.initialize() + initial_size = self.population.size + new_individual = PromptChromosome(instructions="New individual") + self.population.add(new_individual) + assert self.population.size == initial_size + 1