From 91a61f9b49aa3698224973bf2448f176591592ec Mon Sep 17 00:00:00 2001
From: chiguyong
Date: Fri, 3 Jul 2026 13:54:37 +0800
Subject: [PATCH] feat(evolution): auto-trigger + quality gate + actor marking (U6, R5/R6)

U6 of the complex task quality loop plan.

R5 (auto evolution trigger + quality gate):
- EvolutionConfig (Pydantic v2): success_sample_rate=0.1, min_confidence=0.5,
  min_examples=3, observe_only=True, cross_workspace_sharing=False
- Success path gated by success_sample_rate; failure path always runs (100%)
- Observe-only mode records reflections without feeding optimizer (RV14: avoids
  noise-driven prompt degradation during initial rollout)
- PromptOptimizer.can_optimize() consumption gate: sample count >= min_examples
  AND mean quality >= min_confidence
- PitfallDetector confidence threshold: low-confidence warnings marked
  observe-only; confidence = failure_rate * min(1.0, total/3) linear ramp
  (ponytail: upgrade to Wilson interval)

R6 (actor marking + cross-workspace sharing):
- All evolution artifacts (EvolutionLogEntry, Module, PitfallWarning) carry
  actor field; defaults to result.agent_name
- can_share_artifact(): same-workspace always allowed; cross-workspace requires
  explicit opt-in via EvolutionConfig.cross_workspace_sharing=True

KTD-8: gave_up_after_reflections treated as failure path (triggers 100%
evolution) even when stream wrapper marks status as COMPLETED. Detection via
output_data.trace_outcome or error_message substring (ponytail: heuristic;
upgrade path is a dedicated TaskResult.trace_outcome field).

Backward compat: all gates conditional on auto_evolution_config is not None;
existing EvolutionMixin usage without config preserves prior behavior.

Tests: tests/unit/test_evolution_auto_trigger.py (37 tests) covers R5/R6
scenarios - sample rate gate, observe-only, consumption gate, pitfall
confidence, actor marking, cross-workspace sharing, gave_up_after_reflections,
error handling, fire-and-forget, backpressure cap, AE3 happy path.
--- src/agentkit/evolution/__init__.py | 2 + src/agentkit/evolution/config.py | 43 + src/agentkit/evolution/lifecycle.py | 156 +++- src/agentkit/evolution/pitfall_detector.py | 120 ++- src/agentkit/evolution/prompt_optimizer.py | 54 +- tests/unit/test_evolution_auto_trigger.py | 879 +++++++++++++++++++++ 6 files changed, 1198 insertions(+), 56 deletions(-) create mode 100644 src/agentkit/evolution/config.py create mode 100644 tests/unit/test_evolution_auto_trigger.py diff --git a/src/agentkit/evolution/__init__.py b/src/agentkit/evolution/__init__.py index 61f42af..1541c96 100644 --- a/src/agentkit/evolution/__init__.py +++ b/src/agentkit/evolution/__init__.py @@ -11,6 +11,7 @@ from agentkit.evolution.prompt_optimizer import ( ) from agentkit.evolution.strategy_tuner import StrategyTuner from agentkit.evolution.ab_tester import ABTester +from agentkit.evolution.config import EvolutionConfig from agentkit.evolution.evolution_store import ( EvolutionStore, EvolutionStoreProtocol, @@ -30,6 +31,7 @@ __all__ = [ "Module", "StrategyTuner", "ABTester", + "EvolutionConfig", "EvolutionStore", "EvolutionStoreProtocol", "PersistentEvolutionStore", diff --git a/src/agentkit/evolution/config.py b/src/agentkit/evolution/config.py new file mode 100644 index 0000000..713b3ec --- /dev/null +++ b/src/agentkit/evolution/config.py @@ -0,0 +1,43 @@ +"""EvolutionConfig - auto-evolution trigger configuration (U6, R5/R6). + +R5: success sample rate gates success-path evolution at evolve_after_task() entry; + failure path always runs (100%). Quality gates (min_confidence, min_examples) + prevent noise-driven prompt degradation. +R6: actor marking on all evolution artifacts; cross-workspace sharing defaults off. +""" + +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict, Field + + +class EvolutionConfig(BaseModel): + """Configuration for auto-evolution triggering and quality gates. 
+ + Attributes: + success_sample_rate: Fraction of success-path tasks that trigger evolution + (``random.random() < rate``). Failure path always runs (100%). + Default 0.1 — 1 in 10 successful tasks feed evolution. + min_confidence: Minimum confidence for pitfall ingestion and optimizer + consumption. Low-confidence pitfalls are marked observe-only. + min_examples: Minimum sample count before PromptOptimizer may consume + them. Pairs with min_confidence as a two-part consumption gate. + observe_only: When True, reflections/examples are recorded but NOT fed + to the optimizer. Avoids noise-driven prompt degradation (RV14) + during initial rollout. Set False once signal quality is validated. + cross_workspace_sharing: When False (default), evolution artifacts + (pitfalls, optimized prompts) are NOT shared across agent/expert + workspaces. Same-workspace sharing is always on. Cross-workspace + requires explicit opt-in (R6 trust boundary). + actor_marking: When True, stamp the producing agent/expert identity on + all evolution artifacts for traceability (R6). 
+ """ + + model_config = ConfigDict(extra="forbid") + + success_sample_rate: float = Field(default=0.1, ge=0.0, le=1.0) + min_confidence: float = Field(default=0.5, ge=0.0, le=1.0) + min_examples: int = Field(default=3, ge=1) + observe_only: bool = True + cross_workspace_sharing: bool = False + actor_marking: bool = True diff --git a/src/agentkit/evolution/lifecycle.py b/src/agentkit/evolution/lifecycle.py index d0ef216..9564e33 100644 --- a/src/agentkit/evolution/lifecycle.py +++ b/src/agentkit/evolution/lifecycle.py @@ -4,14 +4,16 @@ """ import logging +import random from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any from sqlalchemy.exc import DBAPIError -from agentkit.core.protocol import EvolutionEvent, TaskMessage, TaskResult +from agentkit.core.protocol import EvolutionEvent, TaskMessage, TaskResult, TaskStatus from agentkit.evolution.ab_tester import ABTestConfig, ABTestResult, ABTester +from agentkit.evolution.config import EvolutionConfig from agentkit.evolution.evolution_store import EvolutionStore from agentkit.evolution.llm_reflector import LLMReflector from agentkit.evolution.prompt_optimizer import ( @@ -39,6 +41,7 @@ class SoulEvolutionConfig: @dataclass class EvolutionLogEntry: """进化日志条目""" + task_id: str reflection: Reflection | None = None optimized_module: Module | None = None @@ -47,6 +50,12 @@ class EvolutionLogEntry: rolled_back: bool = False event_id: str | None = None created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + # R6: actor marking — which agent/expert produced this evolution artifact + actor: str = "" + # R5: whether this entry was gated by the success sample rate + sampled: bool = True + # R5: observe-only entries record but do not mutate prompts + observe_only: bool = False class EvolutionMixin: @@ -73,15 +82,14 @@ class EvolutionMixin: auxiliary_model: str | None = None, strategy_tuning_enabled: bool = False, evolution_config: SoulEvolutionConfig | 
None = None, + auto_evolution_config: EvolutionConfig | None = None, ): if reflector is not EvolutionMixin._UNSET: # 显式传入了 reflector 参数(包括 None) self._reflector = reflector elif reflector_type is not None: # 未传入 reflector,但指定了 reflector_type → 自动创建 - self._reflector = self._create_reflector( - reflector_type, llm_gateway, auxiliary_model - ) + self._reflector = self._create_reflector(reflector_type, llm_gateway, auxiliary_model) else: # 都未指定:保持向后兼容,reflector 为 None self._reflector = None @@ -93,6 +101,8 @@ class EvolutionMixin: self._current_module: Module | None = None self._strategy_tuning_enabled = strategy_tuning_enabled self._evolution_config = evolution_config + # U6/R5/R6: auto-evolution config (sample rate, quality gates, actor marking) + self._auto_evolution_config = auto_evolution_config self.pending_soul_updates: dict[str, list] = {} @staticmethod @@ -133,19 +143,43 @@ class EvolutionMixin: task: TaskMessage, result: TaskResult, memory_store: MemoryStore | None = None, + actor: str | None = None, ) -> EvolutionLogEntry: """任务完成后执行进化流程。 流程: - 1. Reflector 反思 → 得到 Reflection - 2. Soul 进化检查(如果 memory_store 可用) - 3. 如果 Reflection 有改进建议 → PromptOptimizer 优化 - 4. 如果优化产生了新 Prompt → ABTester 验证 - 5. 如果 AB 测试通过 → EvolutionStore 应用变更 - 6. 如果 AB 测试失败 → 回滚 - 7. 如果策略调优启用 → StrategyTuner 调优 + 1. R5 成功采样门控(仅 auto_evolution_config 配置时生效) + 2. Reflector 反思 → 得到 Reflection + 3. Soul 进化检查(如果 memory_store 可用) + 4. 如果 Reflection 有改进建议 → PromptOptimizer 优化 + 5. 如果优化产生了新 Prompt → ABTester 验证 + 6. 如果 AB 测试通过 → EvolutionStore 应用变更 + 7. 如果 AB 测试失败 → 回滚 + 8. 
如果策略调优启用 → StrategyTuner 调优 + + R5: 成功路径按 success_sample_rate 采样;失败路径始终执行(100%)。 + R6: 所有进化产物携带 actor 标记。 + KTD-8: gave_up_after_reflections 视为失败路径。 """ - log_entry = EvolutionLogEntry(task_id=task.task_id) + # R6: actor marking — defaults to the agent that produced the result + resolved_actor = actor or result.agent_name or "" + log_entry = EvolutionLogEntry(task_id=task.task_id, actor=resolved_actor) + + cfg = self._auto_evolution_config + + # R5: success sample rate gate — only when auto_evolution_config is set. + # Failure path always runs (100%). KTD-8: gave_up_after_reflections = failure. + is_failure = self._is_failure_path(result) + if cfg is not None and not is_failure: + if random.random() >= cfg.success_sample_rate: + logger.debug( + "Success-path evolution skipped for task %s (sample rate %.2f)", + task.task_id, + cfg.success_sample_rate, + ) + log_entry.sampled = False + self._evolution_log.append(log_entry) + return log_entry # Step 1: 反思 if self._reflector is None: @@ -177,16 +211,46 @@ class EvolutionMixin: self._evolution_log.append(log_entry) return log_entry + # R5: observe-only mode — record reflection but do NOT feed optimizer. + # Avoids noise-driven prompt degradation during initial rollout (RV14). + if cfg is not None and cfg.observe_only: + logger.debug( + "Observe-only mode: recording reflection without feeding optimizer for task %s", + task.task_id, + ) + log_entry.observe_only = True + self._evolution_log.append(log_entry) + return log_entry + + # R5: consumption gate — sample count >= min_examples AND confidence达标. 
+ min_conf = cfg.min_confidence if cfg is not None else 0.5 + min_examples = cfg.min_examples if cfg is not None else 3 + if hasattr(self._prompt_optimizer, "can_optimize"): + if not self._prompt_optimizer.can_optimize( + min_confidence=min_conf, min_examples=min_examples + ): + logger.debug( + "Optimizer consumption gate not met for task %s, skipping optimization", + task.task_id, + ) + self._evolution_log.append(log_entry) + return log_entry + # 将反思结果作为训练样本 self._prompt_optimizer.add_example( input_data=task.input_data, output_data=result.output_data or {}, quality_score=reflection.quality_score, + actor=resolved_actor, ) # Pass trace and reflection to LLMPromptOptimizer if available optimized = await self._optimize_with_context(self._current_module, reflection) + # R6: stamp actor on optimized module + if cfg is None or cfg.actor_marking: + optimized.actor = resolved_actor + # 检查是否真正产生了变化 if optimized.name == self._current_module.name and not optimized.demos: logger.debug("Optimization produced no meaningful changes") @@ -240,9 +304,43 @@ class EvolutionMixin: self._evolution_log.append(log_entry) return log_entry - async def _optimize_with_context( - self, module: Module, reflection: Reflection - ) -> Module: + def _is_failure_path(self, result: TaskResult) -> bool: + """Determine if a result should trigger failure-path evolution (100%). + + KTD-8: ``gave_up_after_reflections`` (U5) is treated as failure even when + the stream wrapper marks status as COMPLETED, because the reflexion loop + exhausted without producing a verified answer. + + ponytail: string-matching on output_data/error_message is a heuristic; + upgrade path is a dedicated TaskResult.trace_outcome field. 
+ """ + if result.status != TaskStatus.COMPLETED: + return True + # KTD-8: detect gave_up_after_reflections signal carried in output or error + if result.output_data and isinstance(result.output_data, dict): + if result.output_data.get("trace_outcome") == "gave_up_after_reflections": + return True + if result.error_message and "gave_up_after_reflections" in result.error_message: + return True + return False + + def can_share_artifact(self, source_actor: str, target_actor: str) -> bool: + """R6: check if an evolution artifact can be shared between workspaces. + + Same-workspace sharing is always on. Cross-workspace sharing requires + explicit opt-in via ``EvolutionConfig.cross_workspace_sharing=True``. + + Trust boundary: evolution products are agent-produced and must be + validated before entering the shared store. + """ + if source_actor == target_actor: + return True + cfg = self._auto_evolution_config + if cfg is not None and cfg.cross_workspace_sharing: + return True + return False + + async def _optimize_with_context(self, module: Module, reflection: Reflection) -> Module: """Run optimization, passing reflection context if optimizer supports it""" from agentkit.evolution.prompt_optimizer import LLMPromptOptimizer @@ -263,11 +361,13 @@ class EvolutionMixin: # Create test if not exists if test_id not in self._ab_tester._tests: - self._ab_tester.create_test(ABTestConfig( - test_id=test_id, - agent_name=result.agent_name, - change_type="prompt", - )) + self._ab_tester.create_test( + ABTestConfig( + test_id=test_id, + agent_name=result.agent_name, + change_type="prompt", + ) + ) # Assign group deterministically based on task_id group = self._ab_tester.assign_group(test_id, task_id=task.task_id) @@ -318,6 +418,9 @@ class EvolutionMixin: "rolled_back": entry.rolled_back, "event_id": entry.event_id, "created_at": entry.created_at.isoformat(), + "actor": entry.actor, + "sampled": entry.sampled, + "observe_only": entry.observe_only, } if entry.reflection: 
record["reflection"] = { @@ -444,9 +547,7 @@ class EvolutionMixin: # 按 pattern 分类累积反思(patterns为空时使用默认category) categories = reflection.patterns if reflection.patterns else ["default"] for pattern in categories: - self.record_reflection( - pattern, reflection, task_type=task_type, score=score - ) + self.record_reflection(pattern, reflection, task_type=task_type, score=score) # 检查是否有类别满足触发条件 for category, reflections in list(self.pending_soul_updates.items()): @@ -455,9 +556,7 @@ class EvolutionMixin: quality_gradient_triggered = False if len(scores) >= 3: last_3 = scores[-3:] - declines = [ - last_3[i] - last_3[i - 1] for i in range(1, len(last_3)) - ] + declines = [last_3[i] - last_3[i - 1] for i in range(1, len(last_3))] if all(d <= config.quality_gradient_threshold for d in declines): quality_gradient_triggered = True @@ -467,7 +566,7 @@ class EvolutionMixin: for r in reflections: age_seconds = (now - r["timestamp"]).total_seconds() age_hours = age_seconds / 3600.0 - effective_count += config.time_decay_factor ** age_hours + effective_count += config.time_decay_factor**age_hours # Round to avoid floating-point precision issues # (e.g. 
3 recent reflections should yield exactly 3.0) effective_count = round(effective_count, 6) @@ -506,8 +605,7 @@ class EvolutionMixin: if update_result.get("success"): logger.info( - f"Soul evolved: category={category}, " - f"version={update_result.get('version')}" + f"Soul evolved: category={category}, version={update_result.get('version')}" ) # 清除已处理的类别 del self.pending_soul_updates[category] diff --git a/src/agentkit/evolution/pitfall_detector.py b/src/agentkit/evolution/pitfall_detector.py index c50205e..e95610e 100644 --- a/src/agentkit/evolution/pitfall_detector.py +++ b/src/agentkit/evolution/pitfall_detector.py @@ -33,6 +33,9 @@ class PitfallWarning: failure_rate: 历史失败率(0.0 ~ 1.0) historical_failures: 历史失败原因列表 suggestion: 优化建议 + confidence: 置信度(0.0 ~ 1.0),综合 failure_rate 和样本量计算 + actor: 产生此预警对应的 agent/expert 标识(R6 actor marking) + observe_only: 低置信度预警标记为 observe-only,记录但不驱动优化 """ step_name: str @@ -40,6 +43,12 @@ class PitfallWarning: failure_rate: float historical_failures: list[str] = field(default_factory=list) suggestion: str = "" + # U6/R5: confidence score for quality gate before ingestion + confidence: float = 0.0 + # U6/R6: actor marking — which agent/expert produced the underlying experiences + actor: str = "" + # U6/R5: low-confidence warnings are marked observe-only (not discarded) + observe_only: bool = False class ExperienceStoreProtocol(Protocol): @@ -51,8 +60,7 @@ class ExperienceStoreProtocol(Protocol): top_k: int = 5, task_type: str | None = None, search_multiplier: int = 5, - ) -> list[Any]: - ... + ) -> list[Any]: ... 
# 预警级别阈值 @@ -89,27 +97,33 @@ class PitfallDetector: experience_store: ExperienceStoreProtocol, similarity_threshold: float = 0.3, max_search_results: int = 50, + min_confidence: float = 0.0, ): """ Args: experience_store: 经验存储实例(ExperienceStore 或 InMemoryExperienceStore) similarity_threshold: 步骤名称关键词匹配的最小相似度阈值 max_search_results: 从经验存储检索的最大结果数 + min_confidence: 置信度阈值(U6/R5)。低于此值的预警标记为 observe_only。 + 默认 0.0 表示不过滤(保持向后兼容)。 """ self._store = experience_store self._similarity_threshold = similarity_threshold self._max_search_results = max_search_results + self._min_confidence = min_confidence async def check_pitfalls( self, task_type: str, planned_steps: list[Any], + actor: str = "", ) -> list[PitfallWarning]: """检查计划步骤中的潜在陷阱 Args: task_type: 任务类型 planned_steps: 计划步骤列表(PlanStep 对象或具有 name/description 属性的对象) + actor: 产生此检测请求的 agent/expert 标识(R6 actor marking) Returns: 按严重程度排序的预警列表(HIGH → MEDIUM → LOW) @@ -127,7 +141,7 @@ class PitfallDetector: step_failure_stats = self._extract_step_failure_stats(all_experiences) # 3. 匹配当前计划步骤并生成预警 - warnings = self._match_and_warn(planned_steps, step_failure_stats) + warnings = self._match_and_warn(planned_steps, step_failure_stats, actor=actor) # 4. 
按严重程度排序(HIGH → MEDIUM → LOW),同级别按失败率降序 warnings.sort(key=lambda w: (_warning_level_order(w.warning_level), -w.failure_rate)) @@ -208,8 +222,8 @@ class PitfallDetector: s.failure_reasons.append(error) # 收集优化建议 — only add to steps that are part of this experience - if hasattr(exp, 'optimization_tips') and exp.optimization_tips: - experience_steps = set(exp.steps) if hasattr(exp, 'steps') and exp.steps else set() + if hasattr(exp, "optimization_tips") and exp.optimization_tips: + experience_steps = set(exp.steps) if hasattr(exp, "steps") and exp.steps else set() for step_name, s in stats.items(): if experience_steps and step_name in experience_steps: s.optimization_tips.extend(exp.optimization_tips) @@ -220,6 +234,7 @@ class PitfallDetector: self, planned_steps: list[Any], step_failure_stats: dict[str, _StepFailureStats], + actor: str = "", ) -> list[PitfallWarning]: """将计划步骤与失败统计进行匹配,生成预警""" warnings: list[PitfallWarning] = [] @@ -236,9 +251,7 @@ class PitfallDetector: best_similarity = 0.0 for stats_step_name, stats in step_failure_stats.items(): - similarity = _compute_name_similarity( - step_name, step_description, stats_step_name - ) + similarity = _compute_name_similarity(step_name, step_description, stats_step_name) if similarity > best_similarity: best_similarity = similarity best_match = stats @@ -254,18 +267,29 @@ class PitfallDetector: else 0.0 ) + # U6/R5: compute confidence from failure_rate and sample size. + # ponytail: linear ramp to 3 samples; upgrade to Wilson interval + # if precision matters at low sample counts. 
+ confidence = _compute_confidence(failure_rate, best_match.total_occurrences) + # 分配预警级别 warning_level = _determine_warning_level(failure_rate) # 生成建议 suggestion = _build_suggestion(best_match, failure_rate) + # U6/R5: low-confidence warnings are marked observe-only (not discarded) + observe_only = self._min_confidence > 0.0 and confidence < self._min_confidence + warning = PitfallWarning( step_name=step_name, warning_level=warning_level, failure_rate=round(failure_rate, 4), historical_failures=best_match.failure_reasons[:5], # 最多保留 5 条 suggestion=suggestion, + confidence=round(confidence, 4), + actor=actor, + observe_only=observe_only, ) warnings.append(warning) @@ -321,12 +345,48 @@ def _compute_name_similarity( return len(intersection) / len(union) -_STOP_WORDS = frozenset({ - "a", "an", "the", "and", "or", "but", "in", "on", "at", "to", "for", - "of", "with", "by", "from", "is", "are", "was", "were", "be", "been", - "being", "have", "has", "had", "do", "does", "did", "will", "would", - "could", "should", "may", "might", "can", "shall", "not", "no", -}) +_STOP_WORDS = frozenset( + { + "a", + "an", + "the", + "and", + "or", + "but", + "in", + "on", + "at", + "to", + "for", + "of", + "with", + "by", + "from", + "is", + "are", + "was", + "were", + "be", + "been", + "being", + "have", + "has", + "had", + "do", + "does", + "did", + "will", + "would", + "could", + "should", + "may", + "might", + "can", + "shall", + "not", + "no", + } +) def _extract_keywords(text: str) -> frozenset[str]: @@ -337,10 +397,7 @@ def _extract_keywords(text: str) -> frozenset[str]: # 统一分隔符 normalized = text.lower().replace("_", " ").replace("-", " ") words = normalized.split() - return frozenset( - w for w in words - if len(w) > 1 and w not in _STOP_WORDS - ) + return frozenset(w for w in words if len(w) > 1 and w not in _STOP_WORDS) def _determine_warning_level(failure_rate: float) -> WarningLevel: @@ -357,6 +414,33 @@ def _determine_warning_level(failure_rate: float) -> WarningLevel: 
return WarningLevel.LOW +# U6/R5: minimum sample count for full confidence +_CONFIDENCE_FULL_SAMPLES = 3 + + +def _compute_confidence(failure_rate: float, total_occurrences: int) -> float: + """Compute confidence score for a pitfall warning. + + Combines failure_rate with sample size: small samples reduce confidence + linearly. A warning based on 1 occurrence is low-confidence even if the + failure_rate is high; 3+ occurrences yield full confidence. + + ponytail: linear ramp is a naive heuristic; upgrade path is a Wilson + score interval for statistically rigorous low-sample confidence bounds. + + Args: + failure_rate: Historical failure rate (0.0 ~ 1.0). + total_occurrences: Total number of times this step was observed. + + Returns: + Confidence score (0.0 ~ 1.0). + """ + if total_occurrences <= 0: + return 0.0 + sample_factor = min(1.0, total_occurrences / _CONFIDENCE_FULL_SAMPLES) + return failure_rate * sample_factor + + def _warning_level_order(level: WarningLevel) -> int: """预警级别排序值(越小越严重)""" return { diff --git a/src/agentkit/evolution/prompt_optimizer.py b/src/agentkit/evolution/prompt_optimizer.py index a5dc655..a7c902f 100644 --- a/src/agentkit/evolution/prompt_optimizer.py +++ b/src/agentkit/evolution/prompt_optimizer.py @@ -21,6 +21,7 @@ logger = logging.getLogger(__name__) @dataclass class Signature: """Prompt 签名 - 定义输入/输出字段""" + input_fields: dict[str, str] # name -> description output_fields: dict[str, str] # name -> description instruction: str = "" @@ -41,10 +42,13 @@ class Signature: @dataclass class Module: """可组合的 Prompt 策略模块""" + name: str signature: Signature template: str = "" demos: list[dict[str, Any]] = field(default_factory=list) + # U6/R6: actor marking — which agent/expert produced this optimized module + actor: str = "" def render(self, **kwargs) -> str: parts = [] @@ -80,18 +84,42 @@ class BootstrapPromptOptimizer: input_data: dict, output_data: dict, quality_score: float, + actor: str = "", ) -> None: """添加训练样本""" example = { 
"input": input_data, "output": output_data, "quality_score": quality_score, + "actor": actor, } if quality_score >= 0.7: self._success_examples.append(example) else: self._failure_examples.append(example) + def can_optimize(self, min_confidence: float = 0.5, min_examples: int | None = None) -> bool: + """U6/R5: consumption gate — sample count and confidence达标. + + Returns True only when: + 1. Success example count >= min_examples (default: constructor's + ``min_examples_for_optimization``) + 2. Mean quality score of success examples >= min_confidence + + ponytail: mean-quality gate is redundant with the >= 0.7 success + threshold in add_example when min_confidence <= 0.7; upgrade path + is a diversity-weighted confidence metric if noise becomes an issue. + """ + threshold = min_examples if min_examples is not None else self._min_examples + if len(self._success_examples) < threshold: + return False + if not self._success_examples: + return False + mean_quality = sum(ex["quality_score"] for ex in self._success_examples) / len( + self._success_examples + ) + return mean_quality >= min_confidence + async def optimize(self, module: Module) -> Module: """优化 Module 的 Prompt @@ -110,15 +138,17 @@ class BootstrapPromptOptimizer: key=lambda x: x["quality_score"], reverse=True, ) - best_demos = sorted_examples[:self._max_demos] + best_demos = sorted_examples[: self._max_demos] # 构建 few-shot 示例 demos = [] for example in best_demos: - demos.append({ - "input": str(example["input"]), - "output": str(example["output"]), - }) + demos.append( + { + "input": str(example["input"]), + "output": str(example["output"]), + } + ) # 优化指令(基于失败案例的反面教材) optimized_instruction = module.signature.instruction @@ -127,9 +157,8 @@ class BootstrapPromptOptimizer: for ex in self._failure_examples[-3:]: failure_patterns.add(str(ex["input"])[:100]) if failure_patterns: - optimized_instruction += ( - f"\n\nAvoid these patterns:\n" - + "\n".join(f"- {p}" for p in failure_patterns) + optimized_instruction 
+= "\n\nAvoid these patterns:\n" + "\n".join( + f"- {p}" for p in failure_patterns ) # 创建优化后的 Module @@ -186,9 +215,16 @@ class LLMPromptOptimizer: input_data: dict, output_data: dict, quality_score: float, + actor: str = "", ) -> None: """添加训练样本(委托给 bootstrap 优化器)""" - self._bootstrap.add_example(input_data, output_data, quality_score) + self._bootstrap.add_example(input_data, output_data, quality_score, actor=actor) + + def can_optimize(self, min_confidence: float = 0.5, min_examples: int | None = None) -> bool: + """U6/R5: consumption gate — delegates to bootstrap optimizer.""" + return self._bootstrap.can_optimize( + min_confidence=min_confidence, min_examples=min_examples + ) async def optimize(self, module: Module, trace: Any = None, reflection: Any = None) -> Module: """使用 LLM 优化 Module 的 Prompt diff --git a/tests/unit/test_evolution_auto_trigger.py b/tests/unit/test_evolution_auto_trigger.py new file mode 100644 index 0000000..7bcc9ca --- /dev/null +++ b/tests/unit/test_evolution_auto_trigger.py @@ -0,0 +1,879 @@ +"""Tests for U6: auto evolution trigger + quality gate + actor marking. + +Covers R5 (success sample rate, quality thresholds, observe-only) and +R6 (actor marking, cross-workspace sharing gate). 
+ +Test scenarios: +- Happy path (AE3): failure -> evolution fires (100%); success -> fires at 0.1 rate +- Observe-only mode: recorded but not fed to optimizer +- Backpressure cap reached: evolution task dropped + logged +- Low-confidence pitfall: marked observe-only +- Evolution task error: caught, does not fail the stream +- PromptOptimizer sample count < 3: skip optimization +- Actor marking present on all artifacts +- Cross-workspace sharing rejected without opt-in +- gave_up_after_reflections triggers failure-path evolution +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from unittest.mock import patch + +import pytest + +from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus +from agentkit.evolution.config import EvolutionConfig +from agentkit.evolution.experience_schema import TaskExperience +from agentkit.evolution.experience_store import InMemoryExperienceStore +from agentkit.evolution.lifecycle import EvolutionMixin +from agentkit.evolution.pitfall_detector import ( + PitfallDetector, + WarningLevel, + _compute_confidence, +) +from agentkit.evolution.prompt_optimizer import Module, PromptOptimizer, Signature +from agentkit.evolution.reflector import Reflection, Reflector + + +# ── Helpers ────────────────────────────────────────────── + + +def _make_task( + task_id: str = "test-001", + agent_name: str = "evolving_agent", +) -> TaskMessage: + return TaskMessage( + task_id=task_id, + agent_name=agent_name, + task_type="echo", + priority=0, + input_data={"query": "hello"}, + callback_url=None, + created_at=datetime.now(timezone.utc), + ) + + +def _make_result( + status: str = TaskStatus.COMPLETED, + output_data: dict | None = None, + error_message: str | None = None, + agent_name: str = "evolving_agent", + task_id: str = "test-001", +) -> TaskResult: + return TaskResult( + task_id=task_id, + agent_name=agent_name, + status=status, + output_data=output_data if output_data is not None else 
{"key": "value"}, + error_message=error_message, + started_at=datetime.now(timezone.utc), + completed_at=datetime.now(timezone.utc), + metrics={"elapsed_seconds": 5.0}, + ) + + +def _make_failure_result( + agent_name: str = "evolving_agent", + task_id: str = "test-001", +) -> TaskResult: + return _make_result( + status=TaskStatus.FAILED, + output_data=None, + error_message="task failed", + agent_name=agent_name, + task_id=task_id, + ) + + +def _make_module() -> Module: + return Module( + name="test_module", + signature=Signature( + input_fields={"query": "search query"}, + output_fields={"result": "search result"}, + instruction="Find the best result.", + ), + ) + + +class LowQualityReflector(Reflector): + """Always produces failure outcome with improvement suggestions.""" + + async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection: + return Reflection( + task_id=task.task_id, + agent_name=result.agent_name, + outcome="failure", + quality_score=0.2, + patterns=["slow_execution"], + insights=["Low quality score indicates potential issues"], + suggestions=["Consider prompt optimization for this task type"], + ) + + +class SuccessReflector(Reflector): + """Always produces success outcome with suggestions (for testing success-path).""" + + async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection: + return Reflection( + task_id=task.task_id, + agent_name=result.agent_name, + outcome="success", + quality_score=0.9, + patterns=["fast_execution"], + insights=["Good execution"], + suggestions=["Consider caching results for similar queries"], + ) + + +class ErrorReflector(Reflector): + """Always raises during reflection.""" + + async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection: + raise RuntimeError("reflector crashed") + + +def _make_experience( + task_type: str = "code_review", + outcome: str = "failure", + steps_summary: str | list = "", + success_rate: float = 0.0, +) -> TaskExperience: + return 
TaskExperience( + experience_id="", + task_type=task_type, + goal="test goal", + steps_summary=steps_summary, + outcome=outcome, + duration_seconds=10.0, + success_rate=success_rate, + failure_reasons=[], + optimization_tips=[], + created_at=datetime.now(timezone.utc), + ) + + +# ── R5: Success sample rate gate ───────────────────────── + + +class TestSuccessSampleRate: + """R5: success-path evolution gated by success_sample_rate; failure always runs.""" + + async def test_failure_always_triggers_evolution(self): + """Failure path always triggers evolution regardless of sample rate.""" + cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=False) + reflector = LowQualityReflector() + mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg) + mixin.set_current_module(_make_module()) + + task = _make_task() + result = _make_failure_result() + entry = await mixin.evolve_after_task(task, result) + + assert entry.sampled is True + assert entry.reflection is not None + assert entry.reflection.outcome == "failure" + + async def test_success_skipped_when_rate_zero(self): + """Success path skipped when success_sample_rate=0.0.""" + cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=False) + reflector = SuccessReflector() + mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg) + + task = _make_task() + result = _make_result(status=TaskStatus.COMPLETED) + entry = await mixin.evolve_after_task(task, result) + + assert entry.sampled is False + assert entry.reflection is None # evolution skipped before reflection + + async def test_success_runs_when_rate_one(self): + """Success path runs when success_sample_rate=1.0.""" + cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False) + reflector = SuccessReflector() + mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg) + + task = _make_task() + result = _make_result(status=TaskStatus.COMPLETED) + entry = await mixin.evolve_after_task(task, result) + + assert 
async def test_success_sampled_at_rate_boundary(self):
    """At rate=0.1 a draw of 0.05 runs evolution and a draw of 0.15 skips it."""
    cfg = EvolutionConfig(success_sample_rate=0.1, observe_only=False)
    reflector = SuccessReflector()

    # (patched random.random() value, whether evolution is expected to run)
    cases = ((0.05, True), (0.15, False))
    for drawn, should_run in cases:
        # Fresh mixin per case so no state leaks between the two draws.
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        with patch("agentkit.evolution.lifecycle.random.random", return_value=drawn):
            entry = await mixin.evolve_after_task(
                _make_task(), _make_result(status=TaskStatus.COMPLETED)
            )
        assert entry.sampled is should_run
        assert (entry.reflection is not None) is should_run
async def test_observe_only_false_allows_optimization(self):
    """With ``observe_only=False`` and a pre-filled optimizer, an optimized module is produced."""
    cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
    reflector = LowQualityReflector()
    optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)

    # Seed three high-quality examples so the consumption gate can pass.
    seeds = [({"query": f"q_{i}"}, {"result": f"r_{i}"}) for i in range(3)]
    for seed_input, seed_output in seeds:
        optimizer.add_example(
            input_data=seed_input,
            output_data=seed_output,
            quality_score=0.9,
        )

    agent = EvolutionMixin(
        reflector=reflector,
        prompt_optimizer=optimizer,
        auto_evolution_config=cfg,
    )
    agent.set_current_module(_make_module())

    log_entry = await agent.evolve_after_task(_make_task(), _make_failure_result())

    assert log_entry.observe_only is False
    assert log_entry.optimized_module is not None
def test_can_optimize_returns_false_low_confidence(self):
    """``can_optimize(min_confidence=0.5)`` is False after adding only 0.3-quality examples."""
    optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
    low_quality = 0.3  # below 0.5 threshold
    for idx in range(3):
        optimizer.add_example(
            input_data={"query": f"q_{idx}"},
            output_data={"result": f"r_{idx}"},
            quality_score=low_quality,
        )
    # Per the original author's note, these land in failure_examples
    # (quality < 0.7), leaving success_examples empty.
    # NOTE(review): if so, this case exercises the sample-count gate rather
    # than the mean-quality gate -- confirm against PromptOptimizer.
    verdict = optimizer.can_optimize(min_confidence=0.5)
    assert verdict is False
async def test_low_confidence_pitfall_marked_observe_only(self):
    """A pitfall backed by a single failure is reported as observe-only at ``min_confidence=0.5``."""
    from agentkit.core.plan_schema import PlanStep, PlanStepStatus

    store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
    # One failure experience only -> low sample -> low confidence.
    lone_failure = _make_experience(
        task_type="testing",
        outcome="failure",
        steps_summary=[
            {"step_name": "Run Tests", "outcome": "failure", "error": "Flaky"},
        ],
    )
    await store.record_experience(lone_failure)

    detector = PitfallDetector(
        experience_store=store,
        similarity_threshold=0.3,
        min_confidence=0.5,
    )
    planned = [
        PlanStep(
            step_id="s1",
            name="Run Tests",
            description="Run tests",
            status=PlanStepStatus.PENDING,
        )
    ]

    found = await detector.check_pitfalls(
        task_type="testing", planned_steps=planned, actor="test_agent"
    )

    assert len(found) == 1
    warning = found[0]
    assert warning.observe_only is True
    assert warning.confidence < 0.5
    assert warning.actor == "test_agent"
async def test_log_entry_carries_actor(self):
    """The entry returned by ``evolve_after_task`` exposes the explicitly passed actor."""
    who = "backend_engineer"
    cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False)
    agent = EvolutionMixin(reflector=LowQualityReflector(), auto_evolution_config=cfg)

    log_entry = await agent.evolve_after_task(
        _make_task(agent_name=who),
        _make_failure_result(agent_name=who),
        actor=who,
    )

    assert log_entry.actor == "backend_engineer"
async def test_actor_in_history(self):
    """The single record in ``get_evolution_history()`` carries the actor under ``"actor"``."""
    cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
    agent = EvolutionMixin(reflector=LowQualityReflector(), auto_evolution_config=cfg)

    failed_task, failed_result = _make_task(), _make_failure_result()
    await agent.evolve_after_task(failed_task, failed_result, actor="frontend_engineer")

    records = agent.get_evolution_history()
    assert len(records) == 1
    only_record = records[0]
    assert only_record["actor"] == "frontend_engineer"
def test_cross_workspace_sharing_default_off(self):
    """Sharing between two different actors is refused when ``cross_workspace_sharing=False``."""
    no_sharing = EvolutionConfig(cross_workspace_sharing=False)
    agent = EvolutionMixin(reflector=Reflector(), auto_evolution_config=no_sharing)

    allowed = agent.can_share_artifact("agent_a", "agent_b")

    assert allowed is False
async def test_gave_up_in_error_message_treated_as_failure(self):
    """A COMPLETED result whose error_message mentions gave_up_after_reflections is still evolved."""
    cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
    agent = EvolutionMixin(reflector=LowQualityReflector(), auto_evolution_config=cfg)

    task = _make_task()
    # Status says success; only the error message carries the give-up signal.
    gave_up_result = _make_result(
        status=TaskStatus.COMPLETED,
        output_data={"content": "some output"},
        error_message="gave_up_after_reflections: exhausted reinjections",
    )

    log_entry = await agent.evolve_after_task(task, gave_up_result)

    # With success_sample_rate=0.0 a plain success would have been skipped.
    assert log_entry.sampled is True
    assert log_entry.reflection is not None
async def test_evolve_safe_swallows_reflector_error(self):
    """A wrapper that catches ``Exception`` around ``evolve_after_task`` does not raise."""

    class SafeWrapper(EvolutionMixin):
        """Local stand-in for the _evolve_safe pattern from ConfigDrivenAgent."""

        async def _evolve_safe(self, task: TaskMessage, result: TaskResult) -> None:
            try:
                await self.evolve_after_task(task, result)
            except Exception:
                # Deliberately swallowed, matching config_driven.py:_evolve_safe.
                return

    wrapper = SafeWrapper(reflector=ErrorReflector())
    task, failed = _make_task(), _make_failure_result()
    # Passing means the call below returned instead of raising.
    await wrapper._evolve_safe(task, failed)
async def test_concurrent_evolution_tasks_isolated(self):
    """Three evolutions gathered on one mixin each leave their own history record."""
    cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
    shared = EvolutionMixin(reflector=LowQualityReflector(), auto_evolution_config=cfg)
    expected_ids = ("task-a", "task-b", "task-c")

    async def _run_one(task_id: str):
        await shared.evolve_after_task(
            _make_task(task_id=task_id),
            _make_failure_result(task_id=task_id),
        )

    await asyncio.gather(*(_run_one(tid) for tid in expected_ids))

    records = shared.get_evolution_history()
    assert len(records) == 3
    seen_ids = set()
    for record in records:
        seen_ids.add(record["task_id"])
    assert seen_ids == {"task-a", "task-b", "task-c"}
async def test_evolution_task_dropped_when_cap_reached(self):
    """When pending tasks reach cap, new evolution tasks are dropped.

    Schedules ``cap`` coroutines that block on a shared event, then schedules
    one more and checks that the pending set stays at ``cap`` while the
    module-level dropped counter increases by one.

    Cleanup always releases the *same* event the scheduled coroutines wait on
    and awaits whatever is still pending, so a failing assertion cannot leave
    blocked tasks behind or hang the test.
    """
    import agentkit.core.config_driven as cd

    cap = 4
    # Created before the ``try`` so the ``finally`` block can release this
    # exact event rather than a freshly created one.
    block_event = asyncio.Event()

    async def _blocking_evolve() -> None:
        # Stays pending until the test releases the event.
        await block_event.wait()

    try:
        # Fill up to cap.
        for _ in range(cap):
            cd._schedule_evolution(_blocking_evolve(), cap=cap)
        assert len(cd._pending_evolution_tasks) == cap

        # Read the counter through the module: it is a rebinding int.
        dropped_before = cd._evolution_dropped_count

        # One more than the cap -> should be dropped.
        cd._schedule_evolution(_blocking_evolve(), cap=cap)

        assert len(cd._pending_evolution_tasks) == cap  # still at cap
        assert cd._evolution_dropped_count == dropped_before + 1
    finally:
        # Unblock every scheduled coroutine, then wait for them explicitly
        # instead of relying on a fixed sleep.
        block_event.set()
        stragglers = list(cd._pending_evolution_tasks)
        if stragglers:
            await asyncio.gather(*stragglers, return_exceptions=True)
        cd._pending_evolution_tasks.clear()
async def test_success_sampled_at_0_1_rate(self):
    """Success path: with rate=0.1, exactly the draws below 0.1 trigger evolution.

    ``random.random`` is patched with 100 evenly spaced mid-bucket values
    (0.005, 0.015, ..., 0.995) instead of using the real RNG, so the outcome
    is deterministic: the ten draws below 0.1 run evolution, the other
    ninety are skipped. No draw equals the rate itself, so the result does
    not depend on whether the gate compares with ``<`` or ``<=``.
    """
    cfg = EvolutionConfig(success_sample_rate=0.1, observe_only=True)
    reflector = SuccessReflector()

    total = 100
    triggered = 0
    for i in range(total):
        draw = (i + 0.5) / total
        # Fresh mixin per trial so trials cannot influence each other.
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        # Same patch target as the rate-boundary test in this module.
        with patch("agentkit.evolution.lifecycle.random.random", return_value=draw):
            entry = await mixin.evolve_after_task(
                _make_task(), _make_result(status=TaskStatus.COMPLETED)
            )
        if entry.reflection is not None:
            triggered += 1

    # Draws 0.005 .. 0.095 are the only ones below the 0.1 rate.
    assert triggered == 10