"""Tests for U6: auto evolution trigger + quality gate + actor marking.

Covers R5 (success sample rate, quality thresholds, observe-only) and R6
(actor marking, cross-workspace sharing gate).

Test scenarios:
- Happy path (AE3): failure -> evolution fires (100%); success -> fires at 0.1 rate
- Observe-only mode: recorded but not fed to optimizer
- Backpressure cap reached: evolution task dropped + logged
- Low-confidence pitfall: marked observe-only
- Evolution task error: caught, does not fail the stream
- PromptOptimizer sample count < 3: skip optimization
- Actor marking present on all artifacts
- Cross-workspace sharing rejected without opt-in
- gave_up_after_reflections triggers failure-path evolution
"""

from __future__ import annotations

import asyncio
import itertools
from datetime import datetime, timezone
from unittest.mock import patch

import pytest

from agentkit.core.plan_schema import PlanStep, PlanStepStatus
from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus
from agentkit.evolution.config import EvolutionConfig
from agentkit.evolution.experience_schema import TaskExperience
from agentkit.evolution.experience_store import InMemoryExperienceStore
from agentkit.evolution.lifecycle import EvolutionMixin
from agentkit.evolution.pitfall_detector import (
    PitfallDetector,
    WarningLevel,
    _compute_confidence,
)
from agentkit.evolution.prompt_optimizer import Module, PromptOptimizer, Signature
from agentkit.evolution.reflector import Reflection, Reflector


# ── Helpers ──────────────────────────────────────────────


def _make_task(
    task_id: str = "test-001",
    agent_name: str = "evolving_agent",
) -> TaskMessage:
    """Build an ``echo`` TaskMessage with a fixed input payload."""
    return TaskMessage(
        task_id=task_id,
        agent_name=agent_name,
        task_type="echo",
        priority=0,
        input_data={"query": "hello"},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )


def _make_result(
    status: str = TaskStatus.COMPLETED,
    output_data: dict | None = None,
    error_message: str | None = None,
    agent_name: str = "evolving_agent",
    task_id: str = "test-001",
) -> TaskResult:
    """Build a TaskResult; COMPLETED by default.

    Note: ``output_data=None`` does not produce an empty result — it is
    replaced by the default payload ``{"key": "value"}``.
    """
    return TaskResult(
        task_id=task_id,
        agent_name=agent_name,
        status=status,
        output_data=output_data if output_data is not None else {"key": "value"},
        error_message=error_message,
        started_at=datetime.now(timezone.utc),
        completed_at=datetime.now(timezone.utc),
        metrics={"elapsed_seconds": 5.0},
    )


def _make_failure_result(
    agent_name: str = "evolving_agent",
    task_id: str = "test-001",
) -> TaskResult:
    """Build a FAILED TaskResult with error_message ``"task failed"``."""
    return _make_result(
        status=TaskStatus.FAILED,
        output_data=None,
        error_message="task failed",
        agent_name=agent_name,
        task_id=task_id,
    )


def _make_module() -> Module:
    """Build a minimal Module with a one-input / one-output signature."""
    return Module(
        name="test_module",
        signature=Signature(
            input_fields={"query": "search query"},
            output_fields={"result": "search result"},
            instruction="Find the best result.",
        ),
    )


def _make_pending_step(name: str, description: str, step_id: str = "s1") -> PlanStep:
    """Build a single PENDING PlanStep for pitfall checks."""
    return PlanStep(
        step_id=step_id,
        name=name,
        description=description,
        status=PlanStepStatus.PENDING,
    )


def _add_examples(
    optimizer: PromptOptimizer,
    count: int,
    quality_score: float = 0.9,
) -> None:
    """Feed ``count`` numbered query/result examples into ``optimizer``."""
    for i in range(count):
        optimizer.add_example(
            input_data={"query": f"q_{i}"},
            output_data={"result": f"r_{i}"},
            quality_score=quality_score,
        )


class LowQualityReflector(Reflector):
    """Always produces failure outcome with improvement suggestions."""

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        return Reflection(
            task_id=task.task_id,
            agent_name=result.agent_name,
            outcome="failure",
            quality_score=0.2,
            patterns=["slow_execution"],
            insights=["Low quality score indicates potential issues"],
            suggestions=["Consider prompt optimization for this task type"],
        )


class SuccessReflector(Reflector):
    """Always produces success outcome with suggestions (for testing success-path)."""

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        return Reflection(
            task_id=task.task_id,
            agent_name=result.agent_name,
            outcome="success",
            quality_score=0.9,
            patterns=["fast_execution"],
            insights=["Good execution"],
            suggestions=["Consider caching results for similar queries"],
        )


class ErrorReflector(Reflector):
    """Always raises during reflection."""

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        raise RuntimeError("reflector crashed")


def _make_experience(
    task_type: str = "code_review",
    outcome: str = "failure",
    steps_summary: str | list = "",
    success_rate: float = 0.0,
) -> TaskExperience:
    """Build a TaskExperience with an empty experience_id and fixed goal/duration."""
    return TaskExperience(
        experience_id="",
        task_type=task_type,
        goal="test goal",
        steps_summary=steps_summary,
        outcome=outcome,
        duration_seconds=10.0,
        success_rate=success_rate,
        failure_reasons=[],
        optimization_tips=[],
        created_at=datetime.now(timezone.utc),
    )


# ── R5: Success sample rate gate ─────────────────────────


class TestSuccessSampleRate:
    """R5: success-path evolution gated by success_sample_rate; failure always runs."""

    async def test_failure_always_triggers_evolution(self):
        """Failure path always triggers evolution regardless of sample rate."""
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=False)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        mixin.set_current_module(_make_module())

        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)

        assert entry.sampled is True
        assert entry.reflection is not None
        assert entry.reflection.outcome == "failure"

    async def test_success_skipped_when_rate_zero(self):
        """Success path skipped when success_sample_rate=0.0."""
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=False)
        reflector = SuccessReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        result = _make_result(status=TaskStatus.COMPLETED)
        entry = await mixin.evolve_after_task(task, result)

        assert entry.sampled is False
        assert entry.reflection is None  # evolution skipped before reflection

    async def test_success_runs_when_rate_one(self):
        """Success path runs when success_sample_rate=1.0."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False)
        reflector = SuccessReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        result = _make_result(status=TaskStatus.COMPLETED)
        entry = await mixin.evolve_after_task(task, result)

        assert entry.sampled is True
        assert entry.reflection is not None
        assert entry.reflection.outcome == "success"

    async def test_success_sampled_at_rate_boundary(self):
        """At rate=0.1, random < 0.1 runs; random >= 0.1 skips."""
        cfg = EvolutionConfig(success_sample_rate=0.1, observe_only=False)
        reflector = SuccessReflector()

        # random < 0.1 -> evolution runs
        mixin_run = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        with patch("agentkit.evolution.lifecycle.random.random", return_value=0.05):
            entry = await mixin_run.evolve_after_task(
                _make_task(), _make_result(status=TaskStatus.COMPLETED)
            )
        assert entry.sampled is True
        assert entry.reflection is not None

        # random >= 0.1 -> evolution skipped
        mixin_skip = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        with patch("agentkit.evolution.lifecycle.random.random", return_value=0.15):
            entry = await mixin_skip.evolve_after_task(
                _make_task(), _make_result(status=TaskStatus.COMPLETED)
            )
        assert entry.sampled is False
        assert entry.reflection is None

    async def test_no_config_preserves_backward_compat(self):
        """Without auto_evolution_config, no sample gate applies (backward compat)."""
        reflector = SuccessReflector()
        mixin = EvolutionMixin(reflector=reflector)

        task = _make_task()
        result = _make_result(status=TaskStatus.COMPLETED)
        entry = await mixin.evolve_after_task(task, result)

        assert entry.sampled is True
        assert entry.reflection is not None


# ── R5: Observe-only mode ────────────────────────────────


class TestObserveOnly:
    """R5: observe-only mode records but does not feed optimizer."""

    async def test_observe_only_records_without_optimizing(self):
        """Observe-only: reflection recorded, optimizer not fed."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True, min_confidence=0.0)
        reflector = LowQualityReflector()
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        mixin = EvolutionMixin(
            reflector=reflector,
            prompt_optimizer=optimizer,
            auto_evolution_config=cfg,
        )
        mixin.set_current_module(_make_module())

        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)

        assert entry.observe_only is True
        assert entry.reflection is not None
        assert entry.optimized_module is None
        # Optimizer should NOT have been fed
        success_count, _ = optimizer.example_count
        assert success_count == 0

    async def test_observe_only_false_allows_optimization(self):
        """When observe_only=False, optimization can proceed (if gates pass)."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        reflector = LowQualityReflector()
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        # Pre-fill enough success examples to pass consumption gate
        _add_examples(optimizer, 3)

        mixin = EvolutionMixin(
            reflector=reflector,
            prompt_optimizer=optimizer,
            auto_evolution_config=cfg,
        )
        mixin.set_current_module(_make_module())

        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)

        assert entry.observe_only is False
        assert entry.optimized_module is not None


# ── R5: PromptOptimizer consumption gate ─────────────────


class TestConsumptionGate:
    """R5: optimizer consumption gate — sample count >= min_examples AND confidence."""

    async def test_sample_count_below_threshold_skips_optimization(self):
        """PromptOptimizer sample count < min_examples -> skip optimization."""
        cfg = EvolutionConfig(
            success_sample_rate=1.0,
            observe_only=False,
            min_examples=3,
            min_confidence=0.0,
        )
        reflector = LowQualityReflector()
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        # Only 2 success examples — below threshold
        _add_examples(optimizer, 2)

        mixin = EvolutionMixin(
            reflector=reflector,
            prompt_optimizer=optimizer,
            auto_evolution_config=cfg,
        )
        mixin.set_current_module(_make_module())

        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)

        assert entry.optimized_module is None  # gate not met

    def test_can_optimize_returns_false_below_threshold(self):
        """can_optimize() returns False when sample count < min_examples."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        assert optimizer.can_optimize(min_confidence=0.5) is False

    def test_can_optimize_returns_true_above_threshold(self):
        """can_optimize() returns True when sample count and confidence met."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        _add_examples(optimizer, 3)
        assert optimizer.can_optimize(min_confidence=0.5) is True

    def test_can_optimize_returns_false_low_confidence(self):
        """can_optimize() returns False when mean quality < min_confidence."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        _add_examples(optimizer, 3, quality_score=0.3)  # below 0.5 threshold
        # These go to failure_examples (quality < 0.7), so success_examples is empty
        assert optimizer.can_optimize(min_confidence=0.5) is False


# ── R5: Pitfall confidence threshold ─────────────────────


class TestPitfallConfidence:
    """R5: low-confidence pitfalls marked observe-only."""

    def test_compute_confidence_high_sample_high_rate(self):
        """3+ occurrences with high failure_rate -> high confidence."""
        conf = _compute_confidence(failure_rate=0.6, total_occurrences=5)
        assert conf == pytest.approx(0.6)

    def test_compute_confidence_low_sample(self):
        """1 occurrence -> confidence scaled down by 1/3."""
        conf = _compute_confidence(failure_rate=0.6, total_occurrences=1)
        assert conf == pytest.approx(0.6 * (1.0 / 3.0))

    def test_compute_confidence_zero_samples(self):
        """0 occurrences -> zero confidence."""
        assert _compute_confidence(failure_rate=0.5, total_occurrences=0) == 0.0

    async def test_low_confidence_pitfall_marked_observe_only(self):
        """Pitfall with confidence < min_confidence is marked observe-only."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        # Only 1 failure experience -> low sample -> low confidence
        await store.record_experience(
            _make_experience(
                task_type="testing",
                outcome="failure",
                steps_summary=[
                    {"step_name": "Run Tests", "outcome": "failure", "error": "Flaky"},
                ],
            )
        )
        detector = PitfallDetector(
            experience_store=store,
            similarity_threshold=0.3,
            min_confidence=0.5,
        )

        steps = [_make_pending_step("Run Tests", "Run tests")]
        warnings = await detector.check_pitfalls(
            task_type="testing", planned_steps=steps, actor="test_agent"
        )

        assert len(warnings) == 1
        assert warnings[0].observe_only is True
        assert warnings[0].confidence < 0.5
        assert warnings[0].actor == "test_agent"

    async def test_high_confidence_pitfall_not_observe_only(self):
        """Pitfall with confidence >= min_confidence is not observe-only."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        # 3+ failure experiences -> full sample factor -> high confidence
        for _ in range(4):
            await store.record_experience(
                _make_experience(
                    task_type="deployment",
                    outcome="failure",
                    steps_summary=[
                        {"step_name": "Deploy", "outcome": "failure", "error": "OOM"},
                    ],
                )
            )
        detector = PitfallDetector(
            experience_store=store,
            similarity_threshold=0.3,
            min_confidence=0.5,
        )

        steps = [_make_pending_step("Deploy", "Deploy app")]
        warnings = await detector.check_pitfalls(task_type="deployment", planned_steps=steps)

        assert len(warnings) == 1
        assert warnings[0].observe_only is False
        assert warnings[0].confidence >= 0.5


# ── R6: Actor marking ────────────────────────────────────


class TestActorMarking:
    """R6: actor marking on all evolution artifacts."""

    async def test_log_entry_carries_actor(self):
        """EvolutionLogEntry carries the actor identity."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task(agent_name="backend_engineer")
        result = _make_failure_result(agent_name="backend_engineer")
        entry = await mixin.evolve_after_task(task, result, actor="backend_engineer")

        assert entry.actor == "backend_engineer"

    async def test_actor_defaults_to_result_agent_name(self):
        """Actor defaults to result.agent_name when not explicitly provided."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task(agent_name="qa_engineer")
        result = _make_failure_result(agent_name="qa_engineer")
        entry = await mixin.evolve_after_task(task, result)

        assert entry.actor == "qa_engineer"

    async def test_actor_marked_on_optimized_module(self):
        """Optimized Module carries the actor identity."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        reflector = LowQualityReflector()
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        _add_examples(optimizer, 3)

        mixin = EvolutionMixin(
            reflector=reflector,
            prompt_optimizer=optimizer,
            auto_evolution_config=cfg,
        )
        mixin.set_current_module(_make_module())

        task = _make_task(agent_name="tech_lead")
        result = _make_failure_result(agent_name="tech_lead")
        entry = await mixin.evolve_after_task(task, result, actor="tech_lead")

        assert entry.optimized_module is not None
        assert entry.optimized_module.actor == "tech_lead"

    async def test_actor_in_history(self):
        """get_evolution_history includes actor field."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        await mixin.evolve_after_task(
            _make_task(), _make_failure_result(), actor="frontend_engineer"
        )

        history = mixin.get_evolution_history()
        assert len(history) == 1
        assert history[0]["actor"] == "frontend_engineer"

    async def test_pitfall_warning_carries_actor(self):
        """PitfallWarning carries the actor identity."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        await store.record_experience(
            _make_experience(
                task_type="testing",
                outcome="failure",
                steps_summary=[
                    {"step_name": "Run Tests", "outcome": "failure", "error": "Error"},
                ],
            )
        )
        detector = PitfallDetector(experience_store=store, similarity_threshold=0.3)

        steps = [_make_pending_step("Run Tests", "Run tests")]
        warnings = await detector.check_pitfalls(
            task_type="testing", planned_steps=steps, actor="code_reviewer"
        )

        assert len(warnings) == 1
        assert warnings[0].actor == "code_reviewer"


# ── R6: Cross-workspace sharing ──────────────────────────


class TestCrossWorkspaceSharing:
    """R6: cross-workspace sharing defaults off; same-workspace always on."""

    def test_same_workspace_sharing_always_allowed(self):
        """Same-actor sharing is always allowed."""
        mixin = EvolutionMixin(reflector=Reflector())
        assert mixin.can_share_artifact("agent_a", "agent_a") is True

    def test_cross_workspace_sharing_default_off(self):
        """Cross-workspace sharing rejected without opt-in (default)."""
        cfg = EvolutionConfig(cross_workspace_sharing=False)
        mixin = EvolutionMixin(reflector=Reflector(), auto_evolution_config=cfg)
        assert mixin.can_share_artifact("agent_a", "agent_b") is False

    def test_cross_workspace_sharing_with_opt_in(self):
        """Cross-workspace sharing allowed when explicitly opted in."""
        cfg = EvolutionConfig(cross_workspace_sharing=True)
        mixin = EvolutionMixin(reflector=Reflector(), auto_evolution_config=cfg)
        assert mixin.can_share_artifact("agent_a", "agent_b") is True

    def test_no_config_cross_workspace_rejected(self):
        """Without config, cross-workspace sharing is rejected (safe default)."""
        mixin = EvolutionMixin(reflector=Reflector())
        assert mixin.can_share_artifact("agent_a", "agent_b") is False


# ── KTD-8: gave_up_after_reflections ─────────────────────


class TestGaveUpAfterReflections:
    """KTD-8: gave_up_after_reflections triggers failure-path evolution."""

    async def test_gave_up_treated_as_failure(self):
        """gave_up_after_reflections in output_data triggers failure path."""
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        # status=COMPLETED but trace_outcome=gave_up_after_reflections
        result = _make_result(
            status=TaskStatus.COMPLETED,
            output_data={"trace_outcome": "gave_up_after_reflections"},
        )
        entry = await mixin.evolve_after_task(task, result)

        # Even though success_sample_rate=0.0, failure path always runs
        assert entry.sampled is True
        assert entry.reflection is not None

    async def test_gave_up_in_error_message_treated_as_failure(self):
        """gave_up_after_reflections in error_message triggers failure path."""
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        result = _make_result(
            status=TaskStatus.COMPLETED,
            output_data={"content": "some output"},
            error_message="gave_up_after_reflections: exhausted reinjections",
        )
        entry = await mixin.evolve_after_task(task, result)

        assert entry.sampled is True
        assert entry.reflection is not None

    def test_is_failure_path_normal_success(self):
        """Normal success (COMPLETED, no gave_up signal) is not failure path."""
        mixin = EvolutionMixin(reflector=Reflector())
        result = _make_result(status=TaskStatus.COMPLETED, output_data={"key": "val"})
        assert mixin._is_failure_path(result) is False

    def test_is_failure_path_failed_status(self):
        """FAILED status is failure path."""
        mixin = EvolutionMixin(reflector=Reflector())
        result = _make_result(status=TaskStatus.FAILED, output_data=None)
        assert mixin._is_failure_path(result) is True

    def test_is_failure_path_cancelled_status(self):
        """CANCELLED status is failure path."""
        mixin = EvolutionMixin(reflector=Reflector())
        result = _make_result(status=TaskStatus.CANCELLED, output_data=None)
        assert mixin._is_failure_path(result) is True


# ── Error handling: evolution does not fail the stream ───


class TestEvolutionErrorHandling:
    """Evolution task error is caught and does not propagate to the caller.

    The _evolve_safe wrapper in config_driven.py catches all exceptions
    from evolve_after_task. These tests verify that pattern.
    """

    async def test_evolve_safe_swallows_reflector_error(self):
        """_evolve_safe pattern: reflector error is caught, not propagated."""

        class SafeWrapper(EvolutionMixin):
            """Simulates the _evolve_safe pattern from ConfigDrivenAgent."""

            async def _evolve_safe(self, task: TaskMessage, result: TaskResult) -> None:
                try:
                    await self.evolve_after_task(task, result)
                except Exception:
                    pass  # swallowed, matching config_driven.py:_evolve_safe

        mixin = SafeWrapper(reflector=ErrorReflector())
        # Should not raise
        await mixin._evolve_safe(_make_task(), _make_failure_result())

    async def test_apply_change_error_does_not_crash_evolution(self):
        """_apply_change errors are caught internally (existing behavior)."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        reflector = LowQualityReflector()
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        _add_examples(optimizer, 3)

        mixin = EvolutionMixin(
            reflector=reflector,
            prompt_optimizer=optimizer,
            auto_evolution_config=cfg,
        )
        mixin.set_current_module(_make_module())

        # Should complete without raising even if internal steps have issues
        entry = await mixin.evolve_after_task(_make_task(), _make_failure_result())
        assert entry is not None


# ── Integration: fire-and-forget via asyncio.create_task ─


class TestFireAndForgetIntegration:
    """Evolution fires via U2's execute_stream hooks (fire-and-forget pattern).

    Validates that evolve_after_task works correctly when scheduled as a
    fire-and-forget asyncio task, matching _trigger_evolution_hooks behavior.
    """

    async def test_evolve_after_task_completes_as_asyncio_task(self):
        """evolve_after_task completes when scheduled via asyncio.create_task."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        result = _make_failure_result()

        # Schedule as fire-and-forget task (mirrors _schedule_evolution)
        async def _evolve():
            await mixin.evolve_after_task(task, result)

        t = asyncio.create_task(_evolve())
        await t  # wait for completion

        history = mixin.get_evolution_history()
        assert len(history) == 1
        assert history[0]["reflection"] is not None

    async def test_concurrent_evolution_tasks_isolated(self):
        """Multiple concurrent evolution tasks don't interfere."""
        cfg = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        async def _run_one(task_id: str):
            await mixin.evolve_after_task(
                _make_task(task_id=task_id),
                _make_failure_result(task_id=task_id),
            )

        await asyncio.gather(
            _run_one("task-a"),
            _run_one("task-b"),
            _run_one("task-c"),
        )

        history = mixin.get_evolution_history()
        assert len(history) == 3
        task_ids = {h["task_id"] for h in history}
        assert task_ids == {"task-a", "task-b", "task-c"}


# ── Backpressure cap (U2 _schedule_evolution) ────────────


class TestBackpressureCap:
    """Backpressure cap reached -> evolution task dropped + logged.

    Tests U2's _schedule_evolution backpressure, which U6's auto-trigger relies on.
    """

    async def test_evolution_task_dropped_when_cap_reached(self):
        """When pending tasks reach cap, new evolution tasks are dropped."""
        import agentkit.core.config_driven as cd

        # Created outside the try so the finally clause can release this exact
        # event; the blocking coroutines wait on it and nothing else.
        block_event = asyncio.Event()

        async def _blocking_evolve() -> None:
            await block_event.wait()

        cap = 4
        try:
            # Fill up to cap
            for _ in range(cap):
                cd._schedule_evolution(_blocking_evolve(), cap=cap)
            assert len(cd._pending_evolution_tasks) == cap

            # Track dropped count before (access via module — int is immutable)
            dropped_before = cd._evolution_dropped_count

            # Try to schedule one more -> should be dropped
            cd._schedule_evolution(_blocking_evolve(), cap=cap)
            assert len(cd._pending_evolution_tasks) == cap  # still at cap
            assert cd._evolution_dropped_count == dropped_before + 1

            # Release the blocking tasks so they can complete and be cleaned up
            block_event.set()
            # Let the event loop process task completions
            await asyncio.sleep(0.05)
        finally:
            # Always unblock the scheduled coroutines, even when an assertion
            # above failed before the event was set; set() is idempotent.
            block_event.set()
            # Wait for any stragglers, then leave the module-level set empty
            if cd._pending_evolution_tasks:
                await asyncio.gather(*cd._pending_evolution_tasks, return_exceptions=True)
            cd._pending_evolution_tasks.clear()


# ── AE3: Happy path — pitfall detection ──────────────────


class TestAE3HappyPath:
    """AE3: task fails -> evolution fires (100%) -> Reflector records ->
    PitfallDetector detects; task succeeds -> evolution fires at 0.1 rate.
    """

    async def test_failure_triggers_evolution_and_pitfall_detection(self):
        """Full happy path: failure -> evolution -> pitfall detection."""
        # 1. Evolution fires on failure (100%)
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)

        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)

        assert entry.reflection is not None
        assert entry.reflection.outcome == "failure"

        # 2. PitfallDetector detects high-failure-rate step
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        for _ in range(6):
            await store.record_experience(
                _make_experience(
                    task_type="order_processing",
                    outcome="failure",
                    steps_summary=[
                        {"step_name": "Call API", "outcome": "failure", "error": "timeout"},
                    ],
                )
            )
        for _ in range(4):
            await store.record_experience(
                _make_experience(
                    task_type="order_processing",
                    outcome="success",
                    success_rate=1.0,
                    steps_summary=[
                        {"step_name": "Call API", "outcome": "success"},
                    ],
                )
            )

        detector = PitfallDetector(experience_store=store, similarity_threshold=0.3)

        steps = [_make_pending_step("Call API", "Call external API")]
        warnings = await detector.check_pitfalls(task_type="order_processing", planned_steps=steps)

        assert len(warnings) == 1
        assert warnings[0].warning_level == WarningLevel.HIGH
        assert warnings[0].failure_rate >= 0.5

    async def test_success_sampled_at_0_1_rate(self):
        """Success path: with rate=0.1, ~10% of tasks trigger evolution."""
        cfg = EvolutionConfig(success_sample_rate=0.1, observe_only=True)
        reflector = SuccessReflector()

        total = 100
        # Deterministic stand-in for random.random(): evenly spaced draws
        # 0.005, 0.015, ..., 0.995, of which exactly 10 fall below the 0.1
        # rate. cycle() keeps the mock from running dry if evolve_after_task
        # draws more than once per call.
        draws = itertools.cycle([(i + 0.5) / total for i in range(total)])

        triggered = 0
        with patch("agentkit.evolution.lifecycle.random.random", side_effect=draws):
            for _ in range(total):
                mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
                entry = await mixin.evolve_after_task(
                    _make_task(), _make_result(status=TaskStatus.COMPLETED)
                )
                if entry.reflection is not None:
                    triggered += 1

        # Expect ~10 of 100. The bounds stay wide so the test does not depend
        # on how many draws evolve_after_task makes per call.
        assert 1 <= triggered <= 25