# fischer-agentkit/tests/integration/test_chat_adaptive_e2e.py

"""Integration tests for Chat + Adaptive + Multi-Agent features (U8).
End-to-end tests that verify the new Phase 8 capabilities work together:
- Chat mode with session management
- Adaptive pipeline execution with reflection
- Multi-Agent communication via MessageBus
"""
import asyncio
import pytest
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock
from agentkit.bus.memory_bus import InMemoryMessageBus
from agentkit.bus.message import AgentMessage
from agentkit.core.orchestrator import Orchestrator, OrchestratorConfig
from agentkit.core.protocol import TaskMessage, TaskStatus
from agentkit.orchestrator.pipeline_engine import PipelineEngine
from agentkit.orchestrator.pipeline_schema import (
AdaptiveConfig,
Pipeline,
PipelineStage,
StageStatus,
)
from agentkit.session.manager import SessionManager
from agentkit.session.models import MessageRole
from agentkit.session.store import InMemorySessionStore
# ── Chat + Session Integration ────────────────────────────
class TestChatSessionIntegration:
    """End-to-end behaviour of ``SessionManager`` backed by an in-memory store."""

    @staticmethod
    async def _open_session():
        """Return ``(manager, session)`` for a fresh in-memory store with one new session."""
        mgr = SessionManager(store=InMemorySessionStore())
        sess = await mgr.create_session(agent_name="test_agent")
        return mgr, sess

    @pytest.mark.asyncio
    async def test_session_lifecycle_with_messages(self):
        """Full session lifecycle: create → chat → pause → resume → close."""
        manager, session = await self._open_session()
        assert session is not None
        sid = session.session_id

        # Three alternating turns; both message views below must see all of them.
        turns = (
            (MessageRole.USER, "Hello"),
            (MessageRole.ASSISTANT, "Hi there!"),
            (MessageRole.USER, "How are you?"),
        )
        for role, text in turns:
            await manager.append_message(sid, role=role, content=text)

        stored = await manager.get_messages(sid)
        assert len(stored) == 3

        # LLM-facing view: entries are indexable by a "role" key.
        llm_view = await manager.get_chat_messages(sid)
        assert len(llm_view) == 3
        assert llm_view[0]["role"] == "user"

        # Walk the status transitions in order; each call returns the updated session.
        for transition, expected_status in (
            (manager.pause_session, "paused"),
            (manager.resume_session, "active"),
            (manager.close_session, "closed"),
        ):
            updated = await transition(sid)
            assert updated.status.value == expected_status

    @pytest.mark.asyncio
    async def test_closed_session_rejects_messages(self):
        """Closed sessions should not accept new messages."""
        manager, session = await self._open_session()
        sid = session.session_id
        await manager.close_session(sid)

        # Appending after close must raise a ValueError whose message mentions "closed".
        with pytest.raises(ValueError, match="closed"):
            await manager.append_message(sid, role=MessageRole.USER, content="test")
# ── Adaptive Pipeline Integration ─────────────────────────
class TestAdaptivePipelineIntegration:
    """Pipeline execution through ``PipelineEngine`` with and without adaptive mode."""

    @pytest.mark.asyncio
    async def test_pipeline_succeeds_without_adaptive(self):
        """Pipeline should succeed normally without adaptive config."""
        engine = PipelineEngine()  # dry-run mode
        pipeline = Pipeline(
            name="test_pipeline",
            version="1.0",
            description="Test",
            stages=[
                PipelineStage(name="step1", agent="a", action="do"),
                PipelineStage(name="step2", agent="b", action="do"),
            ],
        )
        result = await engine.execute(pipeline)
        assert result.status == StageStatus.COMPLETED

    def test_pipeline_adaptive_config_default_disabled(self):
        """AdaptiveConfig default should have enabled=False."""
        # Pure model check with nothing to await, so it does not need to be a
        # coroutine test or carry the asyncio marker.
        config = AdaptiveConfig()
        assert config.enabled is False

    @pytest.mark.asyncio
    async def test_circular_dependency_fails_gracefully(self):
        """Pipeline with circular dependency should fail gracefully."""
        engine = PipelineEngine()
        # Stages "a" and "b" depend on each other, so no valid execution order exists.
        pipeline = Pipeline(
            name="circular",
            version="1.0",
            description="Circular",
            stages=[
                PipelineStage(name="a", agent="x", action="do", depends_on=["b"]),
                PipelineStage(name="b", agent="y", action="do", depends_on=["a"]),
            ],
        )
        # Adaptive mode is enabled so the reflection path is exercised as well;
        # the cycle must still surface as a FAILED result rather than an exception.
        config = AdaptiveConfig(enabled=True, max_reflections=2)
        result = await engine.execute(pipeline, adaptive_config=config)
        assert result.status == StageStatus.FAILED
# ── Multi-Agent Communication Integration ─────────────────
class TestMultiAgentCommunication:
    """Message exchange between agents and the orchestrator over ``InMemoryMessageBus``."""

    @staticmethod
    async def _wait_until(predicate, timeout: float = 2.0, interval: float = 0.01) -> bool:
        """Poll ``predicate`` until it is truthy or ``timeout`` seconds elapse.

        Bus delivery may complete after the triggering call returns. Polling with
        a generous upper bound returns as soon as the condition holds. A fixed
        sleep is both slower on the happy path and flaky on a loaded machine.

        Args:
            predicate: Zero-argument callable checked before every poll.
            timeout: Maximum number of seconds to wait.
            interval: Delay between polls, in seconds.

        Returns:
            True if ``predicate()`` became truthy before the deadline, else False.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not predicate():
            if loop.time() >= deadline:
                return False
            await asyncio.sleep(interval)
        return True

    @pytest.mark.asyncio
    async def test_worker_publishes_progress_to_orchestrator(self):
        """Workers should publish progress messages to orchestrator via MessageBus."""
        bus = InMemoryMessageBus()

        # Mock agent pool: every lookup returns the same agent, whose ``execute``
        # resolves to a result carrying canned output data.
        mock_agent = AsyncMock()
        mock_result = MagicMock()
        mock_result.output_data = {"analysis": "complete"}
        mock_agent.execute = AsyncMock(return_value=mock_result)
        pool = MagicMock()
        pool.get_agent = lambda name: mock_agent
        pool.list_agents = lambda: [
            {"name": "analyst", "agent_type": "worker", "description": "Analyst"},
        ]
        orch = Orchestrator(agent_pool=pool, message_bus=bus)

        # Subscribe as "orchestrator" to capture progress. An async handler matches
        # the handlers in test_agent_to_agent_communication. A sync lambda returns
        # None, which would break if the bus awaits handler results.
        progress: list[AgentMessage] = []

        async def on_progress(msg: AgentMessage) -> None:
            progress.append(msg)

        await bus.subscribe("orchestrator", on_progress)

        task = TaskMessage(
            task_id="t1",
            agent_name="analyst",
            task_type="analyze",
            priority=0,
            input_data={"data": "test"},
            callback_url=None,
            created_at=datetime.now(timezone.utc),
            timeout_seconds=60,
        )
        result = await orch.execute(task)
        assert result.status == TaskStatus.COMPLETED

        # Progress may be delivered after execute() returns; poll instead of sleeping.
        assert await self._wait_until(lambda: progress), "no progress message was delivered"
        assert progress[0].topic == "task.progress"

    @pytest.mark.asyncio
    async def test_agent_to_agent_communication(self):
        """Two agents should be able to communicate via MessageBus."""
        bus = InMemoryMessageBus()
        received_by_a: list[AgentMessage] = []
        received_by_b: list[AgentMessage] = []

        async def handler_a(msg: AgentMessage):
            received_by_a.append(msg)

        async def handler_b(msg: AgentMessage):
            received_by_b.append(msg)

        await bus.subscribe("agent_a", handler_a)
        await bus.subscribe("agent_b", handler_b)

        # Agent A sends a directed message to Agent B.
        await bus.publish(AgentMessage(
            sender="agent_a",
            recipient="agent_b",
            topic="task.result",
            payload={"output": "analysis complete"},
        ))

        assert await self._wait_until(lambda: received_by_b), "agent_b received nothing"
        # Negative checks are inherently time-based: allow a brief settle period so
        # a misrouted or duplicate delivery has a chance to show up before asserting.
        await asyncio.sleep(0.05)
        assert len(received_by_b) == 1
        assert received_by_b[0].payload["output"] == "analysis complete"
        assert len(received_by_a) == 0  # Not addressed to A
# ── Config Integration ────────────────────────────────────
class TestConfigIntegration:
    """Construction and serialization of configuration objects."""

    def test_adaptive_config_serialization(self):
        """AdaptiveConfig should be serializable."""
        # AdaptiveConfig comes from the module-level import; a local re-import is redundant.
        config = AdaptiveConfig(
            enabled=True,
            max_reflections=5,
            reflection_model="deepseek/deepseek-chat",
            skip_stages=["cleanup"],
        )
        data = config.model_dump()
        # Every explicitly-set field should appear in the dumped mapping.
        assert data["enabled"] is True
        assert data["max_reflections"] == 5
        assert data["reflection_model"] == "deepseek/deepseek-chat"
        assert "cleanup" in data["skip_stages"]

    def test_orchestrator_config_serialization(self):
        """OrchestratorConfig should keep the values passed to its constructor."""
        config = OrchestratorConfig(
            adaptive=True,
            max_iterations=5,
            quality_threshold=0.9,
        )
        assert config.adaptive is True
        assert config.max_iterations == 5
        # Exact float comparison is safe: the value is stored and read back unchanged.
        assert config.quality_threshold == 0.9