"""SQLite persistence for calendar data. Follows the aiosqlite bare-connection pattern from ``documents/db.py``: no SQLAlchemy session injection, just ``async with aiosqlite.connect(...)``. All timestamps are ISO 8601 UTC (see KTD-11). """ from __future__ import annotations import json import logging import os from collections.abc import Mapping from pathlib import Path import aiosqlite from agentkit.calendar.models import ( CalendarEvent, EventType, ExternalCalendarConfig, Invitation, ReminderDelivery, ReminderRule, Tag, ) logger = logging.getLogger(__name__) _PROJECT_ROOT = Path(__file__).parents[3] DEFAULT_CALENDAR_DB_PATH = Path( os.environ.get("AGENTKIT_CALENDAR_DB", _PROJECT_ROOT / "data" / "calendar.db") ) _SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS calendar_event_types ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, name TEXT NOT NULL, color TEXT NOT NULL DEFAULT '#4A90D9', is_default INTEGER NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_event_types_user ON calendar_event_types(user_id); CREATE TABLE IF NOT EXISTS calendar_tags ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, name TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_tags_user ON calendar_tags(user_id); CREATE TABLE IF NOT EXISTS calendar_events ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, title TEXT NOT NULL, description TEXT NOT NULL DEFAULT '', start_time TEXT NOT NULL, end_time TEXT NOT NULL, is_all_day INTEGER NOT NULL DEFAULT 0, location TEXT NOT NULL DEFAULT '', event_type_id TEXT, rrule TEXT, source TEXT NOT NULL DEFAULT 'manual', is_invited INTEGER NOT NULL DEFAULT 0, conversation_id TEXT, external_id TEXT, external_provider TEXT, last_modified TEXT NOT NULL, created_at TEXT NOT NULL, FOREIGN KEY (event_type_id) REFERENCES calendar_event_types(id) ); CREATE INDEX IF NOT EXISTS idx_events_user ON calendar_events(user_id); CREATE INDEX IF NOT EXISTS idx_events_start_time ON calendar_events(start_time); CREATE INDEX IF NOT EXISTS idx_events_external ON calendar_events(external_id, external_provider); CREATE TABLE IF NOT EXISTS calendar_event_tags ( event_id TEXT NOT NULL, tag_id TEXT NOT NULL, PRIMARY KEY (event_id, tag_id), FOREIGN KEY (event_id) REFERENCES calendar_events(id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES calendar_tags(id) ); CREATE TABLE IF NOT EXISTS calendar_reminder_rules ( id TEXT PRIMARY KEY, event_id TEXT, event_type_id TEXT, offset_minutes INTEGER NOT NULL DEFAULT -15, channels TEXT NOT NULL DEFAULT '["client"]', FOREIGN KEY (event_id) REFERENCES calendar_events(id) ON DELETE CASCADE, FOREIGN KEY (event_type_id) REFERENCES calendar_event_types(id) ); CREATE TABLE IF NOT EXISTS calendar_reminder_deliveries ( id TEXT PRIMARY KEY, reminder_rule_id TEXT NOT NULL, event_id TEXT NOT NULL, scheduled_time TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', channel TEXT NOT NULL DEFAULT 'client', attempts INTEGER NOT NULL DEFAULT 0, last_error TEXT, -- ponytail: ON DELETE CASCADE ensures deliveries are removed when their -- reminder_rule is cascade-deleted (e.g. event deletion). Existing DBs -- created before this change need ALTER TABLE or DB recreation. FOREIGN KEY (reminder_rule_id) REFERENCES calendar_reminder_rules(id) ON DELETE CASCADE, FOREIGN KEY (event_id) REFERENCES calendar_events(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_deliveries_status ON calendar_reminder_deliveries(status, scheduled_time); CREATE TABLE IF NOT EXISTS calendar_external_configs ( id TEXT PRIMARY KEY, user_id TEXT NOT NULL, provider TEXT NOT NULL, credentials TEXT NOT NULL DEFAULT '', sync_frequency INTEGER NOT NULL DEFAULT 30, sync_scope TEXT NOT NULL DEFAULT '[]', last_sync TEXT, sync_token TEXT ); CREATE INDEX IF NOT EXISTS idx_external_configs_user ON calendar_external_configs(user_id); CREATE TABLE IF NOT EXISTS calendar_invitations ( id TEXT PRIMARY KEY, event_id TEXT NOT NULL, inviter_user_id TEXT NOT NULL, invitee_email TEXT NOT NULL, status TEXT NOT NULL DEFAULT 'pending', responded_at TEXT, FOREIGN KEY (event_id) REFERENCES calendar_events(id) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS idx_invitations_email ON calendar_invitations(invitee_email, status); """ async def init_calendar_db(db_path: str | Path | None = None) -> Path: """Create all calendar tables if they do not exist. Idempotent.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH path.parent.mkdir(parents=True, exist_ok=True) async with aiosqlite.connect(str(path)) as db: db.row_factory = aiosqlite.Row await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.executescript(_SCHEMA_SQL) await db.commit() logger.info(f"Calendar DB initialized at {path}") return path # --------------------------------------------------------------------------- # Row → dataclass converters # --------------------------------------------------------------------------- def _row_to_event_type(row: aiosqlite.Row | Mapping[str, object]) -> EventType: return EventType( id=row["id"], user_id=row["user_id"], name=row["name"], color=row["color"], is_default=bool(row["is_default"]), ) def _row_to_tag(row: aiosqlite.Row | Mapping[str, object]) -> Tag: return Tag(id=row["id"], user_id=row["user_id"], name=row["name"]) def _row_to_event(row: aiosqlite.Row | Mapping[str, object]) -> CalendarEvent: return CalendarEvent( id=row["id"], user_id=row["user_id"], title=row["title"], description=row["description"], start_time=row["start_time"], end_time=row["end_time"], is_all_day=bool(row["is_all_day"]), location=row["location"], event_type_id=row["event_type_id"], rrule=row["rrule"], source=row["source"], is_invited=bool(row["is_invited"]), conversation_id=row["conversation_id"], external_id=row["external_id"], external_provider=row["external_provider"], last_modified=row["last_modified"], created_at=row["created_at"], ) def _row_to_reminder_rule(row: aiosqlite.Row | Mapping[str, object]) -> ReminderRule: return ReminderRule( id=row["id"], event_id=row["event_id"], event_type_id=row["event_type_id"], offset_minutes=row["offset_minutes"], channels=json.loads(row["channels"]), ) def _row_to_reminder_delivery(row: aiosqlite.Row | Mapping[str, object]) -> ReminderDelivery: return ReminderDelivery( id=row["id"], reminder_rule_id=row["reminder_rule_id"], event_id=row["event_id"], scheduled_time=row["scheduled_time"], status=row["status"], channel=row["channel"], attempts=row["attempts"], last_error=row["last_error"], ) def _row_to_external_config(row: aiosqlite.Row | Mapping[str, object]) -> ExternalCalendarConfig: return ExternalCalendarConfig( id=row["id"], user_id=row["user_id"], provider=row["provider"], credentials=row["credentials"], sync_frequency=row["sync_frequency"], sync_scope=json.loads(row["sync_scope"]), last_sync=row["last_sync"], sync_token=row["sync_token"], ) def _row_to_invitation(row: aiosqlite.Row | Mapping[str, object]) -> Invitation: return Invitation( id=row["id"], event_id=row["event_id"], inviter_user_id=row["inviter_user_id"], invitee_email=row["invitee_email"], status=row["status"], responded_at=row["responded_at"], ) # --------------------------------------------------------------------------- # Event CRUD # --------------------------------------------------------------------------- async def insert_event(event: CalendarEvent, db_path: str | Path | None = None) -> None: """Insert a calendar event.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_events (id, user_id, title, description, " "start_time, end_time, is_all_day, location, event_type_id, rrule, " "source, is_invited, conversation_id, external_id, external_provider, " "last_modified, created_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( event.id, event.user_id, event.title, event.description, event.start_time, event.end_time, int(event.is_all_day), event.location, event.event_type_id, event.rrule, event.source, int(event.is_invited), event.conversation_id, event.external_id, event.external_provider, event.last_modified, event.created_at, ), ) await db.commit() async def get_event(event_id: str, db_path: str | Path | None = None) -> CalendarEvent | None: """Return a single event by id, or None.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute("SELECT * FROM calendar_events WHERE id = ?", (event_id,)) row = await cursor.fetchone() return _row_to_event(row) if row else None async def get_event_by_external_id( external_id: str, external_provider: str, user_id: str, db_path: str | Path | None = None, ) -> CalendarEvent | None: """Return a single event by (external_id, provider, user_id), or None. Used by ICS import (U8) to skip duplicate UIDs already imported. """ path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_events " "WHERE external_id = ? AND external_provider = ? AND user_id = ?", (external_id, external_provider, user_id), ) row = await cursor.fetchone() return _row_to_event(row) if row else None async def list_events( user_id: str, start: str | None = None, end: str | None = None, event_type_id: str | None = None, tag_id: str | None = None, db_path: str | Path | None = None, ) -> list[CalendarEvent]: """List events for a user with optional filters.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH query = "SELECT DISTINCT e.* FROM calendar_events e" params: list[str] = [] conditions: list[str] = ["e.user_id = ?"] params.append(user_id) if start is not None: conditions.append("e.start_time >= ?") params.append(start) if end is not None: conditions.append("e.start_time < ?") params.append(end) if event_type_id is not None: conditions.append("e.event_type_id = ?") params.append(event_type_id) if tag_id is not None: query += " JOIN calendar_event_tags et ON et.event_id = e.id" conditions.append("et.tag_id = ?") params.append(tag_id) query += " WHERE " + " AND ".join(conditions) + " ORDER BY e.start_time" async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute(query, tuple(params)) rows = await cursor.fetchall() return [_row_to_event(row) for row in rows] async def list_all_events_in_time_range( start: str, end: str, db_path: str | Path | None = None ) -> list[CalendarEvent]: """List all events (across all users) with start_time in [start, end). Used by ReminderScheduler to scan for events entering the reminder window. """ path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_events WHERE start_time >= ? AND start_time < ? " "ORDER BY start_time", (start, end), ) rows = await cursor.fetchall() return [_row_to_event(row) for row in rows] async def update_event( event_id: str, fields: dict[str, object], db_path: str | Path | None = None ) -> bool: """Update specific fields of an event. Returns True if a row was updated.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH # Map dataclass field names to column names, handle bool → int column_map = { "title": "title", "description": "description", "start_time": "start_time", "end_time": "end_time", "is_all_day": "is_all_day", "location": "location", "event_type_id": "event_type_id", "rrule": "rrule", "source": "source", "is_invited": "is_invited", "conversation_id": "conversation_id", "external_id": "external_id", "external_provider": "external_provider", "last_modified": "last_modified", } set_clauses: list[str] = [] params: list[object] = [] for field_name, value in fields.items(): col = column_map.get(field_name) if col is None: continue if field_name in ("is_all_day", "is_invited"): value = int(bool(value)) set_clauses.append(f"{col} = ?") params.append(value) if not set_clauses: return False params.append(event_id) sql = f"UPDATE calendar_events SET {', '.join(set_clauses)} WHERE id = ?" async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute(sql, tuple(params)) await db.commit() return cursor.rowcount > 0 async def delete_event(event_id: str, db_path: str | Path | None = None) -> bool: """Delete an event and its dependent rows. Returns True if deleted.""" path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") # Manual cascade for event_tags (no ON DELETE on junction FK in some SQLite versions) await db.execute("DELETE FROM calendar_event_tags WHERE event_id = ?", (event_id,)) cursor = await db.execute("DELETE FROM calendar_events WHERE id = ?", (event_id,)) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # Event Type CRUD # --------------------------------------------------------------------------- async def insert_event_type(et: EventType, db_path: str | Path | None = None) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_event_types (id, user_id, name, color, is_default) " "VALUES (?, ?, ?, ?, ?)", (et.id, et.user_id, et.name, et.color, int(et.is_default)), ) await db.commit() async def list_event_types(user_id: str, db_path: str | Path | None = None) -> list[EventType]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_event_types WHERE user_id = ? ORDER BY name", (user_id,), ) rows = await cursor.fetchall() return [_row_to_event_type(row) for row in rows] async def update_event_type( type_id: str, fields: dict[str, object], db_path: str | Path | None = None ) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH set_clauses: list[str] = [] params: list[object] = [] for field_name, value in fields.items(): if field_name == "name": set_clauses.append("name = ?") params.append(value) elif field_name == "color": set_clauses.append("color = ?") params.append(value) elif field_name == "is_default": set_clauses.append("is_default = ?") params.append(int(bool(value))) if not set_clauses: return False params.append(type_id) sql = f"UPDATE calendar_event_types SET {', '.join(set_clauses)} WHERE id = ?" async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute(sql, tuple(params)) await db.commit() return cursor.rowcount > 0 async def delete_event_type(type_id: str, db_path: str | Path | None = None) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute("DELETE FROM calendar_event_types WHERE id = ?", (type_id,)) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # Tag CRUD # --------------------------------------------------------------------------- async def insert_tag(tag: Tag, db_path: str | Path | None = None) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_tags (id, user_id, name) VALUES (?, ?, ?)", (tag.id, tag.user_id, tag.name), ) await db.commit() async def list_tags(user_id: str, db_path: str | Path | None = None) -> list[Tag]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_tags WHERE user_id = ? ORDER BY name", (user_id,), ) rows = await cursor.fetchall() return [_row_to_tag(row) for row in rows] async def delete_tag(tag_id: str, db_path: str | Path | None = None) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute("DELETE FROM calendar_event_tags WHERE tag_id = ?", (tag_id,)) cursor = await db.execute("DELETE FROM calendar_tags WHERE id = ?", (tag_id,)) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # Event-Tag junction # --------------------------------------------------------------------------- async def add_tag_to_event(event_id: str, tag_id: str, db_path: str | Path | None = None) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT OR IGNORE INTO calendar_event_tags (event_id, tag_id) VALUES (?, ?)", (event_id, tag_id), ) await db.commit() async def remove_tag_from_event( event_id: str, tag_id: str, db_path: str | Path | None = None ) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "DELETE FROM calendar_event_tags WHERE event_id = ? AND tag_id = ?", (event_id, tag_id), ) await db.commit() async def get_event_tags(event_id: str, db_path: str | Path | None = None) -> list[Tag]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT t.* FROM calendar_tags t " "JOIN calendar_event_tags et ON et.tag_id = t.id " "WHERE et.event_id = ?", (event_id,), ) rows = await cursor.fetchall() return [_row_to_tag(row) for row in rows] # --------------------------------------------------------------------------- # Reminder Rule CRUD # --------------------------------------------------------------------------- async def insert_reminder_rule(rule: ReminderRule, db_path: str | Path | None = None) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_reminder_rules " "(id, event_id, event_type_id, offset_minutes, channels) " "VALUES (?, ?, ?, ?, ?)", ( rule.id, rule.event_id, rule.event_type_id, rule.offset_minutes, json.dumps(rule.channels), ), ) await db.commit() async def list_reminder_rules_for_event( event_id: str, db_path: str | Path | None = None ) -> list[ReminderRule]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_reminder_rules WHERE event_id = ?", (event_id,), ) rows = await cursor.fetchall() return [_row_to_reminder_rule(row) for row in rows] async def list_reminder_rules_for_type( event_type_id: str, db_path: str | Path | None = None ) -> list[ReminderRule]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_reminder_rules WHERE event_type_id = ?", (event_type_id,), ) rows = await cursor.fetchall() return [_row_to_reminder_rule(row) for row in rows] async def delete_reminder_rule(rule_id: str, db_path: str | Path | None = None) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute("DELETE FROM calendar_reminder_rules WHERE id = ?", (rule_id,)) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # Reminder Delivery CRUD # --------------------------------------------------------------------------- async def insert_reminder_delivery( delivery: ReminderDelivery, db_path: str | Path | None = None ) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_reminder_deliveries " "(id, reminder_rule_id, event_id, scheduled_time, status, channel, " "attempts, last_error) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ( delivery.id, delivery.reminder_rule_id, delivery.event_id, delivery.scheduled_time, delivery.status, delivery.channel, delivery.attempts, delivery.last_error, ), ) await db.commit() async def get_pending_deliveries( event_id: str, reminder_rule_id: str, db_path: str | Path | None = None, status: str = "sent", ) -> list[ReminderDelivery]: """Check idempotency — return existing deliveries for an event+rule. By default only ``sent`` deliveries are returned, so the scheduler's idempotency check skips rules that already succeeded. Stuck ``pending`` or ``failed`` deliveries are ignored, allowing retry on the next scan. """ path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_reminder_deliveries " "WHERE event_id = ? AND reminder_rule_id = ? AND status = ?", (event_id, reminder_rule_id, status), ) rows = await cursor.fetchall() return [_row_to_reminder_delivery(row) for row in rows] async def update_delivery_status( delivery_id: str, status: str, last_error: str | None = None, db_path: str | Path | None = None, ) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute( "UPDATE calendar_reminder_deliveries " "SET status = ?, attempts = attempts + 1, last_error = ? " "WHERE id = ?", (status, last_error, delivery_id), ) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # External Calendar Config CRUD # --------------------------------------------------------------------------- async def insert_external_config( config: ExternalCalendarConfig, db_path: str | Path | None = None ) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_external_configs " "(id, user_id, provider, credentials, sync_frequency, sync_scope, " "last_sync, sync_token) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", ( config.id, config.user_id, config.provider, config.credentials, config.sync_frequency, json.dumps(config.sync_scope), config.last_sync, config.sync_token, ), ) await db.commit() async def list_external_configs( user_id: str, db_path: str | Path | None = None ) -> list[ExternalCalendarConfig]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_external_configs WHERE user_id = ?", (user_id,), ) rows = await cursor.fetchall() return [_row_to_external_config(row) for row in rows] async def update_external_config( config_id: str, fields: dict[str, object], db_path: str | Path | None = None ) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH set_clauses: list[str] = [] params: list[object] = [] for field_name, value in fields.items(): if field_name == "credentials": set_clauses.append("credentials = ?") params.append(value) elif field_name == "sync_frequency": set_clauses.append("sync_frequency = ?") params.append(value) elif field_name == "sync_scope": set_clauses.append("sync_scope = ?") params.append(json.dumps(value)) elif field_name == "last_sync": set_clauses.append("last_sync = ?") params.append(value) elif field_name == "sync_token": set_clauses.append("sync_token = ?") params.append(value) if not set_clauses: return False params.append(config_id) sql = f"UPDATE calendar_external_configs SET {', '.join(set_clauses)} WHERE id = ?" async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute(sql, tuple(params)) await db.commit() return cursor.rowcount > 0 async def delete_external_config(config_id: str, db_path: str | Path | None = None) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute( "DELETE FROM calendar_external_configs WHERE id = ?", (config_id,) ) await db.commit() return cursor.rowcount > 0 # --------------------------------------------------------------------------- # Invitation CRUD # --------------------------------------------------------------------------- async def insert_invitation(invitation: Invitation, db_path: str | Path | None = None) -> None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") await db.execute( "INSERT INTO calendar_invitations " "(id, event_id, inviter_user_id, invitee_email, status, responded_at) " "VALUES (?, ?, ?, ?, ?, ?)", ( invitation.id, invitation.event_id, invitation.inviter_user_id, invitation.invitee_email, invitation.status, invitation.responded_at, ), ) await db.commit() async def get_invitation( invitation_id: str, db_path: str | Path | None = None ) -> Invitation | None: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_invitations WHERE id = ?", (invitation_id,) ) row = await cursor.fetchone() return _row_to_invitation(row) if row else None async def list_invitations( invitee_email: str, db_path: str | Path | None = None ) -> list[Invitation]: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") db.row_factory = aiosqlite.Row cursor = await db.execute( "SELECT * FROM calendar_invitations WHERE invitee_email = ? ORDER BY responded_at DESC", (invitee_email,), ) rows = await cursor.fetchall() return [_row_to_invitation(row) for row in rows] async def update_invitation_status( invitation_id: str, status: str, responded_at: str, db_path: str | Path | None = None, ) -> bool: path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH async with aiosqlite.connect(str(path)) as db: await db.execute("PRAGMA journal_mode=WAL") await db.execute("PRAGMA busy_timeout = 5000") await db.execute("PRAGMA foreign_keys = ON") cursor = await db.execute( "UPDATE calendar_invitations SET status = ?, responded_at = ? WHERE id = ?", (status, responded_at, invitation_id), ) await db.commit() return cursor.rowcount > 0