feat(calendar): U2 backend service & REST API

Add CalendarService business logic layer and 14 REST endpoints:
- service.py: event CRUD with RRULE expansion, event types, tags,
  invitations, non-admin user search (G5/A3), type-level default
  reminder rule cloning
- routes/calendar.py: JWT-authenticated endpoints for events, types,
  tags, invitations, user search — with ownership checks
- 17 new tests (12 service + 5 routes), 33 total calendar tests passing
This commit is contained in:
chiguyong 2026-06-23 21:43:39 +08:00
parent 2ea799f6c4
commit d36e45bbe7
4 changed files with 1442 additions and 0 deletions

View File

@ -0,0 +1,332 @@
"""CalendarService — business-logic layer for calendar operations.
REST routes (U2) and agent tools are thin wrappers over this service.
The service dispatches to ``db`` functions for persistence and to
``recurrence`` for RRULE expansion.
"""
from __future__ import annotations
import dataclasses
import logging
import uuid
from datetime import datetime, timezone
from pathlib import Path
import aiosqlite
from agentkit.calendar.db import (
DEFAULT_CALENDAR_DB_PATH,
add_tag_to_event,
delete_event as db_delete_event,
get_event as db_get_event,
insert_event,
insert_event_type,
insert_invitation,
insert_reminder_rule,
insert_tag,
list_event_types as db_list_event_types,
list_events as db_list_events,
list_invitations as db_list_invitations,
list_reminder_rules_for_type,
list_tags as db_list_tags,
update_event as db_update_event,
update_event_type as db_update_event_type,
update_invitation_status,
)
from agentkit.calendar.models import (
CalendarEvent,
EventType,
Invitation,
Tag,
_now_iso,
)
from agentkit.calendar.recurrence import expand_rrule
from agentkit.server.auth.models import DEFAULT_AUTH_DB_PATH
logger = logging.getLogger(__name__)
def _parse_dt(dt_str: str) -> datetime:
"""Parse ISO 8601 string to timezone-aware datetime (UTC)."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
def _format_dt(dt: datetime) -> str:
"""Format datetime as ISO 8601 UTC string."""
return dt.astimezone(timezone.utc).isoformat()
class CalendarService:
"""Create, query, and manage calendar events, types, tags, and invitations.
Mirrors ``DocumentService``: ``__init__`` stores a db_path, async methods
delegate to ``calendar.db`` functions. RRULE expansion is handled here
so routes and tools get a flat list of occurrences.
"""
def __init__(
self,
db_path: str | Path | None = None,
auth_db_path: str | Path | None = None,
) -> None:
self.db_path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
self.auth_db_path = Path(auth_db_path) if auth_db_path is not None else DEFAULT_AUTH_DB_PATH
# ------------------------------------------------------------------
# Event CRUD
# ------------------------------------------------------------------
async def create_event(
self,
user_id: str,
title: str,
start_time: str,
end_time: str,
description: str = "",
location: str = "",
is_all_day: bool = False,
event_type_id: str | None = None,
rrule: str | None = None,
source: str = "manual",
is_invited: bool = False,
conversation_id: str | None = None,
tag_ids: list[str] | None = None,
) -> CalendarEvent:
"""Create a calendar event with UUID, timestamps, tags, and cloned reminders."""
now = _now_iso()
event = CalendarEvent(
id=uuid.uuid4().hex,
user_id=user_id,
title=title,
description=description,
start_time=start_time,
end_time=end_time,
is_all_day=is_all_day,
location=location,
event_type_id=event_type_id,
rrule=rrule,
source=source,
is_invited=is_invited,
conversation_id=conversation_id,
last_modified=now,
created_at=now,
)
await insert_event(event, self.db_path)
# Link tags if provided
if tag_ids:
for tag_id in tag_ids:
await add_tag_to_event(event.id, tag_id, self.db_path)
# Clone type-level default reminder rules to the event
if event_type_id:
type_rules = await list_reminder_rules_for_type(event_type_id, self.db_path)
for rule in type_rules:
cloned = dataclasses.replace(
rule,
id=uuid.uuid4().hex,
event_id=event.id,
event_type_id=None,
)
await insert_reminder_rule(cloned, self.db_path)
logger.info(f"Created event {event.id} ({title}) for user {user_id}")
return event
async def get_event(self, event_id: str) -> CalendarEvent | None:
"""Return a single event by id, or None."""
return await db_get_event(event_id, self.db_path)
async def list_events(
self,
user_id: str,
start: str | None = None,
end: str | None = None,
event_type_id: str | None = None,
tag_id: str | None = None,
) -> list[CalendarEvent]:
"""List events for a user, expanding recurring events within [start, end].
Non-recurring events are filtered by date range manually (not in the
DB query) so that recurring events whose first occurrence falls
outside the range are still included and expanded.
"""
# Fetch all events for the user with type/tag filters — no date filter
# at the DB level so recurring events are not excluded.
events = await db_list_events(
user_id,
event_type_id=event_type_id,
tag_id=tag_id,
db_path=self.db_path,
)
result: list[CalendarEvent] = []
for event in events:
if event.rrule:
# Expand recurring event within [start, end] range
occurrences = expand_rrule(
event.rrule,
event.start_time,
range_start=start,
range_end=end,
)
for occ_start_str in occurrences:
occ = self._make_occurrence(event, occ_start_str)
result.append(occ)
else:
# Non-recurring: filter by date range manually
if _is_in_range(event.start_time, start, end):
result.append(event)
# Sort by start_time for consistent ordering
result.sort(key=lambda e: e.start_time)
return result
def _make_occurrence(self, event: CalendarEvent, occ_start_str: str) -> CalendarEvent:
"""Create a copy of *event* with start/end times shifted to the occurrence."""
occ_start_dt = _parse_dt(occ_start_str)
original_start = _parse_dt(event.start_time)
original_end = _parse_dt(event.end_time)
duration = original_end - original_start
occ_end_dt = occ_start_dt + duration
return dataclasses.replace(
event,
start_time=_format_dt(occ_start_dt),
end_time=_format_dt(occ_end_dt),
)
async def update_event(self, event_id: str, fields: dict) -> bool:
"""Update specific fields of an event. Auto-updates last_modified."""
fields = {**fields, "last_modified": _now_iso()}
return await db_update_event(event_id, fields, self.db_path)
async def delete_event(self, event_id: str) -> bool:
"""Delete an event and its dependent rows."""
return await db_delete_event(event_id, self.db_path)
# ------------------------------------------------------------------
# Event Type CRUD
# ------------------------------------------------------------------
async def list_event_types(self, user_id: str) -> list[EventType]:
"""List all event types for a user."""
return await db_list_event_types(user_id, self.db_path)
async def create_event_type(
self,
user_id: str,
name: str,
color: str = "#4A90D9",
) -> EventType:
"""Create a new event type."""
et = EventType(
id=uuid.uuid4().hex,
user_id=user_id,
name=name,
color=color,
)
await insert_event_type(et, self.db_path)
return et
async def update_event_type(self, type_id: str, fields: dict) -> bool:
"""Update specific fields of an event type."""
return await db_update_event_type(type_id, fields, self.db_path)
# ------------------------------------------------------------------
# Tag CRUD
# ------------------------------------------------------------------
async def list_tags(self, user_id: str) -> list[Tag]:
"""List all tags for a user."""
return await db_list_tags(user_id, self.db_path)
async def create_tag(self, user_id: str, name: str) -> Tag:
"""Create a new tag."""
tag = Tag(
id=uuid.uuid4().hex,
user_id=user_id,
name=name,
)
await insert_tag(tag, self.db_path)
return tag
# ------------------------------------------------------------------
# Invitation CRUD
# ------------------------------------------------------------------
async def create_invitation(
self,
event_id: str,
inviter_user_id: str,
invitee_email: str,
) -> Invitation:
"""Create an invitation with status='pending'."""
invitation = Invitation(
id=uuid.uuid4().hex,
event_id=event_id,
inviter_user_id=inviter_user_id,
invitee_email=invitee_email,
status="pending",
)
await insert_invitation(invitation, self.db_path)
return invitation
async def respond_to_invitation(self, invitation_id: str, status: str) -> bool:
"""Update invitation status and set responded_at."""
return await update_invitation_status(
invitation_id,
status,
_now_iso(),
self.db_path,
)
async def list_invitations(self, invitee_email: str) -> list[Invitation]:
"""List all invitations for an invitee email."""
return await db_list_invitations(invitee_email, self.db_path)
# ------------------------------------------------------------------
# User search (auth DB)
# ------------------------------------------------------------------
async def search_users(self, q: str) -> list[dict]:
"""Search users by username or email. Returns top 10 matches.
Only ``username`` and ``email`` are returned never user_id or
password fields (G5/A3 least-privilege user search).
"""
pattern = f"%{q}%"
async with aiosqlite.connect(str(self.auth_db_path)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT username, email FROM users WHERE username LIKE ? OR email LIKE ? LIMIT 10",
(pattern, pattern),
)
rows = await cursor.fetchall()
return [{"username": row["username"], "email": row["email"]} for row in rows]
async def get_user_email(self, user_id: str) -> str | None:
"""Look up a user's email from the auth DB by user_id."""
async with aiosqlite.connect(str(self.auth_db_path)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT email FROM users WHERE id = ?",
(user_id,),
)
row = await cursor.fetchone()
return row["email"] if row else None
def _is_in_range(dt_str: str, start: str | None, end: str | None) -> bool:
"""Check if *dt_str* falls within the half-open range [start, end)."""
dt = _parse_dt(dt_str)
if start is not None:
if dt < _parse_dt(start):
return False
if end is not None:
if dt >= _parse_dt(end):
return False
return True

View File

@ -0,0 +1,399 @@
"""REST API routes for calendar operations (U2).
Thin wrapper over CalendarService. All business logic lives in the
service layer routes handle HTTP concerns (auth, request validation).
Endpoints (all under /api/v1/calendar):
- POST /calendar/events create event
- GET /calendar/events list with filters (start, end, type_id, tag_id)
- GET /calendar/events/{event_id} get single event
- PATCH /calendar/events/{event_id} update event
- DELETE /calendar/events/{event_id} delete event
- POST /calendar/events/{event_id}/invitations invite user by email
- POST /calendar/invitations/{invitation_id}/respond accept/decline/tentative
- GET /calendar/invitations list invitations for current user
- GET /calendar/users/search?q=xxx search users (G5/A3)
- GET /calendar/event-types list event types
- POST /calendar/event-types create event type
- PATCH /calendar/event-types/{type_id} update event type
- GET /calendar/tags list tags
- POST /calendar/tags create tag
"""
from __future__ import annotations
import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from pydantic import BaseModel, Field
from agentkit.server.auth.dependencies import require_authenticated
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/calendar", tags=["calendar"])
_VALID_INVITATION_STATUSES = {"accepted", "declined", "tentative"}
# ---------------------------------------------------------------------------
# Service accessor
# ---------------------------------------------------------------------------
def _get_calendar_service(request: Request):
"""Get CalendarService from app.state. Raises 503 if not initialized."""
service = getattr(request.app.state, "calendar_service", None)
if service is None:
raise HTTPException(
status_code=503,
detail="Calendar service not available. Server may not have initialized it.",
)
return service
# ---------------------------------------------------------------------------
# Request / response models
# ---------------------------------------------------------------------------
class CreateEventRequest(BaseModel):
title: str
start_time: str
end_time: str
description: str = ""
location: str = ""
is_all_day: bool = False
event_type_id: str | None = None
rrule: str | None = None
tag_ids: list[str] = Field(default_factory=list)
class UpdateEventRequest(BaseModel):
title: str | None = None
start_time: str | None = None
end_time: str | None = None
description: str | None = None
location: str | None = None
is_all_day: bool | None = None
event_type_id: str | None = None
rrule: str | None = None
model_config = {"extra": "allow"}
class CreateEventTypeRequest(BaseModel):
name: str
color: str = "#4A90D9"
class UpdateEventTypeRequest(BaseModel):
name: str | None = None
color: str | None = None
is_default: bool | None = None
class CreateTagRequest(BaseModel):
name: str
class CreateInvitationRequest(BaseModel):
invitee_email: str
class RespondInvitationRequest(BaseModel):
status: str
# ---------------------------------------------------------------------------
# Event endpoints
# ---------------------------------------------------------------------------
@router.post("/events")
async def create_event(
body: CreateEventRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Create a new calendar event."""
service = _get_calendar_service(request)
event = await service.create_event(
user_id=user["user_id"],
title=body.title,
start_time=body.start_time,
end_time=body.end_time,
description=body.description,
location=body.location,
is_all_day=body.is_all_day,
event_type_id=body.event_type_id,
rrule=body.rrule,
source="manual",
tag_ids=body.tag_ids,
)
return {"success": True, "event": event.to_dict()}
@router.get("/events")
async def list_events(
request: Request,
start: str | None = Query(None),
end: str | None = Query(None),
type_id: str | None = Query(None),
tag_id: str | None = Query(None),
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""List events for the current user with optional filters."""
service = _get_calendar_service(request)
events = await service.list_events(
user_id=user["user_id"],
start=start,
end=end,
event_type_id=type_id,
tag_id=tag_id,
)
return {
"success": True,
"events": [e.to_dict() for e in events],
"count": len(events),
}
@router.get("/events/{event_id}")
async def get_event(
event_id: str,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Get a single event by id."""
service = _get_calendar_service(request)
event = await service.get_event(event_id)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.user_id != user["user_id"]:
raise HTTPException(status_code=403, detail="Access denied")
return {"success": True, "event": event.to_dict()}
@router.patch("/events/{event_id}")
async def update_event(
event_id: str,
body: UpdateEventRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Update specific fields of an event."""
service = _get_calendar_service(request)
event = await service.get_event(event_id)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.user_id != user["user_id"]:
raise HTTPException(status_code=403, detail="Access denied")
# Build fields dict from non-None values (extra fields allowed by model_config)
fields: dict[str, Any] = {
name: value
for name, value in body.model_dump(exclude_unset=True).items()
if value is not None
}
if not fields:
return {"success": True, "event": event.to_dict(), "updated": False}
updated = await service.update_event(event_id, fields)
refreshed = await service.get_event(event_id)
return {
"success": True,
"event": refreshed.to_dict() if refreshed else event.to_dict(),
"updated": updated,
}
@router.delete("/events/{event_id}")
async def delete_event(
event_id: str,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Delete an event."""
service = _get_calendar_service(request)
event = await service.get_event(event_id)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.user_id != user["user_id"]:
raise HTTPException(status_code=403, detail="Access denied")
deleted = await service.delete_event(event_id)
return {"success": True, "deleted": deleted}
# ---------------------------------------------------------------------------
# Invitation endpoints
# ---------------------------------------------------------------------------
@router.post("/events/{event_id}/invitations")
async def create_invitation(
event_id: str,
body: CreateInvitationRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Invite a user to an event by email."""
service = _get_calendar_service(request)
event = await service.get_event(event_id)
if event is None:
raise HTTPException(status_code=404, detail="Event not found")
if event.user_id != user["user_id"]:
raise HTTPException(status_code=403, detail="Only the event owner can invite")
invitation = await service.create_invitation(
event_id=event_id,
inviter_user_id=user["user_id"],
invitee_email=body.invitee_email,
)
return {"success": True, "invitation": invitation.to_dict()}
@router.post("/invitations/{invitation_id}/respond")
async def respond_to_invitation(
invitation_id: str,
body: RespondInvitationRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Respond to an invitation (accept/decline/tentative)."""
if body.status not in _VALID_INVITATION_STATUSES:
raise HTTPException(
status_code=400,
detail=f"Invalid status. Must be one of: {sorted(_VALID_INVITATION_STATUSES)}",
)
service = _get_calendar_service(request)
updated = await service.respond_to_invitation(invitation_id, body.status)
if not updated:
raise HTTPException(status_code=404, detail="Invitation not found")
return {"success": True, "status": body.status}
@router.get("/invitations")
async def list_invitations(
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""List invitations for the current user (by email)."""
service = _get_calendar_service(request)
email = await service.get_user_email(user["user_id"])
if email is None:
return {"success": True, "invitations": [], "count": 0}
invitations = await service.list_invitations(email)
return {
"success": True,
"invitations": [inv.to_dict() for inv in invitations],
"count": len(invitations),
}
# ---------------------------------------------------------------------------
# User search endpoint (G5/A3)
# ---------------------------------------------------------------------------
@router.get("/users/search")
async def search_users(
request: Request,
q: str = Query(..., min_length=1),
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Search users by username or email. Returns top 10 matches.
Only username and email are returned never user_id or password
fields (least-privilege, G5/A3).
"""
service = _get_calendar_service(request)
users = await service.search_users(q)
return {"success": True, "users": users, "count": len(users)}
# ---------------------------------------------------------------------------
# Event Type endpoints
# ---------------------------------------------------------------------------
@router.get("/event-types")
async def list_event_types(
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""List all event types for the current user."""
service = _get_calendar_service(request)
types = await service.list_event_types(user["user_id"])
return {
"success": True,
"event_types": [t.to_dict() for t in types],
"count": len(types),
}
@router.post("/event-types")
async def create_event_type(
body: CreateEventTypeRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Create a new event type."""
service = _get_calendar_service(request)
et = await service.create_event_type(
user_id=user["user_id"],
name=body.name,
color=body.color,
)
return {"success": True, "event_type": et.to_dict()}
@router.patch("/event-types/{type_id}")
async def update_event_type(
type_id: str,
body: UpdateEventTypeRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Update specific fields of an event type."""
service = _get_calendar_service(request)
fields: dict[str, Any] = {}
for name, value in body.model_dump(exclude_unset=True).items():
if value is not None:
fields[name] = value
if not fields:
return {"success": True, "updated": False}
updated = await service.update_event_type(type_id, fields)
return {"success": True, "updated": updated}
# ---------------------------------------------------------------------------
# Tag endpoints
# ---------------------------------------------------------------------------
@router.get("/tags")
async def list_tags(
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""List all tags for the current user."""
service = _get_calendar_service(request)
tags = await service.list_tags(user["user_id"])
return {"success": True, "tags": [t.to_dict() for t in tags], "count": len(tags)}
@router.post("/tags")
async def create_tag(
body: CreateTagRequest,
request: Request,
user: dict = Depends(require_authenticated),
) -> dict[str, Any]:
"""Create a new tag."""
service = _get_calendar_service(request)
tag = await service.create_tag(user_id=user["user_id"], name=body.name)
return {"success": True, "tag": tag.to_dict()}

View File

@ -0,0 +1,270 @@
"""Tests for calendar REST API routes (U2)."""
from __future__ import annotations
import asyncio
import uuid
from pathlib import Path
from typing import Any
import aiosqlite
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from agentkit.calendar.db import init_calendar_db
from agentkit.calendar.service import CalendarService
from agentkit.server.auth.dependencies import require_authenticated
from agentkit.server.auth.models import init_auth_db
from agentkit.server.routes import calendar as calendar_routes
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
TEST_USER_ID = "test-user-id"
TEST_USER_EMAIL = "testuser@example.com"
def _make_test_user() -> dict[str, Any]:
return {
"user_id": TEST_USER_ID,
"username": "testuser",
"role": "member",
}
async def _seed_auth_user(auth_db_path: Path) -> None:
"""Seed the test user into the auth DB so get_user_email works."""
async with aiosqlite.connect(str(auth_db_path)) as db:
await db.execute(
"INSERT INTO users (id, username, email, password_hash, role, is_active, "
"is_terminal_authorized, is_server_terminal_authorized, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
TEST_USER_ID,
"testuser",
TEST_USER_EMAIL,
"fake-hash",
"member",
1,
0,
0,
"2026-01-01T00:00:00+00:00",
"2026-01-01T00:00:00+00:00",
),
)
await db.commit()
async def _seed_searchable_users(auth_db_path: Path) -> None:
"""Seed extra users for search tests."""
for name in ("alice", "bob", "charlie"):
async with aiosqlite.connect(str(auth_db_path)) as db:
await db.execute(
"INSERT INTO users (id, username, email, password_hash, role, is_active, "
"is_terminal_authorized, is_server_terminal_authorized, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
uuid.uuid4().hex,
name,
f"{name}@example.com",
"fake-hash",
"member",
1,
0,
0,
"2026-01-01T00:00:00+00:00",
"2026-01-01T00:00:00+00:00",
),
)
await db.commit()
@pytest.fixture
def calendar_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_calendar.db"
asyncio.run(init_calendar_db(path))
return path
@pytest.fixture
def auth_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_auth.db"
asyncio.run(init_auth_db(path))
asyncio.run(_seed_auth_user(path))
return path
@pytest.fixture
def app(calendar_db_path: Path, auth_db_path: Path) -> FastAPI:
"""Create a test app with CalendarService and mock auth."""
service = CalendarService(db_path=calendar_db_path, auth_db_path=auth_db_path)
app = FastAPI()
app.state.calendar_service = service
app.state.auth_db_path = str(auth_db_path)
app.include_router(calendar_routes.router, prefix="/api/v1")
# Override auth dependency to return a test user
app.dependency_overrides[require_authenticated] = lambda: _make_test_user()
return app
@pytest.fixture
def client(app: FastAPI) -> TestClient:
return TestClient(app)
@pytest.fixture
def unauth_app(calendar_db_path: Path, auth_db_path: Path) -> FastAPI:
"""App without auth override — simulates unauthenticated requests."""
service = CalendarService(db_path=calendar_db_path, auth_db_path=auth_db_path)
app = FastAPI()
app.state.calendar_service = service
app.state.auth_db_path = str(auth_db_path)
app.include_router(calendar_routes.router, prefix="/api/v1")
# No dependency override → require_authenticated will see no current_user
return app
@pytest.fixture
def unauth_client(unauth_app: FastAPI) -> TestClient:
return TestClient(unauth_app)
# ---------------------------------------------------------------------------
# Auth requirement
# ---------------------------------------------------------------------------
def test_route_create_event_requires_auth(unauth_client: TestClient) -> None:
"""No auth → 401."""
resp = unauth_client.post(
"/api/v1/calendar/events",
json={
"title": "Test",
"start_time": "2026-07-01T10:00:00+00:00",
"end_time": "2026-07-01T11:00:00+00:00",
},
)
assert resp.status_code == 401
# ---------------------------------------------------------------------------
# Create event
# ---------------------------------------------------------------------------
def test_route_create_event_success(client: TestClient) -> None:
"""Create event via API returns 200 with event data."""
resp = client.post(
"/api/v1/calendar/events",
json={
"title": "Sprint Planning",
"start_time": "2026-07-01T10:00:00+00:00",
"end_time": "2026-07-01T11:00:00+00:00",
"description": "Bi-weekly sprint planning",
"location": "Room A",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
event = data["event"]
assert event["title"] == "Sprint Planning"
assert event["start_time"] == "2026-07-01T10:00:00+00:00"
assert event["description"] == "Bi-weekly sprint planning"
assert event["location"] == "Room A"
assert event["source"] == "manual"
assert "id" in event
# ---------------------------------------------------------------------------
# List events
# ---------------------------------------------------------------------------
def test_route_list_events_returns_events(client: TestClient) -> None:
"""Create events, list via API returns them."""
for i in range(3):
client.post(
"/api/v1/calendar/events",
json={
"title": f"Event {i}",
"start_time": f"2026-07-{i + 1:02d}T10:00:00+00:00",
"end_time": f"2026-07-{i + 1:02d}T11:00:00+00:00",
},
)
resp = client.get("/api/v1/calendar/events")
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["count"] == 3
titles = {e["title"] for e in data["events"]}
assert titles == {"Event 0", "Event 1", "Event 2"}
# ---------------------------------------------------------------------------
# Tag filter via API (G2)
# ---------------------------------------------------------------------------
def test_route_list_events_filters_by_tag(client: TestClient) -> None:
"""G2 tag filter via API — only tagged events returned."""
# Create a tag first
tag_resp = client.post("/api/v1/calendar/tags", json={"name": "important"})
assert tag_resp.status_code == 200
tag_id = tag_resp.json()["tag"]["id"]
# Event 1: with tag
client.post(
"/api/v1/calendar/events",
json={
"title": "Tagged Event",
"start_time": "2026-07-01T10:00:00+00:00",
"end_time": "2026-07-01T11:00:00+00:00",
"tag_ids": [tag_id],
},
)
# Event 2: without tag
client.post(
"/api/v1/calendar/events",
json={
"title": "Untagged Event",
"start_time": "2026-07-02T10:00:00+00:00",
"end_time": "2026-07-02T11:00:00+00:00",
},
)
# List with tag filter
resp = client.get("/api/v1/calendar/events", params={"tag_id": tag_id})
assert resp.status_code == 200
data = resp.json()
assert data["count"] == 1
assert data["events"][0]["title"] == "Tagged Event"
# ---------------------------------------------------------------------------
# User search via API (G5)
# ---------------------------------------------------------------------------
def test_route_search_users(client: TestClient, auth_db_path: Path) -> None:
"""G5 user search via API returns matching users."""
asyncio.run(_seed_searchable_users(auth_db_path))
resp = client.get("/api/v1/calendar/users/search", params={"q": "ali"})
assert resp.status_code == 200
data = resp.json()
assert data["success"] is True
assert data["count"] == 1
assert data["users"][0]["username"] == "alice"
assert data["users"][0]["email"] == "alice@example.com"
# Must not expose sensitive fields
assert "id" not in data["users"][0]
assert "password_hash" not in data["users"][0]

View File

@ -0,0 +1,441 @@
"""Tests for CalendarService (U2)."""
from __future__ import annotations
import asyncio
import uuid
from pathlib import Path
import aiosqlite
import pytest
from agentkit.calendar.db import (
get_event_tags,
init_calendar_db,
insert_reminder_rule,
list_reminder_rules_for_event,
)
from agentkit.calendar.models import ReminderRule
from agentkit.calendar.service import CalendarService
from agentkit.server.auth.models import init_auth_db
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def calendar_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_calendar.db"
asyncio.run(init_calendar_db(path))
return path
@pytest.fixture
def auth_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_auth.db"
asyncio.run(init_auth_db(path))
return path
@pytest.fixture
def service(calendar_db_path: Path, auth_db_path: Path) -> CalendarService:
return CalendarService(db_path=calendar_db_path, auth_db_path=auth_db_path)
async def _seed_user(
auth_db_path: Path,
user_id: str | None = None,
username: str = "testuser",
email: str = "test@example.com",
) -> str:
"""Insert a user into the auth DB and return its id."""
uid = user_id or uuid.uuid4().hex
async with aiosqlite.connect(str(auth_db_path)) as db:
await db.execute(
"INSERT INTO users (id, username, email, password_hash, role, is_active, "
"is_terminal_authorized, is_server_terminal_authorized, created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
uid,
username,
email,
"fake-hash",
"member",
1,
0,
0,
"2026-01-01T00:00:00+00:00",
"2026-01-01T00:00:00+00:00",
),
)
await db.commit()
return uid
# ---------------------------------------------------------------------------
# Event creation with type and tags
# ---------------------------------------------------------------------------
async def test_create_event_with_type_and_tags(
service: CalendarService, calendar_db_path: Path
) -> None:
"""Create event with type_id and 2 tags, verify all linked."""
user_id = "user-1"
# Create event type
et = await service.create_event_type(user_id, "Meeting", color="#FF0000")
# Create tags
tag1 = await service.create_tag(user_id, "urgent")
tag2 = await service.create_tag(user_id, "work")
# Create event with type and tags
event = await service.create_event(
user_id=user_id,
title="Sprint Planning",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
event_type_id=et.id,
tag_ids=[tag1.id, tag2.id],
)
# Verify event was created
fetched = await service.get_event(event.id)
assert fetched is not None
assert fetched.title == "Sprint Planning"
assert fetched.event_type_id == et.id
# Verify tags are linked
tags = await get_event_tags(event.id, calendar_db_path)
tag_names = {t.name for t in tags}
assert tag_names == {"urgent", "work"}
# ---------------------------------------------------------------------------
# Date range filtering
# ---------------------------------------------------------------------------
async def test_list_events_filters_by_date_range(service: CalendarService) -> None:
"""3 events across days, filter returns correct subset."""
user_id = "user-1"
for i, day in enumerate([1, 15, 28]):
await service.create_event(
user_id=user_id,
title=f"Event Day {day}",
start_time=f"2026-07-{day:02d}T10:00:00+00:00",
end_time=f"2026-07-{day:02d}T11:00:00+00:00",
)
# Range covering only day 15
events = await service.list_events(
user_id=user_id,
start="2026-07-10T00:00:00+00:00",
end="2026-07-20T00:00:00+00:00",
)
assert len(events) == 1
assert events[0].title == "Event Day 15"
# ---------------------------------------------------------------------------
# Type and tag filter combination (G2)
# ---------------------------------------------------------------------------
async def test_list_events_filters_by_type_and_tag(
service: CalendarService, calendar_db_path: Path
) -> None:
"""Filter by both type_id and tag_id returns only matching events."""
user_id = "user-1"
et_meeting = await service.create_event_type(user_id, "Meeting")
et_personal = await service.create_event_type(user_id, "Personal")
tag_work = await service.create_tag(user_id, "work")
tag_family = await service.create_tag(user_id, "family")
# Event 1: Meeting + work
e1 = await service.create_event(
user_id=user_id,
title="Standup",
start_time="2026-07-01T09:00:00+00:00",
end_time="2026-07-01T09:30:00+00:00",
event_type_id=et_meeting.id,
tag_ids=[tag_work.id],
)
# Event 2: Personal + family
await service.create_event(
user_id=user_id,
title="Birthday",
start_time="2026-07-02T18:00:00+00:00",
end_time="2026-07-02T21:00:00+00:00",
event_type_id=et_personal.id,
tag_ids=[tag_family.id],
)
# Event 3: Meeting + family (cross combination)
await service.create_event(
user_id=user_id,
title="Team Dinner",
start_time="2026-07-03T19:00:00+00:00",
end_time="2026-07-03T21:00:00+00:00",
event_type_id=et_meeting.id,
tag_ids=[tag_family.id],
)
# Filter by type=Meeting AND tag=work → only Event 1
events = await service.list_events(
user_id=user_id,
event_type_id=et_meeting.id,
tag_id=tag_work.id,
)
assert len(events) == 1
assert events[0].id == e1.id
# ---------------------------------------------------------------------------
# Tag-only filter (G2)
# ---------------------------------------------------------------------------
async def test_list_events_filters_by_tag_only(
service: CalendarService, calendar_db_path: Path
) -> None:
"""5 events, 2 with tag X, filter tag_id=X returns only 2."""
user_id = "user-1"
tag_x = await service.create_tag(user_id, "X")
tagged_ids: list[str] = []
for i in range(5):
tag_ids = [tag_x.id] if i < 2 else None
event = await service.create_event(
user_id=user_id,
title=f"Event {i}",
start_time=f"2026-07-{i + 1:02d}T10:00:00+00:00",
end_time=f"2026-07-{i + 1:02d}T11:00:00+00:00",
tag_ids=tag_ids,
)
if i < 2:
tagged_ids.append(event.id)
events = await service.list_events(user_id=user_id, tag_id=tag_x.id)
assert len(events) == 2
returned_ids = {e.id for e in events}
assert returned_ids == set(tagged_ids)
# ---------------------------------------------------------------------------
# Recurring event expansion
# ---------------------------------------------------------------------------
async def test_list_events_expands_recurring(service: CalendarService) -> None:
"""Event with FREQ=DAILY;COUNT=3, list range covering 2 days → 2 occurrences."""
user_id = "user-1"
await service.create_event(
user_id=user_id,
title="Daily Standup",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T10:15:00+00:00",
rrule="FREQ=DAILY;COUNT=3",
)
# Range covers Jul 1 and Jul 2 (half-open [Jul 1, Jul 3))
events = await service.list_events(
user_id=user_id,
start="2026-07-01T00:00:00+00:00",
end="2026-07-03T00:00:00+00:00",
)
assert len(events) == 2
# First occurrence on Jul 1
assert events[0].start_time.startswith("2026-07-01")
assert events[0].end_time.startswith("2026-07-01")
# Second occurrence on Jul 2
assert events[1].start_time.startswith("2026-07-02")
assert events[1].end_time.startswith("2026-07-02")
# Duration preserved (15 minutes)
assert "T10:00:00" in events[0].start_time
assert "T10:15:00" in events[0].end_time
assert "T10:00:00" in events[1].start_time
assert "T10:15:00" in events[1].end_time
# ---------------------------------------------------------------------------
# Partial update
# ---------------------------------------------------------------------------
async def test_update_event_partial_fields(service: CalendarService) -> None:
"""PATCH only title, other fields unchanged."""
event = await service.create_event(
user_id="user-1",
title="Original Title",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
description="Original description",
location="Room A",
)
original = await service.get_event(event.id)
assert original is not None
updated = await service.update_event(event.id, {"title": "New Title"})
assert updated is True
refreshed = await service.get_event(event.id)
assert refreshed is not None
assert refreshed.title == "New Title"
# Other fields unchanged
assert refreshed.description == "Original description"
assert refreshed.location == "Room A"
assert refreshed.start_time == original.start_time
assert refreshed.end_time == original.end_time
# last_modified should be updated
assert refreshed.last_modified != original.last_modified
# ---------------------------------------------------------------------------
# Delete cascade
# ---------------------------------------------------------------------------
async def test_delete_event_cascades_reminders_and_tags(
service: CalendarService, calendar_db_path: Path
) -> None:
"""Delete event, verify reminder rules and junction rows removed."""
user_id = "user-1"
tag = await service.create_tag(user_id, "work")
event = await service.create_event(
user_id=user_id,
title="To Delete",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
tag_ids=[tag.id],
)
# Add a reminder rule directly to the event
rule = ReminderRule(
id=uuid.uuid4().hex,
event_id=event.id,
offset_minutes=-30,
channels=["email"],
)
await insert_reminder_rule(rule, calendar_db_path)
# Verify rule and tag exist
rules_before = await list_reminder_rules_for_event(event.id, calendar_db_path)
assert len(rules_before) == 1
tags_before = await get_event_tags(event.id, calendar_db_path)
assert len(tags_before) == 1
# Delete the event
deleted = await service.delete_event(event.id)
assert deleted is True
# Verify event is gone
assert await service.get_event(event.id) is None
# Verify reminder rules are gone (cascade)
rules_after = await list_reminder_rules_for_event(event.id, calendar_db_path)
assert len(rules_after) == 0
# Verify junction rows are gone
tags_after = await get_event_tags(event.id, calendar_db_path)
assert len(tags_after) == 0
# Tag itself should still exist (only the junction is removed)
all_tags = await service.list_tags(user_id)
assert len(all_tags) == 1
# ---------------------------------------------------------------------------
# Invitation flow
# ---------------------------------------------------------------------------
async def test_create_invitation_and_respond(service: CalendarService) -> None:
"""Invite, respond 'accepted', verify status + responded_at set."""
event = await service.create_event(
user_id="user-1",
title="Team Meeting",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
)
invitation = await service.create_invitation(
event_id=event.id,
inviter_user_id="user-1",
invitee_email="alice@example.com",
)
assert invitation.status == "pending"
assert invitation.responded_at is None
updated = await service.respond_to_invitation(invitation.id, "accepted")
assert updated is True
invitations = await service.list_invitations("alice@example.com")
assert len(invitations) == 1
assert invitations[0].status == "accepted"
assert invitations[0].responded_at is not None
# ---------------------------------------------------------------------------
# User search (G5)
# ---------------------------------------------------------------------------
async def test_search_users_by_username(service: CalendarService, auth_db_path: Path) -> None:
"""Seed users in auth DB, search returns matches."""
await _seed_user(auth_db_path, username="alice", email="alice@example.com")
await _seed_user(auth_db_path, username="bob", email="bob@example.com")
await _seed_user(auth_db_path, username="charlie", email="charlie@example.com")
results = await service.search_users("ali")
assert len(results) == 1
assert results[0]["username"] == "alice"
assert results[0]["email"] == "alice@example.com"
# Must NOT return user_id or password fields
assert "id" not in results[0]
assert "password_hash" not in results[0]
async def test_search_users_no_match_returns_empty(
service: CalendarService, auth_db_path: Path
) -> None:
"""Search 'zzz' returns []."""
await _seed_user(auth_db_path, username="alice", email="alice@example.com")
results = await service.search_users("zzz")
assert results == []
async def test_search_users_returns_max_10(service: CalendarService, auth_db_path: Path) -> None:
"""Seed 15 matching users, verify only 10 returned."""
for i in range(15):
await _seed_user(
auth_db_path,
username=f"user{i:02d}",
email=f"user{i:02d}@example.com",
)
results = await service.search_users("user")
assert len(results) == 10
# ---------------------------------------------------------------------------
# Event type color persistence
# ---------------------------------------------------------------------------
async def test_event_type_default_color_persistence(service: CalendarService) -> None:
"""Create type with color, verify roundtrip."""
et = await service.create_event_type("user-1", "Important", color="#00FF00")
assert et.color == "#00FF00"
types = await service.list_event_types("user-1")
assert len(types) == 1
assert types[0].color == "#00FF00"
assert types[0].name == "Important"