# fischer-agentkit/tests/unit/test_evolution_auto_trigger.py
#
# 880 lines
# 34 KiB
# Python

"""Tests for U6: auto evolution trigger + quality gate + actor marking.
Covers R5 (success sample rate, quality thresholds, observe-only) and
R6 (actor marking, cross-workspace sharing gate).
Test scenarios:
- Happy path (AE3): failure -> evolution fires (100%); success -> fires at 0.1 rate
- Observe-only mode: recorded but not fed to optimizer
- Backpressure cap reached: evolution task dropped + logged
- Low-confidence pitfall: marked observe-only
- Evolution task error: caught, does not fail the stream
- PromptOptimizer sample count < 3: skip optimization
- Actor marking present on all artifacts
- Cross-workspace sharing rejected without opt-in
- gave_up_after_reflections triggers failure-path evolution
"""
from __future__ import annotations
import asyncio
from datetime import datetime, timezone
from unittest.mock import patch
import pytest
from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus
from agentkit.evolution.config import EvolutionConfig
from agentkit.evolution.experience_schema import TaskExperience
from agentkit.evolution.experience_store import InMemoryExperienceStore
from agentkit.evolution.lifecycle import EvolutionMixin
from agentkit.evolution.pitfall_detector import (
PitfallDetector,
WarningLevel,
_compute_confidence,
)
from agentkit.evolution.prompt_optimizer import Module, PromptOptimizer, Signature
from agentkit.evolution.reflector import Reflection, Reflector
# ── Helpers ──────────────────────────────────────────────
def _make_task(
    task_id: str = "test-001",
    agent_name: str = "evolving_agent",
) -> TaskMessage:
    """Build a priority-0 ``echo`` task for *agent_name* with a fixed query payload.

    The creation timestamp is the current UTC time; no callback URL is set.
    """
    message_fields = dict(
        task_id=task_id,
        agent_name=agent_name,
        task_type="echo",
        priority=0,
        input_data={"query": "hello"},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )
    return TaskMessage(**message_fields)
def _make_result(
    status: str = TaskStatus.COMPLETED,
    output_data: dict | None = None,
    error_message: str | None = None,
    agent_name: str = "evolving_agent",
    task_id: str = "test-001",
) -> TaskResult:
    """Build a TaskResult with current-UTC timestamps and a fixed elapsed metric.

    When *output_data* is ``None`` a placeholder payload ``{"key": "value"}``
    is used instead, so the result always carries some output data.
    """
    payload = {"key": "value"} if output_data is None else output_data
    # Two separate clock reads, start first, mirroring a real run's ordering.
    began = datetime.now(timezone.utc)
    finished = datetime.now(timezone.utc)
    return TaskResult(
        task_id=task_id,
        agent_name=agent_name,
        status=status,
        output_data=payload,
        error_message=error_message,
        started_at=began,
        completed_at=finished,
        metrics={"elapsed_seconds": 5.0},
    )
def _make_failure_result(
    agent_name: str = "evolving_agent",
    task_id: str = "test-001",
) -> TaskResult:
    """Build a FAILED TaskResult whose error message is ``"task failed"``.

    ``output_data`` is forwarded as ``None``; ``_make_result`` then substitutes
    its placeholder payload.
    """
    failure_fields = {
        "status": TaskStatus.FAILED,
        "output_data": None,
        "error_message": "task failed",
        "agent_name": agent_name,
        "task_id": task_id,
    }
    return _make_result(**failure_fields)
def _make_module() -> Module:
    """Build the ``test_module`` Module with a one-input / one-output signature."""
    search_signature = Signature(
        input_fields={"query": "search query"},
        output_fields={"result": "search result"},
        instruction="Find the best result.",
    )
    return Module(name="test_module", signature=search_signature)
class LowQualityReflector(Reflector):
    """Reflector stub that reports every task as a low-quality failure."""

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        """Return a fixed ``failure`` reflection (quality 0.2) with one suggestion.

        Only ``task.task_id`` and ``result.agent_name`` are read from the inputs.
        """
        canned = Reflection(
            task_id=task.task_id,
            agent_name=result.agent_name,
            outcome="failure",
            quality_score=0.2,
            patterns=["slow_execution"],
            insights=["Low quality score indicates potential issues"],
            suggestions=["Consider prompt optimization for this task type"],
        )
        return canned
class SuccessReflector(Reflector):
    """Reflector stub that reports every task as a high-quality success.

    It still emits a suggestion, so success-path tests have something to act on.
    """

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        """Return a fixed ``success`` reflection (quality 0.9) with one suggestion.

        Only ``task.task_id`` and ``result.agent_name`` are read from the inputs.
        """
        canned = Reflection(
            task_id=task.task_id,
            agent_name=result.agent_name,
            outcome="success",
            quality_score=0.9,
            patterns=["fast_execution"],
            insights=["Good execution"],
            suggestions=["Consider caching results for similar queries"],
        )
        return canned
class ErrorReflector(Reflector):
    """Reflector stub whose ``reflect`` never returns: it always raises."""

    async def reflect(self, task: TaskMessage, result: TaskResult) -> Reflection:
        """Raise ``RuntimeError("reflector crashed")`` unconditionally."""
        crash = RuntimeError("reflector crashed")
        raise crash
def _make_experience(
    task_type: str = "code_review",
    outcome: str = "failure",
    steps_summary: str | list = "",
    success_rate: float = 0.0,
) -> TaskExperience:
    """Build a TaskExperience with an empty id, a fixed goal and a 10 s duration.

    Failure reasons and optimization tips are left empty; the creation
    timestamp is the current UTC time.
    """
    experience_fields = dict(
        experience_id="",
        task_type=task_type,
        goal="test goal",
        steps_summary=steps_summary,
        outcome=outcome,
        duration_seconds=10.0,
        success_rate=success_rate,
        failure_reasons=[],
        optimization_tips=[],
        created_at=datetime.now(timezone.utc),
    )
    return TaskExperience(**experience_fields)
# ── R5: Success sample rate gate ─────────────────────────
class TestSuccessSampleRate:
    """R5: the sample-rate gate applies to successes only; failures always evolve."""

    async def test_failure_always_triggers_evolution(self):
        """A failed task is evolved even when success_sample_rate is 0.0."""
        config = EvolutionConfig(success_sample_rate=0.0, observe_only=False)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        log_entry = await evolver.evolve_after_task(_make_task(), _make_failure_result())

        assert log_entry.sampled is True
        assert log_entry.reflection is not None
        assert log_entry.reflection.outcome == "failure"

    async def test_success_skipped_when_rate_zero(self):
        """A successful task is not evolved when success_sample_rate is 0.0."""
        config = EvolutionConfig(success_sample_rate=0.0, observe_only=False)
        evolver = EvolutionMixin(
            reflector=SuccessReflector(),
            auto_evolution_config=config,
        )

        log_entry = await evolver.evolve_after_task(
            _make_task(),
            _make_result(status=TaskStatus.COMPLETED),
        )

        assert log_entry.sampled is False
        # The gate sits in front of reflection, so nothing was reflected.
        assert log_entry.reflection is None

    async def test_success_runs_when_rate_one(self):
        """A successful task is always evolved when success_sample_rate is 1.0."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=False)
        evolver = EvolutionMixin(
            reflector=SuccessReflector(),
            auto_evolution_config=config,
        )

        log_entry = await evolver.evolve_after_task(
            _make_task(),
            _make_result(status=TaskStatus.COMPLETED),
        )

        assert log_entry.sampled is True
        assert log_entry.reflection is not None
        assert log_entry.reflection.outcome == "success"

    async def test_success_sampled_at_rate_boundary(self):
        """At rate=0.1 a draw below 0.1 evolves; a draw at or above 0.1 skips."""
        config = EvolutionConfig(success_sample_rate=0.1, observe_only=False)
        reflector = SuccessReflector()

        # (patched random draw, whether evolution is expected to run)
        scenarios = ((0.05, True), (0.15, False))
        for draw, should_run in scenarios:
            evolver = EvolutionMixin(reflector=reflector, auto_evolution_config=config)
            with patch("agentkit.evolution.lifecycle.random.random", return_value=draw):
                log_entry = await evolver.evolve_after_task(
                    _make_task(), _make_result(status=TaskStatus.COMPLETED)
                )
            assert log_entry.sampled is should_run
            assert (log_entry.reflection is not None) is should_run

    async def test_no_config_preserves_backward_compat(self):
        """With no auto_evolution_config the sample gate is not applied at all."""
        evolver = EvolutionMixin(reflector=SuccessReflector())

        log_entry = await evolver.evolve_after_task(
            _make_task(),
            _make_result(status=TaskStatus.COMPLETED),
        )

        assert log_entry.sampled is True
        assert log_entry.reflection is not None
# ── R5: Observe-only mode ────────────────────────────────
class TestObserveOnly:
    """R5: in observe-only mode reflections are logged but the optimizer is untouched."""

    async def test_observe_only_records_without_optimizing(self):
        """observe_only=True: a reflection is recorded and no module is produced."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=True, min_confidence=0.0)
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            prompt_optimizer=optimizer,
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        log_entry = await evolver.evolve_after_task(_make_task(), _make_failure_result())

        assert log_entry.observe_only is True
        assert log_entry.reflection is not None
        assert log_entry.optimized_module is None
        # The optimizer must not have received any example from this run.
        successes, _ = optimizer.example_count
        assert successes == 0

    async def test_observe_only_false_allows_optimization(self):
        """observe_only=False: optimization proceeds once the gates are satisfied."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        # Seed three high-quality examples so the consumption gate is passed.
        for idx in range(3):
            optimizer.add_example(
                input_data={"query": f"q_{idx}"},
                output_data={"result": f"r_{idx}"},
                quality_score=0.9,
            )
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            prompt_optimizer=optimizer,
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        log_entry = await evolver.evolve_after_task(_make_task(), _make_failure_result())

        assert log_entry.observe_only is False
        assert log_entry.optimized_module is not None
# ── R5: PromptOptimizer consumption gate ─────────────────
class TestConsumptionGate:
    """R5: the optimizer only runs with enough examples AND enough confidence."""

    @staticmethod
    def _seed(optimizer, count, quality_score):
        """Feed *count* numbered query/result examples of the given quality."""
        for idx in range(count):
            optimizer.add_example(
                input_data={"query": f"q_{idx}"},
                output_data={"result": f"r_{idx}"},
                quality_score=quality_score,
            )

    async def test_sample_count_below_threshold_skips_optimization(self):
        """Fewer success examples than min_examples -> no optimized module."""
        config = EvolutionConfig(
            success_sample_rate=1.0,
            observe_only=False,
            min_examples=3,
            min_confidence=0.0,
        )
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        # Two examples only: one short of the threshold of three.
        self._seed(optimizer, 2, 0.9)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            prompt_optimizer=optimizer,
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        log_entry = await evolver.evolve_after_task(_make_task(), _make_failure_result())

        assert log_entry.optimized_module is None  # gate not met

    def test_can_optimize_returns_false_below_threshold(self):
        """can_optimize() is False for an optimizer that holds no examples."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        assert optimizer.can_optimize(min_confidence=0.5) is False

    def test_can_optimize_returns_true_above_threshold(self):
        """can_optimize() is True with three 0.9-quality examples and threshold 0.5."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        self._seed(optimizer, 3, 0.9)
        assert optimizer.can_optimize(min_confidence=0.5) is True

    def test_can_optimize_returns_false_low_confidence(self):
        """can_optimize() is False when every example has quality 0.3."""
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=3)
        # Quality 0.3 is below the 0.5 threshold. Per the original author's note
        # these land in failure_examples (quality < 0.7), leaving
        # success_examples empty.
        self._seed(optimizer, 3, 0.3)
        assert optimizer.can_optimize(min_confidence=0.5) is False
# ── R5: Pitfall confidence threshold ─────────────────────
class TestPitfallConfidence:
    """R5: pitfalls whose confidence is below the threshold are observe-only."""

    def test_compute_confidence_high_sample_high_rate(self):
        """Five occurrences at failure_rate 0.6 give confidence 0.6."""
        confidence = _compute_confidence(failure_rate=0.6, total_occurrences=5)
        assert confidence == pytest.approx(0.6)

    def test_compute_confidence_low_sample(self):
        """A single occurrence scales the failure rate down by a factor of 1/3."""
        confidence = _compute_confidence(failure_rate=0.6, total_occurrences=1)
        assert confidence == pytest.approx(0.6 * (1.0 / 3.0))

    def test_compute_confidence_zero_samples(self):
        """Zero occurrences give exactly zero confidence."""
        confidence = _compute_confidence(failure_rate=0.5, total_occurrences=0)
        assert confidence == 0.0

    async def test_low_confidence_pitfall_marked_observe_only(self):
        """One recorded failure -> warning below min_confidence, flagged observe-only."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        # A single failure experience: small sample, hence low confidence.
        failed_steps = [{"step_name": "Run Tests", "outcome": "failure", "error": "Flaky"}]
        await store.record_experience(
            _make_experience(
                task_type="testing",
                outcome="failure",
                steps_summary=failed_steps,
            )
        )
        detector = PitfallDetector(
            experience_store=store,
            similarity_threshold=0.3,
            min_confidence=0.5,
        )
        from agentkit.core.plan_schema import PlanStep, PlanStepStatus

        planned = PlanStep(
            step_id="s1",
            name="Run Tests",
            description="Run tests",
            status=PlanStepStatus.PENDING,
        )
        found = await detector.check_pitfalls(
            task_type="testing", planned_steps=[planned], actor="test_agent"
        )

        assert len(found) == 1
        warning = found[0]
        assert warning.observe_only is True
        assert warning.confidence < 0.5
        assert warning.actor == "test_agent"

    async def test_high_confidence_pitfall_not_observe_only(self):
        """Four recorded failures -> warning at or above min_confidence, actionable."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        # Three or more failures give the full sample factor, so confidence is high.
        for _ in range(4):
            await store.record_experience(
                _make_experience(
                    task_type="deployment",
                    outcome="failure",
                    steps_summary=[
                        {"step_name": "Deploy", "outcome": "failure", "error": "OOM"},
                    ],
                )
            )
        detector = PitfallDetector(
            experience_store=store,
            similarity_threshold=0.3,
            min_confidence=0.5,
        )
        from agentkit.core.plan_schema import PlanStep, PlanStepStatus

        planned = PlanStep(
            step_id="s1",
            name="Deploy",
            description="Deploy app",
            status=PlanStepStatus.PENDING,
        )
        found = await detector.check_pitfalls(task_type="deployment", planned_steps=[planned])

        assert len(found) == 1
        warning = found[0]
        assert warning.observe_only is False
        assert warning.confidence >= 0.5
# ── R6: Actor marking ────────────────────────────────────
class TestActorMarking:
    """R6: every evolution artifact is stamped with the actor that produced it."""

    async def test_log_entry_carries_actor(self):
        """An explicitly passed actor is stored on the evolution log entry."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=False)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )

        log_entry = await evolver.evolve_after_task(
            _make_task(agent_name="backend_engineer"),
            _make_failure_result(agent_name="backend_engineer"),
            actor="backend_engineer",
        )

        assert log_entry.actor == "backend_engineer"

    async def test_actor_defaults_to_result_agent_name(self):
        """With no actor argument the entry's actor is result.agent_name."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )

        log_entry = await evolver.evolve_after_task(
            _make_task(agent_name="qa_engineer"),
            _make_failure_result(agent_name="qa_engineer"),
        )

        assert log_entry.actor == "qa_engineer"

    async def test_actor_marked_on_optimized_module(self):
        """The optimized Module produced by evolution carries the actor."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        for idx in range(3):
            optimizer.add_example(
                input_data={"query": f"q_{idx}"},
                output_data={"result": f"r_{idx}"},
                quality_score=0.9,
            )
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            prompt_optimizer=optimizer,
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        log_entry = await evolver.evolve_after_task(
            _make_task(agent_name="tech_lead"),
            _make_failure_result(agent_name="tech_lead"),
            actor="tech_lead",
        )

        produced = log_entry.optimized_module
        assert produced is not None
        assert produced.actor == "tech_lead"

    async def test_actor_in_history(self):
        """get_evolution_history() exposes the actor under the "actor" key."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )

        await evolver.evolve_after_task(
            _make_task(), _make_failure_result(), actor="frontend_engineer"
        )

        records = evolver.get_evolution_history()
        assert len(records) == 1
        assert records[0]["actor"] == "frontend_engineer"

    async def test_pitfall_warning_carries_actor(self):
        """A PitfallWarning records the actor passed to check_pitfalls()."""
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        failed_steps = [{"step_name": "Run Tests", "outcome": "failure", "error": "Error"}]
        await store.record_experience(
            _make_experience(
                task_type="testing",
                outcome="failure",
                steps_summary=failed_steps,
            )
        )
        detector = PitfallDetector(experience_store=store, similarity_threshold=0.3)
        from agentkit.core.plan_schema import PlanStep, PlanStepStatus

        planned = PlanStep(
            step_id="s1",
            name="Run Tests",
            description="Run tests",
            status=PlanStepStatus.PENDING,
        )
        found = await detector.check_pitfalls(
            task_type="testing", planned_steps=[planned], actor="code_reviewer"
        )

        assert len(found) == 1
        assert found[0].actor == "code_reviewer"
# ── R6: Cross-workspace sharing ──────────────────────────
class TestCrossWorkspaceSharing:
    """R6: sharing between different actors is opt-in; same-actor sharing is always on."""

    def test_same_workspace_sharing_always_allowed(self):
        """Source and target being the same actor is always permitted."""
        evolver = EvolutionMixin(reflector=Reflector())
        allowed = evolver.can_share_artifact("agent_a", "agent_a")
        assert allowed is True

    def test_cross_workspace_sharing_default_off(self):
        """cross_workspace_sharing=False rejects sharing between two actors."""
        config = EvolutionConfig(cross_workspace_sharing=False)
        evolver = EvolutionMixin(reflector=Reflector(), auto_evolution_config=config)
        allowed = evolver.can_share_artifact("agent_a", "agent_b")
        assert allowed is False

    def test_cross_workspace_sharing_with_opt_in(self):
        """cross_workspace_sharing=True permits sharing between two actors."""
        config = EvolutionConfig(cross_workspace_sharing=True)
        evolver = EvolutionMixin(reflector=Reflector(), auto_evolution_config=config)
        allowed = evolver.can_share_artifact("agent_a", "agent_b")
        assert allowed is True

    def test_no_config_cross_workspace_rejected(self):
        """With no config at all, sharing between two actors is rejected."""
        evolver = EvolutionMixin(reflector=Reflector())
        allowed = evolver.can_share_artifact("agent_a", "agent_b")
        assert allowed is False
# ── KTD-8: gave_up_after_reflections ─────────────────────
class TestGaveUpAfterReflections:
    """KTD-8: a gave_up_after_reflections signal routes a result to the failure path."""

    async def test_gave_up_treated_as_failure(self):
        """A COMPLETED result whose trace_outcome is gave_up_after_reflections evolves."""
        config = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )
        # Status says COMPLETED, but the payload carries the give-up marker.
        gave_up = _make_result(
            status=TaskStatus.COMPLETED,
            output_data={"trace_outcome": "gave_up_after_reflections"},
        )

        log_entry = await evolver.evolve_after_task(_make_task(), gave_up)

        # success_sample_rate is 0.0, so this only runs via the failure path.
        assert log_entry.sampled is True
        assert log_entry.reflection is not None

    async def test_gave_up_in_error_message_treated_as_failure(self):
        """A COMPLETED result whose error_message carries the give-up marker evolves."""
        config = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )
        gave_up = _make_result(
            status=TaskStatus.COMPLETED,
            output_data={"content": "some output"},
            error_message="gave_up_after_reflections: exhausted reinjections",
        )

        log_entry = await evolver.evolve_after_task(_make_task(), gave_up)

        assert log_entry.sampled is True
        assert log_entry.reflection is not None

    def test_is_failure_path_normal_success(self):
        """COMPLETED with an ordinary payload is not on the failure path."""
        evolver = EvolutionMixin(reflector=Reflector())
        ordinary = _make_result(status=TaskStatus.COMPLETED, output_data={"key": "val"})
        assert evolver._is_failure_path(ordinary) is False

    def test_is_failure_path_failed_status(self):
        """FAILED status is on the failure path."""
        evolver = EvolutionMixin(reflector=Reflector())
        failed = _make_result(status=TaskStatus.FAILED, output_data=None)
        assert evolver._is_failure_path(failed) is True

    def test_is_failure_path_cancelled_status(self):
        """CANCELLED status is on the failure path."""
        evolver = EvolutionMixin(reflector=Reflector())
        cancelled = _make_result(status=TaskStatus.CANCELLED, output_data=None)
        assert evolver._is_failure_path(cancelled) is True
# ── Error handling: evolution does not fail the stream ───
class TestEvolutionErrorHandling:
    """An error inside evolution must not propagate to the caller.

    Per the original author's note, config_driven.py wraps evolve_after_task
    in an ``_evolve_safe`` helper that catches every exception. These tests
    exercise that pattern.
    """

    async def test_evolve_safe_swallows_reflector_error(self):
        """A crashing reflector behind an _evolve_safe-style wrapper does not raise."""

        class SafeWrapper(EvolutionMixin):
            """Local stand-in for the _evolve_safe pattern of ConfigDrivenAgent."""

            async def _evolve_safe(self, task: TaskMessage, result: TaskResult) -> None:
                try:
                    await self.evolve_after_task(task, result)
                except Exception:
                    # Deliberately swallowed, matching config_driven.py:_evolve_safe.
                    return None

        guarded = SafeWrapper(reflector=ErrorReflector())
        # Passing means the await completes without raising.
        await guarded._evolve_safe(_make_task(), _make_failure_result())

    async def test_apply_change_error_does_not_crash_evolution(self):
        """A full optimization run completes and returns a log entry.

        NOTE(review): no fault is injected here; the test only shows that the
        normal path with an optimizer and module set does not raise.
        """
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=False, min_confidence=0.0)
        optimizer = PromptOptimizer(max_demos=3, min_examples_for_optimization=1)
        for idx in range(3):
            optimizer.add_example(
                input_data={"query": f"q_{idx}"},
                output_data={"result": f"r_{idx}"},
                quality_score=0.9,
            )
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            prompt_optimizer=optimizer,
            auto_evolution_config=config,
        )
        evolver.set_current_module(_make_module())

        # Must finish without raising, even if an internal step has problems.
        log_entry = await evolver.evolve_after_task(_make_task(), _make_failure_result())

        assert log_entry is not None
# ── Integration: fire-and-forget via asyncio.create_task ─
class TestFireAndForgetIntegration:
    """evolve_after_task behaves correctly when run as a background asyncio task.

    Per the original author's note, this mirrors how U2's execute_stream hooks
    (_trigger_evolution_hooks) fire evolution in a fire-and-forget fashion.
    """

    async def test_evolve_after_task_completes_as_asyncio_task(self):
        """A run scheduled with asyncio.create_task finishes and is logged."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )
        task = _make_task()
        result = _make_failure_result()

        async def _background_run():
            await evolver.evolve_after_task(task, result)

        # Schedule as a background task (as _schedule_evolution is said to),
        # then wait for it to finish.
        scheduled = asyncio.create_task(_background_run())
        await scheduled

        records = evolver.get_evolution_history()
        assert len(records) == 1
        assert records[0]["reflection"] is not None

    async def test_concurrent_evolution_tasks_isolated(self):
        """Three concurrent runs on one mixin each produce their own history record."""
        config = EvolutionConfig(success_sample_rate=1.0, observe_only=True)
        evolver = EvolutionMixin(
            reflector=LowQualityReflector(),
            auto_evolution_config=config,
        )

        async def _run_one(task_id: str):
            await evolver.evolve_after_task(
                _make_task(task_id=task_id),
                _make_failure_result(task_id=task_id),
            )

        expected_ids = ("task-a", "task-b", "task-c")
        await asyncio.gather(*(_run_one(task_id) for task_id in expected_ids))

        records = evolver.get_evolution_history()
        assert len(records) == 3
        seen_ids = {record["task_id"] for record in records}
        assert seen_ids == {"task-a", "task-b", "task-c"}
# ── Backpressure cap (U2 _schedule_evolution) ────────────
class TestBackpressureCap:
    """Backpressure cap reached -> evolution task dropped + logged.

    Tests U2's _schedule_evolution backpressure, which U6's auto-trigger relies on.
    """

    async def test_evolution_task_dropped_when_cap_reached(self):
        """Once ``cap`` tasks are pending, one more schedule call is dropped.

        Checks that the pending collection stays at ``cap`` and that the
        module-level dropped counter increases by exactly one.
        """
        import agentkit.core.config_driven as cd

        cap = 4
        # Created outside the try block so the cleanup in ``finally`` releases
        # the very event the scheduled coroutines wait on. Setting a fresh
        # Event there would never wake a coroutine already blocked on this one.
        block_event = asyncio.Event()

        async def _blocking_evolve() -> None:
            # Stays pending until the test releases the event.
            await block_event.wait()

        try:
            # Fill up to cap.
            for _ in range(cap):
                cd._schedule_evolution(_blocking_evolve(), cap=cap)
            assert len(cd._pending_evolution_tasks) == cap

            # Read the counter through the module: it is an int, so a local
            # copy would not see later updates.
            dropped_before = cd._evolution_dropped_count

            # One more than the cap -> should be dropped.
            cd._schedule_evolution(_blocking_evolve(), cap=cap)
            assert len(cd._pending_evolution_tasks) == cap  # still at cap
            assert cd._evolution_dropped_count == dropped_before + 1

            # Release the blocking tasks and give the event loop a moment to
            # process their completion.
            block_event.set()
            await asyncio.sleep(0.05)
        finally:
            # Always unblock (set() is idempotent), so a failed assertion
            # above cannot leave tasks parked on the event forever.
            block_event.set()
            # Wait for any stragglers, then leave the module-level collection
            # empty for subsequent tests.
            if cd._pending_evolution_tasks:
                await asyncio.gather(*cd._pending_evolution_tasks, return_exceptions=True)
            cd._pending_evolution_tasks.clear()
# ── AE3: Happy path — pitfall detection ──────────────────
class TestAE3HappyPath:
    """AE3: task fails -> evolution fires (100%) -> Reflector records ->
    PitfallDetector detects; task succeeds -> evolution fires at 0.1 rate.
    """

    async def test_failure_triggers_evolution_and_pitfall_detection(self):
        """Full happy path: failure -> evolution -> pitfall detection."""
        # 1. Evolution fires on failure even with success_sample_rate=0.0.
        cfg = EvolutionConfig(success_sample_rate=0.0, observe_only=True)
        reflector = LowQualityReflector()
        mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
        task = _make_task()
        result = _make_failure_result()
        entry = await mixin.evolve_after_task(task, result)
        assert entry.reflection is not None
        assert entry.reflection.outcome == "failure"

        # 2. PitfallDetector flags a step that failed in 6 of 10 experiences.
        store = InMemoryExperienceStore(decay_rate=0.01, alpha=0.7)
        for _ in range(6):
            await store.record_experience(
                _make_experience(
                    task_type="order_processing",
                    outcome="failure",
                    steps_summary=[
                        {"step_name": "Call API", "outcome": "failure", "error": "timeout"},
                    ],
                )
            )
        for _ in range(4):
            await store.record_experience(
                _make_experience(
                    task_type="order_processing",
                    outcome="success",
                    success_rate=1.0,
                    steps_summary=[
                        {"step_name": "Call API", "outcome": "success"},
                    ],
                )
            )
        detector = PitfallDetector(experience_store=store, similarity_threshold=0.3)
        from agentkit.core.plan_schema import PlanStep, PlanStepStatus

        steps = [
            PlanStep(
                step_id="s1",
                name="Call API",
                description="Call external API",
                status=PlanStepStatus.PENDING,
            )
        ]
        warnings = await detector.check_pitfalls(task_type="order_processing", planned_steps=steps)
        assert len(warnings) == 1
        assert warnings[0].warning_level == WarningLevel.HIGH
        assert warnings[0].failure_rate >= 0.5

    async def test_success_sampled_at_0_1_rate(self):
        """Success path: with rate=0.1, exactly 10% of evenly spread draws trigger.

        The sampling draw is patched to fixed values instead of using the real
        RNG, so the outcome is the same on every run.
        """
        cfg = EvolutionConfig(success_sample_rate=0.1, observe_only=True)
        reflector = SuccessReflector()
        triggered = 0
        total = 100
        with patch("agentkit.evolution.lifecycle.random.random") as fake_random:
            for i in range(total):
                # Draws 0.005, 0.015, ..., 0.995: evenly spread over [0, 1)
                # and never equal to the 0.1 boundary. The value is constant
                # for a whole evolve call, so the result does not depend on
                # how many times the draw is read.
                fake_random.return_value = (i + 0.5) / total
                mixin = EvolutionMixin(reflector=reflector, auto_evolution_config=cfg)
                entry = await mixin.evolve_after_task(
                    _make_task(), _make_result(status=TaskStatus.COMPLETED)
                )
                if entry.reflection is not None:
                    triggered += 1
        # Only the ten draws below 0.1 (0.005 .. 0.095) pass the sample gate.
        assert triggered == 10