"""E2E Basic Function Tests — WebSocket chat protocol.

Verifies the WebSocket chat protocol works correctly:
1. Connection lifecycle (connect → connected → ping/pong → disconnect)
2. Message exchange (user message → token stream → final_answer)
3. Confirmation flow (confirmation_request → confirmation_reply → confirmation_result)
4. AskHuman flow (ask_human → reply → continue)
5. Cancel flow (cancel → error/cancelled)
6. Expert team events (team_formed → expert_step → team_synthesis → team_dissolved)
"""

import asyncio
import json

import pytest

from tests.e2e.conftest import WSChatHelper, create_session_via_api, register_skill_via_api

# Seconds to wait for a single frame on a connection that is expected to work.
_RECV_TIMEOUT_S = 10
# Shorter wait used when the connection itself is expected to be refused.
_REJECT_TIMEOUT_S = 5

# Every ``type`` value the tests accept in a server-sent message.
_VALID_SERVER_MESSAGE_TYPES = frozenset(
    {
        "connected",
        "token",
        "thinking",
        "step",
        "final_answer",
        "skill_match",
        "confirmation_request",
        "confirmation_result",
        "ask_human",
        "error",
        "pong",
        "team_formed",
        "expert_step",
        "expert_result",
        "plan_update",
        "team_synthesis",
        "team_dissolved",
    }
)

# Message types that count as "the server reacted to the user message".
_RESPONSE_MESSAGE_TYPES = ("final_answer", "error", "token", "thinking")


def _require_websockets():
    """Return the ``websockets`` module, skipping the calling test if it cannot be imported."""
    return pytest.importorskip("websockets", reason="websockets not installed")


def _ws_uri(ws_helper: WSChatHelper, session_id) -> str:
    """Build the chat WebSocket URI for ``session_id`` from the helper's base URL and API key."""
    return f"{ws_helper.base_ws_url}/api/v1/chat/ws/{session_id}?api_key={ws_helper.api_key}"


# ═══════════════════════════════════════════════════════════════════════════
# 1. Connection Lifecycle
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_basic
class TestWSConnection:
    """Connection-level checks: handshake event, ping/pong, bad session id."""

    @pytest.mark.asyncio
    async def test_connect_receives_connected_event(self, ws_helper: WSChatHelper, api_client) -> None:
        """The first message collected on a fresh session should be ``connected``."""
        session_id = create_session_via_api(api_client)
        # An empty list is passed as the second argument, so only what the
        # helper collects on its own is inspected.
        messages = await ws_helper.connect_and_chat(session_id, [])

        assert len(messages) >= 1
        assert messages[0].get("type") == "connected"

    @pytest.mark.asyncio
    async def test_ping_pong(self, ws_helper: WSChatHelper, api_client) -> None:
        """Ping should receive pong response."""
        websockets = _require_websockets()
        session_id = create_session_via_api(api_client)

        async with websockets.connect(_ws_uri(ws_helper, session_id)) as ws:
            # The first frame is the handshake event; check it instead of
            # reading and discarding it, matching the assertion made in
            # test_connect_receives_connected_event.
            handshake = json.loads(await asyncio.wait_for(ws.recv(), timeout=_RECV_TIMEOUT_S))
            assert handshake.get("type") == "connected"

            await ws.send(json.dumps({"type": "ping"}))
            reply = json.loads(await asyncio.wait_for(ws.recv(), timeout=_RECV_TIMEOUT_S))
            assert reply.get("type") == "pong"

    @pytest.mark.asyncio
    async def test_invalid_session_id(self, ws_helper: WSChatHelper) -> None:
        """Connecting with invalid session ID should fail."""
        websockets = _require_websockets()
        uri = _ws_uri(ws_helper, "nonexistent-session")

        # NOTE(review): ``Exception`` also matches the receive timeout, so a
        # server that accepts the socket and then stays silent passes this
        # test as well. Consider narrowing to the websockets exception types
        # once the server's rejection mode is confirmed.
        with pytest.raises(Exception):
            async with websockets.connect(uri) as ws:
                await asyncio.wait_for(ws.recv(), timeout=_REJECT_TIMEOUT_S)


# ═══════════════════════════════════════════════════════════════════════════
# 2. Message Exchange
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_basic
class TestWSMessageExchange:
    """Checks on the messages collected after sending one user message."""

    @pytest.mark.asyncio
    async def test_send_message_get_response(self, ws_helper: WSChatHelper, api_client) -> None:
        """A user message should yield at least one response-type message."""
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Hello, this is an e2e test"}],
        )

        # Should receive at least: connected + some response.
        assert len(messages) >= 2

        # At least one response-type message must appear somewhere in the
        # stream; its position is not checked.
        response_types = [m.get("type") for m in messages]
        assert any(kind in response_types for kind in _RESPONSE_MESSAGE_TYPES)

    @pytest.mark.asyncio
    async def test_message_types_are_valid(self, ws_helper: WSChatHelper, api_client) -> None:
        """All server-sent messages should have a valid 'type' field."""
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Test valid message types"}],
        )

        for msg in messages:
            # NOTE(review): entries that are not dicts, or that lack "type",
            # are skipped rather than failed — confirm that is intended.
            if isinstance(msg, dict) and "type" in msg:
                assert msg["type"] in _VALID_SERVER_MESSAGE_TYPES, f"Invalid message type: {msg['type']}"


# ═══════════════════════════════════════════════════════════════════════════
# 3. Cancel Flow
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_basic
class TestWSCancel:
    """Smoke test for sending a ``cancel`` message."""

    @pytest.mark.asyncio
    async def test_cancel_message_accepted(self, ws_helper: WSChatHelper, api_client) -> None:
        """Sending cancel should be accepted by the server."""
        websockets = _require_websockets()
        session_id = create_session_via_api(api_client)

        async with websockets.connect(_ws_uri(ws_helper, session_id)) as ws:
            # Consume the first frame (expected to be the connected event).
            await asyncio.wait_for(ws.recv(), timeout=_RECV_TIMEOUT_S)

            # Start a task, then cancel it straight away.
            await ws.send(json.dumps({"type": "message", "content": "Start a task"}))
            await ws.send(json.dumps({"type": "cancel"}))

            # NOTE(review): nothing is read back after the cancel, so this
            # only proves both sends succeeded on an open socket. Asserting
            # on the server's reaction needs the cancel protocol confirmed.


# ═══════════════════════════════════════════════════════════════════════════
# 4. Skill Match Event
# ═══════════════════════════════════════════════════════════════════════════


@pytest.mark.e2e_basic
class TestWSSkillMatch:
    """Checks that a message containing a registered skill keyword gets a response."""

    @pytest.mark.asyncio
    async def test_skill_match_notification(self, ws_helper: WSChatHelper, api_client) -> None:
        """When a skill is matched, server should send skill_match event."""
        register_skill_via_api(api_client, "ws_skill", keywords=["ws_skill_match"])
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Please use ws_skill_match for this"}],
        )

        # A skill_match event may or may not be emitted depending on routing,
        # so only require that some response followed the connected event.
        assert len(messages) >= 2