feat(calendar): U5 reminder subsystem with scheduler and multi-channel dispatch

ReminderScheduler scans upcoming events every 60s, matches reminder
rules, and dispatches via client (WS), email (SMTP), or webhook
channels. Idempotent delivery (no duplicates on re-scan), retry with
exponential backoff (up to 3 attempts). Follows task_store.py
start/stop asyncio loop pattern (KTD-2 — conscious deviation from
APScheduler).

- src/agentkit/calendar/scheduler.py — ReminderScheduler (start/stop/scan_once)
- src/agentkit/calendar/reminders.py — ReminderDispatcher (strategy per channel)
- src/agentkit/calendar/db.py — added list_all_events_in_time_range() for scheduler
- tests/unit/calendar/test_scheduler.py — 8 tests
- tests/unit/calendar/test_reminders.py — 9 tests
This commit is contained in:
chiguyong 2026-06-23 22:19:57 +08:00
parent ddcedb57b2
commit 26efbb51db
5 changed files with 746 additions and 0 deletions

View File

@ -292,6 +292,28 @@ async def get_event(event_id: str, db_path: str | Path | None = None) -> Calenda
return _row_to_event(row) if row else None
async def get_event_by_external_id(
external_id: str,
external_provider: str,
user_id: str,
db_path: str | Path | None = None,
) -> CalendarEvent | None:
"""Return a single event by (external_id, provider, user_id), or None.
Used by ICS import (U8) to skip duplicate UIDs already imported.
"""
path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
async with aiosqlite.connect(str(path)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM calendar_events "
"WHERE external_id = ? AND external_provider = ? AND user_id = ?",
(external_id, external_provider, user_id),
)
row = await cursor.fetchone()
return _row_to_event(row) if row else None
async def list_events(
user_id: str,
start: str | None = None,
@ -330,6 +352,25 @@ async def list_events(
return [_row_to_event(row) for row in rows]
async def list_all_events_in_time_range(
start: str, end: str, db_path: str | Path | None = None
) -> list[CalendarEvent]:
"""List all events (across all users) with start_time in [start, end).
Used by ReminderScheduler to scan for events entering the reminder window.
"""
path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
async with aiosqlite.connect(str(path)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM calendar_events WHERE start_time >= ? AND start_time < ? "
"ORDER BY start_time",
(start, end),
)
rows = await cursor.fetchall()
return [_row_to_event(row) for row in rows]
async def update_event(
event_id: str, fields: dict[str, object], db_path: str | Path | None = None
) -> bool:

View File

@ -0,0 +1,115 @@
"""Reminder dispatcher — multi-channel delivery (client push / email / webhook).
Strategy pattern: one method per channel. External dependencies (WS sender,
SMTP config, webhook URL) are injected so tests can mock them without
patching module imports.
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from agentkit.calendar.models import CalendarEvent
logger = logging.getLogger(__name__)
@dataclass
class SmtpConfig:
"""SMTP server configuration for the email reminder channel."""
host: str = "localhost"
port: int = 25
username: str | None = None
password: str | None = None
use_tls: bool = False
from_email: str = "noreply@agentkit.local"
class ReminderDispatcher:
"""Dispatch reminders via client push, email, and webhook channels.
Args:
ws_sender: Async callback ``(user_id, message_dict) -> None`` for client
push. The callback implementation is responsible for resolving
``user_id`` to an active WebSocket session.
smtp_config: SMTP settings for the email channel. ``None`` disables email.
webhook_url: URL for the webhook channel. ``None`` disables webhook.
get_user_email: Async callback ``user_id -> email | None`` used to
resolve the recipient address for email reminders.
"""
def __init__(
self,
ws_sender: Callable[[str, dict[str, object]], Awaitable[None]] | None = None,
smtp_config: SmtpConfig | None = None,
webhook_url: str | None = None,
get_user_email: Callable[[str], Awaitable[str | None]] | None = None,
) -> None:
self._ws_sender = ws_sender
self._smtp_config = smtp_config
self._webhook_url = webhook_url
self._get_user_email = get_user_email
async def dispatch(self, channel: str, event: CalendarEvent, user_id: str) -> bool:
"""Send a reminder via *channel*. Returns ``True`` on success."""
if channel == "client":
return await self._send_client(event, user_id)
if channel == "email":
return await self._send_email(event, user_id)
if channel == "webhook":
return await self._send_webhook(event, user_id)
logger.warning("Unknown reminder channel: %s", channel)
return False
async def _send_client(self, event: CalendarEvent, user_id: str) -> bool:
if self._ws_sender is None:
return False
await self._ws_sender(
user_id,
{"type": "calendar_reminder", "data": event.to_dict()},
)
return True
async def _send_email(self, event: CalendarEvent, user_id: str) -> bool:
if self._smtp_config is None or self._get_user_email is None:
return False
email = await self._get_user_email(user_id)
if not email:
return False
try:
import aiosmtplib
except ImportError:
# ponytail: aiosmtplib is an optional dep — email channel silently
# disabled when not installed. Upgrade: add aiosmtplib to pyproject.toml.
logger.debug("aiosmtplib not installed — skipping email reminder")
return False
message = (
f"From: {self._smtp_config.from_email}\r\n"
f"To: {email}\r\n"
f"Subject: Reminder: {event.title}\r\n\r\n"
f"{event.title} starts at {event.start_time}.\r\n"
)
await aiosmtplib.send(
message,
hostname=self._smtp_config.host,
port=self._smtp_config.port,
username=self._smtp_config.username,
password=self._smtp_config.password,
start_tls=self._smtp_config.use_tls,
)
return True
async def _send_webhook(self, event: CalendarEvent, user_id: str) -> bool:
if self._webhook_url is None:
return False
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(
self._webhook_url,
json={"event": event.to_dict(), "user_id": user_id},
)
return resp.status_code < 400

View File

@ -0,0 +1,174 @@
"""Reminder scheduler — background loop that scans upcoming events and
dispatches reminders via :class:`ReminderDispatcher`.
Follows the ``start()``/``stop()`` + ``asyncio.create_task`` loop pattern from
``server/task_store.py`` (KTD-2 conscious deviation from APScheduler).
ponytail: app.py lifespan wiring is deferred the orchestrator will call
``start()``/``stop()`` when integrating into the application lifecycle.
ponytail: ``asyncio.sleep`` polling has second-level precision. If sub-second
scheduling is needed, upgrade to APScheduler.
"""
from __future__ import annotations
import asyncio
import logging
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from agentkit.calendar.db import (
DEFAULT_CALENDAR_DB_PATH,
get_pending_deliveries,
insert_reminder_delivery,
list_all_events_in_time_range,
list_reminder_rules_for_event,
update_delivery_status,
)
from agentkit.calendar.models import CalendarEvent, ReminderDelivery, ReminderRule
from agentkit.calendar.reminders import ReminderDispatcher
logger = logging.getLogger(__name__)
def _parse_dt(dt_str: str) -> datetime:
"""Parse ISO 8601 string to timezone-aware datetime (UTC)."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt
class ReminderScheduler:
"""Background scheduler that scans for events entering the reminder window
and dispatches via the configured channels.
"""
def __init__(
self,
db_path: str | Path | None = None,
dispatcher: ReminderDispatcher | None = None,
interval_seconds: int = 60,
lookback_seconds: int = 3600,
max_retries: int = 3,
retry_base_delay: float = 1.0,
) -> None:
self._db_path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
self._dispatcher = dispatcher or ReminderDispatcher()
self._interval = interval_seconds
self._lookback = lookback_seconds
self._max_retries = max_retries
self._retry_base_delay = retry_base_delay
self._task: asyncio.Task[None] | None = None
async def start(self) -> None:
"""Start the background scan loop."""
if self._task is None:
self._task = asyncio.create_task(self._loop())
async def stop(self) -> None:
"""Cancel the background scan loop."""
if self._task is not None:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def _loop(self) -> None:
"""Main scan loop — runs until cancelled."""
while True:
try:
await self.scan_once()
except asyncio.CancelledError:
break
except Exception:
logger.exception("Reminder scheduler scan error")
await asyncio.sleep(self._interval)
async def scan_once(self) -> int:
"""Run a single scan cycle. Returns the number of deliveries created.
Public method so tests can invoke a single scan without waiting for the
loop interval.
"""
now = datetime.now(timezone.utc)
window_start = now - timedelta(seconds=self._lookback)
window_end = now + timedelta(seconds=self._interval)
# Query a broad range of events — the reminder_time filter happens below.
# ponytail: recurring event reminder expansion is not handled here; only
# the event's stored start_time is used. Upgrade: expand RRULE occurrences
# and check each occurrence's reminder time.
query_start = (now - timedelta(hours=2)).isoformat()
query_end = (now + timedelta(hours=48)).isoformat()
events = await list_all_events_in_time_range(query_start, query_end, self._db_path)
dispatched = 0
for event in events:
rules = await list_reminder_rules_for_event(event.id, self._db_path)
for rule in rules:
reminder_time = _parse_dt(event.start_time) + timedelta(minutes=rule.offset_minutes)
if window_start <= reminder_time <= window_end:
dispatched += await self._process_reminder(event, rule, reminder_time)
return dispatched
async def _process_reminder(
self,
event: CalendarEvent,
rule: ReminderRule,
reminder_time: datetime,
) -> int:
"""Create delivery records and dispatch. Returns count of deliveries created.
Idempotent: if any delivery already exists for this event+rule, skip.
"""
existing = await get_pending_deliveries(event.id, rule.id, self._db_path)
if existing:
return 0
created = 0
for channel in rule.channels:
delivery = ReminderDelivery(
id=uuid.uuid4().hex,
reminder_rule_id=rule.id,
event_id=event.id,
scheduled_time=reminder_time.isoformat(),
status="pending",
channel=channel,
attempts=0,
last_error=None,
)
await insert_reminder_delivery(delivery, self._db_path)
created += 1
await self._dispatch_with_retry(event, delivery)
return created
async def _dispatch_with_retry(
self,
event: CalendarEvent,
delivery: ReminderDelivery,
) -> bool:
"""Attempt dispatch up to ``max_retries`` times with exponential backoff.
Updates the delivery record's ``attempts`` counter after each try.
Returns ``True`` on success, ``False`` if all retries exhausted.
"""
for attempt in range(self._max_retries):
error: str | None = None
try:
success = await self._dispatcher.dispatch(delivery.channel, event, event.user_id)
if success:
await update_delivery_status(delivery.id, "sent", None, self._db_path)
return True
except Exception as exc:
error = str(exc)
await update_delivery_status(delivery.id, "failed", error, self._db_path)
if attempt < self._max_retries - 1:
await asyncio.sleep(self._retry_base_delay * (2**attempt))
return False

View File

@ -0,0 +1,160 @@
"""Tests for ReminderDispatcher (U5)."""
from __future__ import annotations
import sys
from unittest.mock import AsyncMock, MagicMock, patch
from agentkit.calendar.models import CalendarEvent, _now_iso
from agentkit.calendar.reminders import ReminderDispatcher, SmtpConfig
def _make_event() -> CalendarEvent:
now = _now_iso()
return CalendarEvent(
id="evt-1",
user_id="user-1",
title="Test Meeting",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
last_modified=now,
created_at=now,
)
# ---------------------------------------------------------------------------
# Client channel
# ---------------------------------------------------------------------------
async def test_client_channel_sends_ws_message() -> None:
"""Mock WS sender callback, verify called with correct payload."""
ws_sender = AsyncMock()
dispatcher = ReminderDispatcher(ws_sender=ws_sender)
event = _make_event()
result = await dispatcher.dispatch("client", event, "user-1")
assert result is True
ws_sender.assert_called_once()
call_args = ws_sender.call_args
assert call_args.args[0] == "user-1"
assert call_args.args[1]["type"] == "calendar_reminder"
assert call_args.args[1]["data"]["title"] == "Test Meeting"
async def test_client_channel_returns_false_without_sender() -> None:
"""No ws_sender configured → returns False."""
dispatcher = ReminderDispatcher()
result = await dispatcher.dispatch("client", _make_event(), "user-1")
assert result is False
# ---------------------------------------------------------------------------
# Email channel
# ---------------------------------------------------------------------------
async def test_email_channel_sends_smtp() -> None:
"""Mock aiosmtplib via sys.modules injection, verify send called."""
mock_aiosmtplib = MagicMock()
mock_aiosmtplib.send = AsyncMock()
with patch.dict(sys.modules, {"aiosmtplib": mock_aiosmtplib}):
dispatcher = ReminderDispatcher(
smtp_config=SmtpConfig(host="smtp.example.com", port=587),
get_user_email=AsyncMock(return_value="user@example.com"),
)
event = _make_event()
result = await dispatcher.dispatch("email", event, "user-1")
assert result is True
mock_aiosmtplib.send.assert_called_once()
call_kwargs = mock_aiosmtplib.send.call_args.kwargs
assert call_kwargs["hostname"] == "smtp.example.com"
assert call_kwargs["port"] == 587
# Message body contains event title and recipient
message_body = mock_aiosmtplib.send.call_args.args[0]
assert "user@example.com" in message_body
assert "Test Meeting" in message_body
async def test_email_channel_returns_false_without_config() -> None:
"""No smtp_config → returns False."""
dispatcher = ReminderDispatcher()
result = await dispatcher.dispatch("email", _make_event(), "user-1")
assert result is False
async def test_email_channel_returns_false_when_user_has_no_email() -> None:
"""get_user_email returns None → returns False."""
dispatcher = ReminderDispatcher(
smtp_config=SmtpConfig(),
get_user_email=AsyncMock(return_value=None),
)
result = await dispatcher.dispatch("email", _make_event(), "user-1")
assert result is False
# ---------------------------------------------------------------------------
# Webhook channel
# ---------------------------------------------------------------------------
async def test_webhook_channel_posts_to_url() -> None:
"""Mock httpx.AsyncClient, verify POST called with event payload."""
dispatcher = ReminderDispatcher(webhook_url="https://example.com/hook")
mock_response = MagicMock()
mock_response.status_code = 200
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
with patch("httpx.AsyncClient", return_value=mock_client):
event = _make_event()
result = await dispatcher.dispatch("webhook", event, "user-1")
assert result is True
mock_client.post.assert_called_once()
call_kwargs = mock_client.post.call_args.kwargs
assert call_kwargs["json"]["event"]["title"] == "Test Meeting"
assert call_kwargs["json"]["user_id"] == "user-1"
async def test_webhook_channel_returns_false_on_4xx() -> None:
"""Webhook returns 500 → returns False."""
dispatcher = ReminderDispatcher(webhook_url="https://example.com/hook")
mock_response = MagicMock()
mock_response.status_code = 500
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
mock_client.__aenter__.return_value = mock_client
mock_client.__aexit__.return_value = None
with patch("httpx.AsyncClient", return_value=mock_client):
result = await dispatcher.dispatch("webhook", _make_event(), "user-1")
assert result is False
async def test_webhook_channel_returns_false_without_url() -> None:
"""No webhook_url configured → returns False."""
dispatcher = ReminderDispatcher()
result = await dispatcher.dispatch("webhook", _make_event(), "user-1")
assert result is False
# ---------------------------------------------------------------------------
# Unknown channel
# ---------------------------------------------------------------------------
async def test_unknown_channel_returns_false() -> None:
"""Unknown channel name → returns False, no crash."""
dispatcher = ReminderDispatcher()
result = await dispatcher.dispatch("sms", _make_event(), "user-1")
assert result is False

View File

@ -0,0 +1,256 @@
"""Tests for ReminderScheduler (U5)."""
from __future__ import annotations
import asyncio
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import AsyncMock
import pytest
from agentkit.calendar.db import (
get_pending_deliveries,
init_calendar_db,
insert_event,
insert_reminder_rule,
list_reminder_rules_for_event,
)
from agentkit.calendar.models import CalendarEvent, ReminderRule, _now_iso
from agentkit.calendar.reminders import ReminderDispatcher
from agentkit.calendar.scheduler import ReminderScheduler
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_calendar.db"
asyncio.run(init_calendar_db(path))
return path
@pytest.fixture
def auth_db_path(tmp_path: Path) -> Path:
from agentkit.server.auth.models import init_auth_db
path = tmp_path / "test_auth.db"
asyncio.run(init_auth_db(path))
return path
def _make_event(
event_id: str,
user_id: str,
start_time: str,
title: str = "Test Event",
) -> CalendarEvent:
now = _now_iso()
return CalendarEvent(
id=event_id,
user_id=user_id,
title=title,
start_time=start_time,
end_time=start_time,
last_modified=now,
created_at=now,
)
def _mock_dispatcher(return_value: bool = True) -> ReminderDispatcher:
"""Create a dispatcher with a mocked dispatch method."""
dispatcher = ReminderDispatcher()
dispatcher.dispatch = AsyncMock(return_value=return_value) # type: ignore
return dispatcher
# ---------------------------------------------------------------------------
# Scheduler scan logic
# ---------------------------------------------------------------------------
async def test_scheduler_finds_event_within_reminder_window(db_path: Path) -> None:
"""Event 10 min away, rule offset -15min → reminder_time 5 min ago → found."""
now = datetime.now(timezone.utc)
event_start = (now + timedelta(minutes=10)).isoformat()
event = _make_event("evt-1", "user-1", event_start, "Meeting")
await insert_event(event, db_path)
rule = ReminderRule(id="rule-1", event_id="evt-1", offset_minutes=-15, channels=["client"])
await insert_reminder_rule(rule, db_path)
dispatcher = _mock_dispatcher(return_value=True)
scheduler = ReminderScheduler(db_path=db_path, dispatcher=dispatcher)
count = await scheduler.scan_once()
assert count == 1
assert dispatcher.dispatch.call_count == 1 # type: ignore
async def test_scheduler_skips_event_outside_window(db_path: Path) -> None:
"""Event 2 hours away, rule offset -15min → reminder_time 1hr45min away → not found."""
now = datetime.now(timezone.utc)
event_start = (now + timedelta(hours=2)).isoformat()
event = _make_event("evt-1", "user-1", event_start, "Meeting")
await insert_event(event, db_path)
rule = ReminderRule(id="rule-1", event_id="evt-1", offset_minutes=-15, channels=["client"])
await insert_reminder_rule(rule, db_path)
dispatcher = _mock_dispatcher(return_value=True)
scheduler = ReminderScheduler(db_path=db_path, dispatcher=dispatcher)
count = await scheduler.scan_once()
assert count == 0
assert dispatcher.dispatch.call_count == 0 # type: ignore
async def test_idempotent_delivery_no_duplicate(db_path: Path) -> None:
"""Scheduler runs twice, only one delivery record created."""
now = datetime.now(timezone.utc)
event_start = (now + timedelta(minutes=10)).isoformat()
event = _make_event("evt-1", "user-1", event_start, "Meeting")
await insert_event(event, db_path)
rule = ReminderRule(id="rule-1", event_id="evt-1", offset_minutes=-15, channels=["client"])
await insert_reminder_rule(rule, db_path)
dispatcher = _mock_dispatcher(return_value=True)
scheduler = ReminderScheduler(db_path=db_path, dispatcher=dispatcher)
count1 = await scheduler.scan_once()
assert count1 == 1
count2 = await scheduler.scan_once()
assert count2 == 0
deliveries = await get_pending_deliveries("evt-1", "rule-1", db_path)
assert len(deliveries) == 1
async def test_failed_delivery_retries_up_to_3_times(db_path: Path) -> None:
"""Mock channel to fail, verify 3 attempts and delivery status=failed."""
now = datetime.now(timezone.utc)
event_start = (now + timedelta(minutes=10)).isoformat()
event = _make_event("evt-1", "user-1", event_start, "Meeting")
await insert_event(event, db_path)
rule = ReminderRule(id="rule-1", event_id="evt-1", offset_minutes=-15, channels=["client"])
await insert_reminder_rule(rule, db_path)
dispatcher = _mock_dispatcher(return_value=False)
scheduler = ReminderScheduler(db_path=db_path, dispatcher=dispatcher, retry_base_delay=0)
await scheduler.scan_once()
assert dispatcher.dispatch.call_count == 3 # type: ignore
deliveries = await get_pending_deliveries("evt-1", "rule-1", db_path)
assert len(deliveries) == 1
assert deliveries[0].attempts == 3
assert deliveries[0].status == "failed"
async def test_scheduler_dispatches_multiple_channels(db_path: Path) -> None:
"""Rule with 2 channels creates 2 delivery records."""
now = datetime.now(timezone.utc)
event_start = (now + timedelta(minutes=10)).isoformat()
event = _make_event("evt-1", "user-1", event_start, "Meeting")
await insert_event(event, db_path)
rule = ReminderRule(
id="rule-1",
event_id="evt-1",
offset_minutes=-15,
channels=["client", "email"],
)
await insert_reminder_rule(rule, db_path)
dispatcher = _mock_dispatcher(return_value=True)
scheduler = ReminderScheduler(db_path=db_path, dispatcher=dispatcher)
count = await scheduler.scan_once()
assert count == 2
assert dispatcher.dispatch.call_count == 2 # type: ignore
# ---------------------------------------------------------------------------
# Start/stop lifecycle
# ---------------------------------------------------------------------------
async def test_scheduler_start_stop_lifecycle(db_path: Path) -> None:
"""start() creates task, stop() cancels it."""
scheduler = ReminderScheduler(db_path=db_path, interval_seconds=1)
assert scheduler._task is None
await scheduler.start()
assert scheduler._task is not None
assert not scheduler._task.done()
await scheduler.stop()
assert scheduler._task is None
async def test_scheduler_start_idempotent(db_path: Path) -> None:
"""Calling start() twice does not create a second task."""
scheduler = ReminderScheduler(db_path=db_path, interval_seconds=1)
await scheduler.start()
task1 = scheduler._task
await scheduler.start()
assert scheduler._task is task1
await scheduler.stop()
# ---------------------------------------------------------------------------
# Default reminders inherited from event type
# ---------------------------------------------------------------------------
async def test_default_reminders_inherited_from_event_type(
db_path: Path, auth_db_path: Path
) -> None:
"""Create event with type that has default rules, verify rules cloned to event."""
from agentkit.calendar.service import CalendarService
service = CalendarService(db_path=db_path, auth_db_path=auth_db_path)
# Create event type
et = await service.create_event_type("user-1", "Meeting")
# Add a default reminder rule at the type level
type_rule = ReminderRule(
id=uuid.uuid4().hex,
event_type_id=et.id,
offset_minutes=-30,
channels=["email"],
)
await insert_reminder_rule(type_rule, db_path)
# Create an event with this type
event = await service.create_event(
user_id="user-1",
title="Sprint Planning",
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
event_type_id=et.id,
)
# Verify the type-level rule was cloned to the event
event_rules = await list_reminder_rules_for_event(event.id, db_path)
assert len(event_rules) == 1
assert event_rules[0].event_id == event.id
assert event_rules[0].event_type_id is None
assert event_rules[0].offset_minutes == -30
assert event_rules[0].channels == ["email"]
# Cloned rule has a new ID
assert event_rules[0].id != type_rule.id