diff --git a/tests/integration/test_chat_adaptive_e2e.py b/tests/integration/test_chat_adaptive_e2e.py new file mode 100644 index 0000000..b0a83dd --- /dev/null +++ b/tests/integration/test_chat_adaptive_e2e.py @@ -0,0 +1,236 @@ +"""Integration tests for Chat + Adaptive + Multi-Agent features (U8). + +End-to-end tests that verify the new Phase 8 capabilities work together: +- Chat mode with session management +- Adaptive pipeline execution with reflection +- Multi-Agent communication via MessageBus +""" + +import asyncio +import pytest +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +from agentkit.bus.memory_bus import InMemoryMessageBus +from agentkit.bus.message import AgentMessage +from agentkit.core.orchestrator import Orchestrator, OrchestratorConfig +from agentkit.core.protocol import TaskMessage, TaskStatus +from agentkit.orchestrator.pipeline_engine import PipelineEngine +from agentkit.orchestrator.pipeline_schema import ( + AdaptiveConfig, + Pipeline, + PipelineStage, + StageStatus, +) +from agentkit.session.manager import SessionManager +from agentkit.session.models import MessageRole +from agentkit.session.store import InMemorySessionStore + + +# ── Chat + Session Integration ──────────────────────────── + + +class TestChatSessionIntegration: + @pytest.mark.asyncio + async def test_session_lifecycle_with_messages(self): + """Full session lifecycle: create → chat → pause → resume → close.""" + store = InMemorySessionStore() + manager = SessionManager(store=store) + + # Create session + session = await manager.create_session(agent_name="test_agent") + assert session is not None + session_id = session.session_id + + # Append messages + await manager.append_message(session_id, role=MessageRole.USER, content="Hello") + await manager.append_message(session_id, role=MessageRole.ASSISTANT, content="Hi there!") + await manager.append_message(session_id, role=MessageRole.USER, content="How are you?") + + # Get messages + messages = await manager.get_messages(session_id) + assert len(messages) == 3 + + # Get chat messages (for LLM consumption) + chat_messages = await manager.get_chat_messages(session_id) + assert len(chat_messages) == 3 + assert chat_messages[0]["role"] == "user" + + # Pause and resume + paused = await manager.pause_session(session_id) + assert paused.status.value == "paused" + + resumed = await manager.resume_session(session_id) + assert resumed.status.value == "active" + + # Close + closed = await manager.close_session(session_id) + assert closed.status.value == "closed" + + @pytest.mark.asyncio + async def test_closed_session_rejects_messages(self): + """Closed sessions should not accept new messages.""" + store = InMemorySessionStore() + manager = SessionManager(store=store) + + session = await manager.create_session(agent_name="test_agent") + session_id = session.session_id + await manager.close_session(session_id) + + with pytest.raises(ValueError, match="closed"): + await manager.append_message(session_id, role=MessageRole.USER, content="test") + + +# ── Adaptive Pipeline Integration ───────────────────────── + + +class TestAdaptivePipelineIntegration: + @pytest.mark.asyncio + async def test_pipeline_succeeds_without_adaptive(self): + """Pipeline should succeed normally without adaptive config.""" + engine = PipelineEngine() # dry-run mode + pipeline = Pipeline( + name="test_pipeline", + version="1.0", + description="Test", + stages=[ + PipelineStage(name="step1", agent="a", action="do"), + PipelineStage(name="step2", agent="b", action="do"), + ], + ) + + result = await engine.execute(pipeline) + assert result.status == StageStatus.COMPLETED + + @pytest.mark.asyncio + async def test_pipeline_adaptive_config_default_disabled(self): + """AdaptiveConfig default should have enabled=False.""" + config = AdaptiveConfig() + assert config.enabled is False + + @pytest.mark.asyncio + async def test_circular_dependency_fails_gracefully(self): + """Pipeline with circular dependency should fail gracefully.""" + engine = PipelineEngine() + pipeline = Pipeline( + name="circular", + version="1.0", + description="Circular", + stages=[ + PipelineStage(name="a", agent="x", action="do", depends_on=["b"]), + PipelineStage(name="b", agent="y", action="do", depends_on=["a"]), + ], + ) + + config = AdaptiveConfig(enabled=True, max_reflections=2) + result = await engine.execute(pipeline, adaptive_config=config) + assert result.status == StageStatus.FAILED + + +# ── Multi-Agent Communication Integration ───────────────── + + +class TestMultiAgentCommunication: + @pytest.mark.asyncio + async def test_worker_publishes_progress_to_orchestrator(self): + """Workers should publish progress messages to orchestrator via MessageBus.""" + bus = InMemoryMessageBus() + + # Create mock agent pool + mock_agent = AsyncMock() + mock_result = MagicMock() + mock_result.output_data = {"analysis": "complete"} + mock_agent.execute = AsyncMock(return_value=mock_result) + + pool = MagicMock() + pool.get_agent = lambda name: mock_agent + pool.list_agents = lambda: [ + {"name": "analyst", "agent_type": "worker", "description": "Analyst"}, + ] + + orch = Orchestrator(agent_pool=pool, message_bus=bus) + + # Subscribe orchestrator to receive progress + progress: list[AgentMessage] = [] + await bus.subscribe("orchestrator", lambda msg: progress.append(msg)) + + # Execute task + task = TaskMessage( + task_id="t1", + agent_name="analyst", + task_type="analyze", + priority=0, + input_data={"data": "test"}, + callback_url=None, + created_at=datetime.now(timezone.utc), + timeout_seconds=60, + ) + + result = await orch.execute(task) + assert result.status == TaskStatus.COMPLETED + + # Verify progress was published + await asyncio.sleep(0.2) + assert len(progress) >= 1 + assert progress[0].topic == "task.progress" + + @pytest.mark.asyncio + async def test_agent_to_agent_communication(self): + """Two agents should be able to communicate via MessageBus.""" + bus = InMemoryMessageBus() + + received_by_a: list[AgentMessage] = [] + received_by_b: list[AgentMessage] = [] + + async def handler_a(msg: AgentMessage): + received_by_a.append(msg) + + async def handler_b(msg: AgentMessage): + received_by_b.append(msg) + + await bus.subscribe("agent_a", handler_a) + await bus.subscribe("agent_b", handler_b) + + # Agent A sends to Agent B + await bus.publish(AgentMessage( + sender="agent_a", + recipient="agent_b", + topic="task.result", + payload={"output": "analysis complete"}, + )) + + await asyncio.sleep(0.1) + assert len(received_by_b) == 1 + assert received_by_b[0].payload["output"] == "analysis complete" + assert len(received_by_a) == 0 # Not addressed to A + + +# ── Config Integration ──────────────────────────────────── + + +class TestConfigIntegration: + def test_adaptive_config_serialization(self): + """AdaptiveConfig should be serializable.""" + from agentkit.orchestrator.pipeline_schema import AdaptiveConfig + + config = AdaptiveConfig( + enabled=True, + max_reflections=5, + reflection_model="deepseek/deepseek-chat", + skip_stages=["cleanup"], + ) + data = config.model_dump() + assert data["enabled"] is True + assert data["max_reflections"] == 5 + assert "cleanup" in data["skip_stages"] + + def test_orchestrator_config_serialization(self): + """OrchestratorConfig should be a simple dataclass.""" + config = OrchestratorConfig( + adaptive=True, + max_iterations=5, + quality_threshold=0.9, + ) + assert config.adaptive is True + assert config.max_iterations == 5 + assert config.quality_threshold == 0.9