171 lines
7.7 KiB
Python
171 lines
7.7 KiB
Python
"""E2E Basic Function Tests — WebSocket chat protocol.

Verifies the WebSocket chat protocol works correctly:
1. Connection lifecycle (connect → connected → ping/pong → disconnect)
2. Message exchange (user message → token stream → final_answer)
3. Confirmation flow (confirmation_request → confirmation_reply → confirmation_result)
4. AskHuman flow (ask_human → reply → continue)
5. Cancel flow (cancel → error/cancelled)
6. Expert team events (team_formed → expert_step → team_synthesis → team_dissolved)
"""
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from tests.e2e.conftest import WSChatHelper, create_session_via_api, register_skill_via_api
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# 1. Connection Lifecycle
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@pytest.mark.e2e_basic
class TestWSConnection:
    """Connection lifecycle: connected event, ping/pong, and an unknown session id."""

    @pytest.mark.asyncio
    async def test_connect_receives_connected_event(self, ws_helper: WSChatHelper, api_client):
        """The first message collected for a fresh session should be ``connected``."""
        session_id = create_session_via_api(api_client)
        # No outgoing messages are handed to the helper for this check.
        messages = await ws_helper.connect_and_chat(session_id, [])
        assert len(messages) >= 1, "expected at least the 'connected' event"
        assert messages[0].get("type") == "connected"

    @pytest.mark.asyncio
    async def test_ping_pong(self, ws_helper: WSChatHelper, api_client):
        """Ping should receive pong response."""
        # Skips the test (same message as before) when the client library is missing.
        websockets = pytest.importorskip("websockets", reason="websockets not installed")

        session_id = create_session_via_api(api_client)
        uri = f"{ws_helper.base_ws_url}/api/v1/chat/ws/{session_id}?api_key={ws_helper.api_key}"

        async with websockets.connect(uri) as ws:
            # The first frame is expected to be the connected event; checking it here
            # keeps a broken handshake from surfacing as a confusing pong failure.
            first = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            assert first.get("type") == "connected"

            # Application-level ping: a JSON frame, not a WebSocket control frame.
            await ws.send(json.dumps({"type": "ping"}))
            resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=10))
            assert resp.get("type") == "pong"

    @pytest.mark.asyncio
    async def test_invalid_session_id(self, ws_helper: WSChatHelper):
        """Connecting with invalid session ID should fail."""
        websockets = pytest.importorskip("websockets", reason="websockets not installed")

        uri = f"{ws_helper.base_ws_url}/api/v1/chat/ws/nonexistent-session?api_key={ws_helper.api_key}"
        # Only a WebSocket-level failure counts as the server rejecting the session:
        # per the websockets docs, WebSocketException is the base class for both
        # handshake rejections and connection-closed errors. A refused TCP connection
        # (OSError) or a silent recv timeout must not make this test pass.
        with pytest.raises(websockets.WebSocketException):
            async with websockets.connect(uri) as ws:
                await asyncio.wait_for(ws.recv(), timeout=5)
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# 2. Message Exchange
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@pytest.mark.e2e_basic
class TestWSMessageExchange:
    """Message exchange: a user message yields response events with known types."""

    @pytest.mark.asyncio
    async def test_send_message_get_response(self, ws_helper: WSChatHelper, api_client):
        """A user message should produce at least one response-like event."""
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Hello, this is an e2e test"}],
        )
        # Should receive at least: connected + some response (token/final_answer/error)
        assert len(messages) >= 2
        # Any one of these event types, at any position, counts as a response;
        # the order of events is not checked here.
        response_types = [m.get("type") for m in messages]
        assert any(t in response_types for t in ("final_answer", "error", "token", "thinking"))

    @pytest.mark.asyncio
    async def test_message_types_are_valid(self, ws_helper: WSChatHelper, api_client):
        """All server-sent messages should have a valid 'type' field."""
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Test valid message types"}],
        )
        valid_types = {
            "connected",
            "token",
            "thinking",
            "step",
            "final_answer",
            "skill_match",
            "confirmation_request",
            "confirmation_result",
            "ask_human",
            "error",
            "pong",
            "team_formed",
            "expert_step",
            "expert_result",
            "plan_update",
            "team_synthesis",
            "team_dissolved",
        }
        for msg in messages:
            # A message without a 'type' is itself a violation; skipping it would let
            # the test pass without checking anything.
            assert isinstance(msg, dict) and "type" in msg, f"Message without a 'type' field: {msg!r}"
            assert msg["type"] in valid_types, f"Invalid message type: {msg['type']}"
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# 3. Cancel Flow
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@pytest.mark.e2e_basic
class TestWSCancel:
    """Cancel flow: a ``cancel`` frame sent straight after a user message."""

    @pytest.mark.asyncio
    async def test_cancel_message_accepted(self, ws_helper: WSChatHelper, api_client):
        """Send a user message followed immediately by ``cancel`` on one connection.

        The test passes when both sends complete and the connection context exits
        without raising. No server reply to the cancel is read or asserted.
        """
        try:
            import websockets
        except ImportError:
            pytest.skip("websockets not installed")

        sid = create_session_via_api(api_client)
        endpoint = f"{ws_helper.base_ws_url}/api/v1/chat/ws/{sid}?api_key={ws_helper.api_key}"

        # Serialise both client frames up front; they are sent back to back below.
        start_frame = json.dumps({"type": "message", "content": "Start a task"})
        cancel_frame = json.dumps({"type": "cancel"})

        async with websockets.connect(endpoint) as conn:
            # Consume the first server frame (expected: the connected event) before sending.
            await asyncio.wait_for(conn.recv(), timeout=10)
            await conn.send(start_frame)
            await conn.send(cancel_frame)
            # Leaving the context closes the connection; an exception here fails the test.
|
|
|
|
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
# 4. Skill Match Event
|
|
# ═══════════════════════════════════════════════════════════════════════════
|
|
|
|
|
|
@pytest.mark.e2e_basic
class TestWSSkillMatch:
    """Skill routing: a message containing a registered skill keyword gets a response."""

    @pytest.mark.asyncio
    async def test_skill_match_notification(self, ws_helper: WSChatHelper, api_client):
        """A message containing a registered skill keyword should get a response.

        A ``skill_match`` event may or may not be emitted depending on routing, so
        it is not asserted; only the presence of some response is checked.
        """
        register_skill_via_api(api_client, "ws_skill", keywords=["ws_skill_match"])
        session_id = create_session_via_api(api_client)
        messages = await ws_helper.connect_and_chat(
            session_id,
            [{"type": "message", "content": "Please use ws_skill_match for this"}],
        )
        # At minimum, we should get a response (skill_match or direct answer)
        assert len(messages) >= 2
|