"""Integration tests for Chat + Adaptive + Multi-Agent features (U8). End-to-end tests that verify the new Phase 8 capabilities work together: - Chat mode with session management - Adaptive pipeline execution with reflection - Multi-Agent communication via MessageBus """ import asyncio import pytest from datetime import datetime, timezone from unittest.mock import AsyncMock, MagicMock from agentkit.bus.memory_bus import InMemoryMessageBus from agentkit.bus.message import AgentMessage from agentkit.core.orchestrator import Orchestrator, OrchestratorConfig from agentkit.core.protocol import TaskMessage, TaskStatus from agentkit.orchestrator.pipeline_engine import PipelineEngine from agentkit.orchestrator.pipeline_schema import ( AdaptiveConfig, Pipeline, PipelineStage, StageStatus, ) from agentkit.session.manager import SessionManager from agentkit.session.models import MessageRole from agentkit.session.store import InMemorySessionStore # ── Chat + Session Integration ──────────────────────────── class TestChatSessionIntegration: @pytest.mark.asyncio async def test_session_lifecycle_with_messages(self): """Full session lifecycle: create → chat → pause → resume → close.""" store = InMemorySessionStore() manager = SessionManager(store=store) # Create session session = await manager.create_session(agent_name="test_agent") assert session is not None session_id = session.session_id # Append messages await manager.append_message(session_id, role=MessageRole.USER, content="Hello") await manager.append_message(session_id, role=MessageRole.ASSISTANT, content="Hi there!") await manager.append_message(session_id, role=MessageRole.USER, content="How are you?") # Get messages messages = await manager.get_messages(session_id) assert len(messages) == 3 # Get chat messages (for LLM consumption) chat_messages = await manager.get_chat_messages(session_id) assert len(chat_messages) == 3 assert chat_messages[0]["role"] == "user" # Pause and resume paused = await manager.pause_session(session_id) assert paused.status.value == "paused" resumed = await manager.resume_session(session_id) assert resumed.status.value == "active" # Close closed = await manager.close_session(session_id) assert closed.status.value == "closed" @pytest.mark.asyncio async def test_closed_session_rejects_messages(self): """Closed sessions should not accept new messages.""" store = InMemorySessionStore() manager = SessionManager(store=store) session = await manager.create_session(agent_name="test_agent") session_id = session.session_id await manager.close_session(session_id) with pytest.raises(ValueError, match="closed"): await manager.append_message(session_id, role=MessageRole.USER, content="test") # ── Adaptive Pipeline Integration ───────────────────────── class TestAdaptivePipelineIntegration: @pytest.mark.asyncio async def test_pipeline_succeeds_without_adaptive(self): """Pipeline should succeed normally without adaptive config.""" engine = PipelineEngine() # dry-run mode pipeline = Pipeline( name="test_pipeline", version="1.0", description="Test", stages=[ PipelineStage(name="step1", agent="a", action="do"), PipelineStage(name="step2", agent="b", action="do"), ], ) result = await engine.execute(pipeline) assert result.status == StageStatus.COMPLETED @pytest.mark.asyncio async def test_pipeline_adaptive_config_default_disabled(self): """AdaptiveConfig default should have enabled=False.""" config = AdaptiveConfig() assert config.enabled is False @pytest.mark.asyncio async def test_circular_dependency_fails_gracefully(self): """Pipeline with circular dependency should fail gracefully.""" engine = PipelineEngine() pipeline = Pipeline( name="circular", version="1.0", description="Circular", stages=[ PipelineStage(name="a", agent="x", action="do", depends_on=["b"]), PipelineStage(name="b", agent="y", action="do", depends_on=["a"]), ], ) config = AdaptiveConfig(enabled=True, max_reflections=2) result = await engine.execute(pipeline, adaptive_config=config) assert result.status == StageStatus.FAILED # ── Multi-Agent Communication Integration ───────────────── class TestMultiAgentCommunication: @pytest.mark.asyncio async def test_worker_publishes_progress_to_orchestrator(self): """Workers should publish progress messages to orchestrator via MessageBus.""" bus = InMemoryMessageBus() # Create mock agent pool mock_agent = AsyncMock() mock_result = MagicMock() mock_result.output_data = {"analysis": "complete"} mock_agent.execute = AsyncMock(return_value=mock_result) pool = MagicMock() pool.get_agent = lambda name: mock_agent pool.list_agents = lambda: [ {"name": "analyst", "agent_type": "worker", "description": "Analyst"}, ] orch = Orchestrator(agent_pool=pool, message_bus=bus) # Subscribe orchestrator to receive progress progress: list[AgentMessage] = [] await bus.subscribe("orchestrator", lambda msg: progress.append(msg)) # Execute task task = TaskMessage( task_id="t1", agent_name="analyst", task_type="analyze", priority=0, input_data={"data": "test"}, callback_url=None, created_at=datetime.now(timezone.utc), timeout_seconds=60, ) result = await orch.execute(task) assert result.status == TaskStatus.COMPLETED # Verify progress was published await asyncio.sleep(0.2) assert len(progress) >= 1 assert progress[0].topic == "task.progress" @pytest.mark.asyncio async def test_agent_to_agent_communication(self): """Two agents should be able to communicate via MessageBus.""" bus = InMemoryMessageBus() received_by_a: list[AgentMessage] = [] received_by_b: list[AgentMessage] = [] async def handler_a(msg: AgentMessage): received_by_a.append(msg) async def handler_b(msg: AgentMessage): received_by_b.append(msg) await bus.subscribe("agent_a", handler_a) await bus.subscribe("agent_b", handler_b) # Agent A sends to Agent B await bus.publish(AgentMessage( sender="agent_a", recipient="agent_b", topic="task.result", payload={"output": "analysis complete"}, )) await asyncio.sleep(0.1) assert len(received_by_b) == 1 assert received_by_b[0].payload["output"] == "analysis complete" assert len(received_by_a) == 0 # Not addressed to A # ── Config Integration ──────────────────────────────────── class TestConfigIntegration: def test_adaptive_config_serialization(self): """AdaptiveConfig should be serializable.""" from agentkit.orchestrator.pipeline_schema import AdaptiveConfig config = AdaptiveConfig( enabled=True, max_reflections=5, reflection_model="deepseek/deepseek-chat", skip_stages=["cleanup"], ) data = config.model_dump() assert data["enabled"] is True assert data["max_reflections"] == 5 assert "cleanup" in data["skip_stages"] def test_orchestrator_config_serialization(self): """OrchestratorConfig should be a simple dataclass.""" config = OrchestratorConfig( adaptive=True, max_iterations=5, quality_threshold=0.9, ) assert config.adaptive is True assert config.max_iterations == 5 assert config.quality_threshold == 0.9