feat: integrate SQ/EQ into portal WebSocket and CLI (Phase 4)

- app.py: initialize EventQueue + SubmissionQueue in app.state, close on shutdown
- portal.py: emit unified events (task.created/started/completed/failed,
  turn.thinking/tool_call/tool_result/final_answer) to EQ alongside WebSocket messages
- cli/chat.py: optional --event-queue flag for event emission
- EQ is bypass-only: emit failures never affect WebSocket or CLI main flow
- WebSocket message format unchanged (backward compatible)

Tests: 650 passed, 0 failed, 4 skipped
This commit is contained in:
chiguyong 2026-06-17 11:05:04 +08:00
parent 773a62ead2
commit ecf87391a5
4 changed files with 827 additions and 1 deletions

View File

@ -32,9 +32,14 @@ def chat(
None, "--system-prompt", "-s", help="Custom system prompt"
),
no_stream: bool = typer.Option(False, "--no-stream", help="Disable token streaming"),
event_queue: bool = typer.Option(
False,
"--event-queue",
help="Enable EventQueue for structured event emission (for future extensions)",
),
):
"""Start an interactive chat session with an Agent."""
asyncio.run(_chat_async(model, agent_name, config, system_prompt, no_stream))
asyncio.run(_chat_async(model, agent_name, config, system_prompt, no_stream, event_queue))
async def _chat_async(
@ -43,6 +48,7 @@ async def _chat_async(
config_arg: str | None,
system_prompt: str | None,
no_stream: bool,
enable_event_queue: bool = False,
) -> None:
"""Async implementation of the chat command."""
from agentkit.cli.onboarding import run_onboarding
@ -87,6 +93,58 @@ async def _chat_async(
session_manager = SessionManager(store=InMemorySessionStore())
session = await session_manager.create_session(agent_name=agent_name)
# Initialize EventQueue if --event-queue flag is set (lightweight, optional)
# The EQ is a side-channel for structured event emission; CLI output format is unchanged.
eq = None
_emit = None # type: ignore[assignment]
if enable_event_queue:
import uuid as _uuid
from agentkit.core.event_queue import EventQueue
from agentkit.core.protocol import (
Event,
SessionEventType,
TaskEventType,
TurnEventType,
)
eq = EventQueue()
# Map ReAct engine event_type strings to TurnEventType constants for EQ emission
_CLI_REACT_EVENT_MAP: dict[str, str] = {
"thinking": TurnEventType.THINKING,
"tool_call": TurnEventType.TOOL_CALL,
"tool_result": TurnEventType.TOOL_RESULT,
"final_answer": TurnEventType.FINAL_ANSWER,
}
async def _emit( # type: ignore[no-redef]
event_type: str,
task_id: str,
session_id: str,
data: dict | None = None,
) -> None:
"""Emit an event to the EQ. Best-effort: never raises."""
try:
await eq.emit( # type: ignore[union-attr]
Event.create(
event_type=event_type,
task_id=task_id,
session_id=session_id,
data=data or {},
)
)
except Exception:
pass # EQ is best-effort; never break CLI flow
# Emit session.started event
await _emit(
SessionEventType.SESSION_STARTED,
task_id="",
session_id=session.session_id,
data={"agent_name": agent_name, "model": model},
)
# Build tools list — all available tools for chat mode
search_api_keys = _extract_search_keys(server_config)
tools: list[Tool] = [
@ -199,6 +257,16 @@ async def _chat_async(
conversation_had_messages = True
# Generate task_id for this user message and emit task.created to EQ (if enabled)
cli_task_id = str(_uuid.uuid4()) if enable_event_queue else ""
if _emit is not None:
await _emit(
TaskEventType.TASK_CREATED,
task_id=cli_task_id,
session_id=session.session_id,
data={"message": user_input},
)
# Append user message to session
await session_manager.append_message(
session_id=session.session_id,
@ -227,6 +295,15 @@ async def _chat_async(
f"[dim]Skill: {routing.skill_name} ({routing.match_method}, {int(routing.match_confidence * 100)}%)[/dim]"
)
# Emit task.started to EQ (if enabled)
if _emit is not None:
await _emit(
TaskEventType.TASK_STARTED,
task_id=cli_task_id,
session_id=session.session_id,
data={"agent_name": routing.skill_name or agent_name},
)
exec_system_prompt = routing.system_prompt
exec_tools = routing.tools
exec_model = routing.model
@ -254,6 +331,21 @@ async def _chat_async(
content=output,
agent_name=agent_name,
)
# Emit turn.final_answer and task.completed to EQ (if enabled)
if _emit is not None:
await _emit(
TurnEventType.FINAL_ANSWER,
task_id=cli_task_id,
session_id=session.session_id,
data={"output": output},
)
await _emit(
TaskEventType.TASK_COMPLETED,
task_id=cli_task_id,
session_id=session.session_id,
data={"output": output},
)
else:
# Streaming mode — Live displays under the "Agent:" label
full_content = ""
@ -286,6 +378,18 @@ async def _chat_async(
if full_content:
live.update(Text(full_content))
# Emit turn events to EQ (if enabled)
# Maps ReAct event types to TurnEventType constants
if _emit is not None:
_turn_type = _CLI_REACT_EVENT_MAP.get(event.event_type)
if _turn_type is not None:
await _emit(
_turn_type,
task_id=cli_task_id,
session_id=session.session_id,
data=event.data,
)
# Live already displayed the final content, no need to rprint again
await session_manager.append_message(
@ -295,8 +399,25 @@ async def _chat_async(
agent_name=agent_name,
)
# Emit task.completed to EQ (if enabled)
if _emit is not None:
await _emit(
TaskEventType.TASK_COMPLETED,
task_id=cli_task_id,
session_id=session.session_id,
data={"output": full_content},
)
except Exception as e:
rprint(f"\n[red]Error: {e}[/red]")
# Emit task.failed to EQ (if enabled)
if _emit is not None:
await _emit(
TaskEventType.TASK_FAILED,
task_id=cli_task_id,
session_id=session.session_id,
data={"error": str(e)},
)
# ── Session end: generate daily log ────────────────────────────
if conversation_had_messages:
@ -323,6 +444,19 @@ async def _chat_async(
except Exception:
pass # Daily log generation is best-effort
# Close EventQueue if it was enabled (emit session.ended and close)
if eq is not None and _emit is not None:
try:
await _emit(
SessionEventType.SESSION_ENDED,
task_id="",
session_id=session.session_id,
data={},
)
except Exception:
pass
eq.close()
# ruff: noqa: F821 — string annotations resolved at runtime via from __future__ import annotations

View File

@ -9,6 +9,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from agentkit.core.agent_pool import AgentPool
from agentkit.core.event_queue import EventQueue, SubmissionQueue
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.providers.anthropic import AnthropicProvider
from agentkit.llm.providers.gemini import GeminiProvider
@ -340,6 +341,16 @@ async def lifespan(app: FastAPI):
if server_config is not None:
server_config.stop_watching()
# Close SQ/EQ dual-queue system (Phase 4 integration)
# SubmissionQueue.close() marks the queue as closed (no new submissions).
# EventQueue.close() sends sentinel to all subscribers so they can exit gracefully.
event_queue = getattr(app.state, "event_queue", None)
if event_queue is not None:
event_queue.close()
submission_queue = getattr(app.state, "submission_queue", None)
if submission_queue is not None:
submission_queue.close()
await task_store.stop_cleanup()
@ -626,6 +637,13 @@ def create_app(
app.state.api_key = effective_api_key
app.state._config_reload_lock = asyncio.Lock()
# Initialize SQ/EQ dual-queue system (Phase 4 integration)
# - SubmissionQueue (SQ): receives user input, returns task_id
# - EventQueue (EQ): pushes Agent events, supports multi-subscriber broadcast
# Both are closed on app shutdown (see lifespan()).
app.state.submission_queue = SubmissionQueue()
app.state.event_queue = EventQueue()
# Initialize session manager for Chat mode
from agentkit.session.manager import SessionManager
from agentkit.session.store import create_session_store

View File

@ -20,6 +20,8 @@ from fastapi.security import APIKeyHeader, APIKeyQuery
from pydantic import BaseModel
from agentkit.core.config_driven import ConfigDrivenAgent
from agentkit.core.event_queue import EventQueue
from agentkit.core.protocol import Event, TaskEventType, TurnEventType
from agentkit.core.react import ReActEngine
from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult
from agentkit.chat.request_preprocessor import RequestPreprocessor
@ -35,6 +37,16 @@ logger = logging.getLogger(__name__)
router = APIRouter(tags=["portal"])
# Map ReAct engine event_type strings to TurnEventType constants for EQ emission.
# Only events with a corresponding TurnEventType are forwarded to the EQ;
# other events (e.g. "token") are still sent over WebSocket but not duplicated to EQ.
_REACT_EVENT_TYPE_MAP: dict[str, str] = {
"thinking": TurnEventType.THINKING,
"tool_call": TurnEventType.TOOL_CALL,
"tool_result": TurnEventType.TOOL_RESULT,
"final_answer": TurnEventType.FINAL_ANSWER,
}
# ---------------------------------------------------------------------------
# API Key Authentication
# ---------------------------------------------------------------------------
@ -50,6 +62,39 @@ def _ensure_non_empty(text: str | None) -> str:
return EMPTY_LLM_RESPONSE
async def _emit_event_safe(
event_queue: EventQueue | None,
event_type: str,
task_id: str,
session_id: str,
data: dict | None = None,
) -> None:
"""Emit an event to the EventQueue without blocking or raising.
The EQ is a side-channel: emit failures must never break the WebSocket flow.
All exceptions are swallowed and logged at warning level.
Args:
event_queue: The EventQueue to emit to (no-op if None)
event_type: Event type (see TaskEventType / TurnEventType)
task_id: Associated task ID
session_id: Associated session ID (conversation_id)
data: Optional event payload
"""
if event_queue is None:
return
try:
event = Event.create(
event_type=event_type,
task_id=task_id,
session_id=session_id,
data=data or {},
)
await event_queue.emit(event)
except Exception as e:
logger.warning(f"EventQueue emit failed (type={event_type}): {e}", exc_info=True)
async def _verify_api_key(
request: Request,
api_key_header: str | None = Security(_api_key_header),
@ -645,6 +690,8 @@ async def portal_websocket(websocket: WebSocket):
# Wait for first chat message before creating conversation
conv: Conversation | None = None
# task_id is per-user-message; tracked here so the outer except can emit task.failed
task_id: str | None = None
try:
while True:
@ -693,6 +740,18 @@ async def portal_websocket(websocket: WebSocket):
conv = await _conversation_store.get_or_create(conv_id)
await websocket.send_json({"type": "connected", "conversation_id": conv.id})
# Generate task_id for this user message and emit task.created to EQ
# (EQ is a side-channel: emit failures never break the WebSocket flow)
task_id = str(uuid.uuid4())
event_queue: EventQueue | None = getattr(websocket.app.state, "event_queue", None)
await _emit_event_safe(
event_queue,
TaskEventType.TASK_CREATED,
task_id=task_id,
session_id=conv.id,
data={"message": message_text},
)
# Add user message to conversation
await _conversation_store.add_message(conv.id, "user", message_text)
start_time = datetime.now(timezone.utc)
@ -771,6 +830,20 @@ async def portal_websocket(websocket: WebSocket):
}
)
# Emit task.started to EQ (execution begins after routing)
await _emit_event_safe(
event_queue,
TaskEventType.TASK_STARTED,
task_id=task_id,
session_id=conv.id,
data={
"agent_name": routing_result.agent_name or "default",
"execution_mode": routing_result.execution_mode.value
if hasattr(routing_result.execution_mode, "value")
else str(routing_result.execution_mode),
},
)
# Execute based on routing result's execution_mode
# This is the single source of truth for path selection,
# replacing fragile string-matching on match_method.
@ -796,6 +869,23 @@ async def portal_websocket(websocket: WebSocket):
# Store assistant reply for multi-turn context continuity
response_content = _ensure_non_empty(response.content)
await _conversation_store.add_message(conv.id, "assistant", response_content)
# Emit turn.final_answer and task.completed to EQ
await _emit_event_safe(
event_queue,
TurnEventType.FINAL_ANSWER,
task_id=task_id,
session_id=conv.id,
data={"output": response_content},
)
await _emit_event_safe(
event_queue,
TaskEventType.TASK_COMPLETED,
task_id=task_id,
session_id=conv.id,
data={"output": response_content},
)
await websocket.send_json(
{
"type": "result",
@ -860,6 +950,23 @@ async def portal_websocket(websocket: WebSocket):
# Store assistant reply for multi-turn context continuity
response_content = _ensure_non_empty(response.content)
await _conversation_store.add_message(conv.id, "assistant", response_content)
# Emit turn.final_answer and task.completed to EQ (fallback path)
await _emit_event_safe(
event_queue,
TurnEventType.FINAL_ANSWER,
task_id=task_id,
session_id=conv.id,
data={"output": response_content},
)
await _emit_event_safe(
event_queue,
TaskEventType.TASK_COMPLETED,
task_id=task_id,
session_id=conv.id,
data={"output": response_content},
)
await websocket.send_json(
{
"type": "result",
@ -915,6 +1022,19 @@ async def portal_websocket(websocket: WebSocket):
):
if event.event_type == "final_answer":
collected_output.append(event.data.get("output", ""))
# Map ReAct event types to TurnEventType and emit to EQ
# (side-channel: failures are swallowed by _emit_event_safe)
_turn_event_type = _REACT_EVENT_TYPE_MAP.get(event.event_type)
if _turn_event_type is not None:
await _emit_event_safe(
event_queue,
_turn_event_type,
task_id=task_id,
session_id=conv.id,
data=event.data,
)
await websocket.send_json(
{
"type": "step",
@ -927,6 +1047,14 @@ async def portal_websocket(websocket: WebSocket):
}
)
except Exception as e:
# Emit task.failed to EQ before sending error to WebSocket
await _emit_event_safe(
event_queue,
TaskEventType.TASK_FAILED,
task_id=task_id,
session_id=conv.id,
data={"error": str(e)},
)
await websocket.send_json({"type": "error", "data": {"message": str(e)}})
continue
@ -936,6 +1064,25 @@ async def portal_websocket(websocket: WebSocket):
await _conversation_store.add_message(conv.id, "assistant", response_text)
outcome = "success" if response_text != EMPTY_LLM_RESPONSE else "failure"
# Emit task.completed (success) or task.failed (empty response) to EQ
if outcome == "success":
await _emit_event_safe(
event_queue,
TaskEventType.TASK_COMPLETED,
task_id=task_id,
session_id=conv.id,
data={"output": response_text},
)
else:
await _emit_event_safe(
event_queue,
TaskEventType.TASK_FAILED,
task_id=task_id,
session_id=conv.id,
data={"error": "Empty LLM response"},
)
await websocket.send_json(
{
"type": "result",
@ -956,6 +1103,17 @@ async def portal_websocket(websocket: WebSocket):
logger.debug(f"Portal WebSocket disconnected for conversation {conv.id if conv else 'N/A'}")
except Exception as e:
logger.error(f"Portal WebSocket error: {e}")
# Emit task.failed to EQ if a task was in progress
# (task_id is set when a user message is received; None before that)
if task_id is not None and conv is not None:
event_queue = getattr(websocket.app.state, "event_queue", None)
await _emit_event_safe(
event_queue,
TaskEventType.TASK_FAILED,
task_id=task_id,
session_id=conv.id,
data={"error": str(e)},
)
try:
await websocket.send_json({"type": "error", "data": {"message": str(e)}})
except Exception:

View File

@ -0,0 +1,516 @@
"""Tests for SQ/EQ integration into app.py and portal.py (Phase 4)
Test scenarios:
- app.py initializes event_queue and submission_queue on app.state
- portal WebSocket emits correct event types to EQ
- EQ emit failure does not break WebSocket flow
- Events contain correct session_id and task_id
- _emit_event_safe helper handles None EQ gracefully
"""
from __future__ import annotations
from unittest.mock import AsyncMock
import pytest
from agentkit.core.event_queue import EventQueue, SubmissionQueue
from agentkit.core.protocol import (
Event,
TaskEventType,
TurnEventType,
)
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse, TokenUsage
from agentkit.server.app import create_app
from agentkit.server.routes.portal import _emit_event_safe
from agentkit.skills.registry import SkillRegistry
from agentkit.tools.registry import ToolRegistry
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def mock_llm_gateway():
gateway = LLMGateway()
mock_provider = AsyncMock()
mock_provider.chat.return_value = LLMResponse(
content="Hello from LLM",
model="test-model",
usage=TokenUsage(prompt_tokens=10, completion_tokens=20),
)
gateway.register_provider("test", mock_provider)
return gateway
@pytest.fixture
def skill_registry():
return SkillRegistry()
@pytest.fixture
def tool_registry():
return ToolRegistry()
@pytest.fixture
def app(mock_llm_gateway, skill_registry, tool_registry):
return create_app(
llm_gateway=mock_llm_gateway,
skill_registry=skill_registry,
tool_registry=tool_registry,
)
# ---------------------------------------------------------------------------
# app.py initialization tests
# ---------------------------------------------------------------------------
class TestAppInitialization:
"""Verify app.py initializes SQ/EQ on app.state."""
def test_app_has_event_queue(self, app):
"""app.state should have an EventQueue instance."""
assert hasattr(app.state, "event_queue")
assert isinstance(app.state.event_queue, EventQueue)
def test_app_has_submission_queue(self, app):
"""app.state should have a SubmissionQueue instance."""
assert hasattr(app.state, "submission_queue")
assert isinstance(app.state.submission_queue, SubmissionQueue)
def test_event_queue_initially_open(self, app):
"""EventQueue should be open (not closed) after app creation."""
assert app.state.event_queue.is_closed is False
def test_submission_queue_initially_open(self, app):
"""SubmissionQueue should be open (not closed) after app creation."""
assert app.state.submission_queue.is_closed is False
def test_event_queue_supports_subscribers(self, app):
"""EventQueue should start with zero subscribers."""
assert app.state.event_queue.subscriber_count == 0
# ---------------------------------------------------------------------------
# _emit_event_safe helper tests
# ---------------------------------------------------------------------------
class TestEmitEventSafe:
"""Verify the _emit_event_safe helper handles edge cases."""
@pytest.mark.asyncio
async def test_emit_with_none_queue_is_noop(self):
"""Emitting to None queue should be a no-op (no exception)."""
# No exception should be raised
await _emit_event_safe(
None,
TaskEventType.TASK_CREATED,
task_id="test-task",
session_id="test-session",
data={"message": "hello"},
)
@pytest.mark.asyncio
async def test_emit_with_valid_queue_emits_event(self):
"""Emitting to a valid EventQueue should emit the event."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id="test-task",
session_id="test-session",
data={"message": "hello"},
)
# Buffer should contain the event
assert len(eq._buffer) == 1
event = eq._buffer[0]
assert event.event_type == TaskEventType.TASK_CREATED
assert event.task_id == "test-task"
assert event.session_id == "test-session"
assert event.data == {"message": "hello"}
@pytest.mark.asyncio
async def test_emit_with_failing_queue_does_not_raise(self):
"""Emit failures should be swallowed (never raise to caller)."""
# Create a broken EventQueue that raises on emit
class BrokenEventQueue(EventQueue):
async def emit(self, event: Event) -> None:
raise RuntimeError("Simulated EQ failure")
broken_eq = BrokenEventQueue()
# Should NOT raise — error is caught and logged
await _emit_event_safe(
broken_eq,
TaskEventType.TASK_CREATED,
task_id="test-task",
session_id="test-session",
)
@pytest.mark.asyncio
async def test_emit_with_none_data_uses_empty_dict(self):
"""Emitting with data=None should default to empty dict."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_STARTED,
task_id="t1",
session_id="s1",
data=None,
)
assert eq._buffer[0].data == {}
# ---------------------------------------------------------------------------
# Portal WebSocket EQ integration tests
# ---------------------------------------------------------------------------
class TestPortalWebSocketEQIntegration:
"""Verify portal WebSocket emits events to EQ alongside WebSocket messages.
NOTE: Starlette TestClient's sync WS client has known issues with disconnect
handling (see test_portal_routes.py::TestPortalWebSocket for skipped tests).
These tests verify EQ integration through the helper function and app state
rather than full end-to-end WS flows.
"""
@pytest.mark.asyncio
async def test_app_state_event_queue_is_functional(self, app):
"""The EventQueue on app.state should be functional (accept emits)."""
eq: EventQueue = app.state.event_queue
# Emit a task.created event as the WebSocket handler would
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id="test-task-id",
session_id="test-session-id",
data={"message": "hello"},
)
# Verify the event was buffered
assert len(eq._buffer) == 1
event = eq._buffer[0]
assert event.event_type == "task.created"
assert event.task_id == "test-task-id"
assert event.session_id == "test-session-id"
@pytest.mark.asyncio
async def test_eq_emit_failure_does_not_break_app(self, app):
"""If EQ.emit() raises, the app should continue to function normally."""
# Replace EQ with a broken one that raises on emit
class BrokenEventQueue(EventQueue):
async def emit(self, event: Event) -> None:
raise RuntimeError("Simulated EQ failure")
app.state.event_queue = BrokenEventQueue()
# The _emit_event_safe helper should swallow the error
await _emit_event_safe(
app.state.event_queue,
TaskEventType.TASK_CREATED,
task_id="t1",
session_id="s1",
)
# App state should still be accessible
assert app.state.event_queue is not None
@pytest.mark.asyncio
async def test_event_queue_close_on_shutdown(self):
"""EventQueue should be closed when app shuts down."""
app = create_app(
llm_gateway=LLMGateway(),
skill_registry=SkillRegistry(),
tool_registry=ToolRegistry(),
)
eq = app.state.event_queue
sq = app.state.submission_queue
assert eq.is_closed is False
assert sq.is_closed is False
# Simulate lifespan shutdown by calling close directly
# (TestClient lifespan handling would also work but is heavier)
eq.close()
sq.close()
assert eq.is_closed is True
assert sq.is_closed is True
@pytest.mark.asyncio
async def test_full_task_lifecycle_events_simulated(self, app):
"""Simulate the full task lifecycle that the WebSocket handler would emit.
This verifies that the EventQueue correctly receives and buffers
the sequence of events emitted during a typical WebSocket chat flow.
"""
eq: EventQueue = app.state.event_queue
task_id = "simulated-task-123"
session_id = "simulated-session-456"
# Simulate the event sequence from portal_websocket handler
await _emit_event_safe(
eq, TaskEventType.TASK_CREATED, task_id, session_id, {"message": "hello"}
)
await _emit_event_safe(
eq,
TaskEventType.TASK_STARTED,
task_id,
session_id,
{"agent_name": "default", "execution_mode": "direct_chat"},
)
await _emit_event_safe(
eq,
TurnEventType.FINAL_ANSWER,
task_id,
session_id,
{"output": "Hello back!"},
)
await _emit_event_safe(
eq,
TaskEventType.TASK_COMPLETED,
task_id,
session_id,
{"output": "Hello back!"},
)
# Verify all 4 events were buffered in order
assert len(eq._buffer) == 4
event_types = [e.event_type for e in eq._buffer]
assert event_types == [
"task.created",
"task.started",
"turn.final_answer",
"task.completed",
]
# Verify all events carry the same task_id and session_id
for event in eq._buffer:
assert event.task_id == task_id
assert event.session_id == session_id
# ---------------------------------------------------------------------------
# Event type correctness tests
# ---------------------------------------------------------------------------
class TestEventTypes:
"""Verify event type constants are correctly used."""
@pytest.mark.asyncio
async def test_task_created_event_type(self):
"""task.created should map to TaskEventType.TASK_CREATED."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id="t1",
session_id="s1",
data={"message": "test"},
)
event = eq._buffer[0]
assert event.event_type == "task.created"
@pytest.mark.asyncio
async def test_task_started_event_type(self):
"""task.started should map to TaskEventType.TASK_STARTED."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_STARTED,
task_id="t1",
session_id="s1",
)
event = eq._buffer[0]
assert event.event_type == "task.started"
@pytest.mark.asyncio
async def test_task_completed_event_type(self):
"""task.completed should map to TaskEventType.TASK_COMPLETED."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_COMPLETED,
task_id="t1",
session_id="s1",
data={"output": "done"},
)
event = eq._buffer[0]
assert event.event_type == "task.completed"
@pytest.mark.asyncio
async def test_task_failed_event_type(self):
"""task.failed should map to TaskEventType.TASK_FAILED."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_FAILED,
task_id="t1",
session_id="s1",
data={"error": "oops"},
)
event = eq._buffer[0]
assert event.event_type == "task.failed"
@pytest.mark.asyncio
async def test_turn_thinking_event_type(self):
"""turn.thinking should map to TurnEventType.THINKING."""
eq = EventQueue()
await _emit_event_safe(
eq,
TurnEventType.THINKING,
task_id="t1",
session_id="s1",
)
event = eq._buffer[0]
assert event.event_type == "turn.thinking"
@pytest.mark.asyncio
async def test_turn_tool_call_event_type(self):
"""turn.tool_call should map to TurnEventType.TOOL_CALL."""
eq = EventQueue()
await _emit_event_safe(
eq,
TurnEventType.TOOL_CALL,
task_id="t1",
session_id="s1",
)
event = eq._buffer[0]
assert event.event_type == "turn.tool_call"
@pytest.mark.asyncio
async def test_turn_tool_result_event_type(self):
"""turn.tool_result should map to TurnEventType.TOOL_RESULT."""
eq = EventQueue()
await _emit_event_safe(
eq,
TurnEventType.TOOL_RESULT,
task_id="t1",
session_id="s1",
)
event = eq._buffer[0]
assert event.event_type == "turn.tool_result"
@pytest.mark.asyncio
async def test_turn_final_answer_event_type(self):
"""turn.final_answer should map to TurnEventType.FINAL_ANSWER."""
eq = EventQueue()
await _emit_event_safe(
eq,
TurnEventType.FINAL_ANSWER,
task_id="t1",
session_id="s1",
)
event = eq._buffer[0]
assert event.event_type == "turn.final_answer"
# ---------------------------------------------------------------------------
# Event field correctness tests
# ---------------------------------------------------------------------------
class TestEventFields:
"""Verify events contain correct session_id and task_id."""
@pytest.mark.asyncio
async def test_event_carries_correct_task_id(self):
"""Emitted event should carry the provided task_id."""
eq = EventQueue()
expected_task_id = "unique-task-id-123"
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id=expected_task_id,
session_id="s1",
)
assert eq._buffer[0].task_id == expected_task_id
@pytest.mark.asyncio
async def test_event_carries_correct_session_id(self):
"""Emitted event should carry the provided session_id."""
eq = EventQueue()
expected_session_id = "conv-id-456"
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id="t1",
session_id=expected_session_id,
)
assert eq._buffer[0].session_id == expected_session_id
@pytest.mark.asyncio
async def test_event_has_iso_timestamp(self):
"""Emitted event should have a non-empty ISO 8601 timestamp."""
eq = EventQueue()
await _emit_event_safe(
eq,
TaskEventType.TASK_CREATED,
task_id="t1",
session_id="s1",
)
timestamp = eq._buffer[0].timestamp
assert isinstance(timestamp, str)
assert len(timestamp) > 0
# ISO 8601 format should contain 'T' separator
assert "T" in timestamp
@pytest.mark.asyncio
async def test_multiple_events_preserve_order(self):
"""Multiple events should be buffered in emission order."""
eq = EventQueue()
await _emit_event_safe(eq, TaskEventType.TASK_CREATED, "t1", "s1")
await _emit_event_safe(eq, TaskEventType.TASK_STARTED, "t1", "s1")
await _emit_event_safe(eq, TurnEventType.FINAL_ANSWER, "t1", "s1")
await _emit_event_safe(eq, TaskEventType.TASK_COMPLETED, "t1", "s1")
event_types = [e.event_type for e in eq._buffer]
assert event_types == [
"task.created",
"task.started",
"turn.final_answer",
"task.completed",
]
# ---------------------------------------------------------------------------
# SubmissionQueue integration tests
# ---------------------------------------------------------------------------
class TestSubmissionQueueIntegration:
"""Verify SubmissionQueue is properly initialized on app.state."""
@pytest.mark.asyncio
async def test_submission_queue_accepts_submissions(self, app):
"""SubmissionQueue on app.state should accept submissions."""
sq: SubmissionQueue = app.state.submission_queue
task_id = await sq.submit("hello", "session-1")
assert isinstance(task_id, str)
assert len(task_id) > 0
@pytest.mark.asyncio
async def test_submission_queue_close_marks_closed(self, app):
"""Closing SubmissionQueue should mark it as closed."""
sq: SubmissionQueue = app.state.submission_queue
assert sq.is_closed is False
sq.close()
assert sq.is_closed is True
@pytest.mark.asyncio
async def test_submission_queue_rejects_after_close(self, app):
"""Closed SubmissionQueue should reject new submissions."""
sq: SubmissionQueue = app.state.submission_queue
sq.close()
with pytest.raises(RuntimeError, match="SubmissionQueue is closed"):
await sq.submit("hello", "session-1")