feat(calendar): U6 CalDAV sync provider and SyncManager

AbstractSyncProvider interface with CalDAVSyncProvider implementation
for bidirectional Apple Calendar sync. SyncManager orchestrates all
providers (G8) — sync_all/sync_provider/resolve_conflict with
last-write-wins + WS notification on conflicts (G4). caldav library
calls wrapped in asyncio.to_thread for non-blocking operation.

- src/agentkit/calendar/sync/base.py — AbstractSyncProvider ABC
- src/agentkit/calendar/sync/caldav_provider.py — CalDAVSyncProvider
- src/agentkit/calendar/sync/manager.py — SyncManager (G8)
- pyproject.toml — added caldav>=1.3 dependency
- tests — 12 tests (9 CalDAV + 3 SyncManager)
This commit is contained in:
chiguyong 2026-06-23 22:52:29 +08:00
parent ffb184acc7
commit 40d326cd3f
6 changed files with 1248 additions and 0 deletions

View File

@ -28,6 +28,8 @@ dependencies = [
"python-dateutil>=2.9", "python-dateutil>=2.9",
# Calendar ICS import/export (U8) # Calendar ICS import/export (U8)
"icalendar>=5.0", "icalendar>=5.0",
# Calendar CalDAV sync — Apple Calendar (U6)
"caldav>=1.3",
# Document processing (U1-U9) # Document processing (U1-U9)
"python-docx>=1.1", "python-docx>=1.1",
"openpyxl>=3.1", "openpyxl>=3.1",

View File

@ -0,0 +1,34 @@
"""Abstract sync provider interface (U6).
All external calendar sync providers (CalDAV, Outlook, Google) implement
this interface so :class:`SyncManager` can orchestrate them uniformly.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from agentkit.calendar.models import CalendarEvent, ExternalCalendarConfig
class AbstractSyncProvider(ABC):
"""Interface for bidirectional external calendar sync."""
@abstractmethod
async def pull_changes(
self, config: ExternalCalendarConfig, since: str | None = None
) -> list[CalendarEvent]:
"""Pull remote changes into local DB. Returns pulled/updated events."""
...
@abstractmethod
async def push_changes(
self, config: ExternalCalendarConfig, events: list[CalendarEvent]
) -> list[CalendarEvent]:
"""Push local events to remote. Returns updated events (with external_id set)."""
...
@abstractmethod
async def test_connection(self, config: ExternalCalendarConfig) -> tuple[bool, str]:
"""Test connectivity. Returns (ok, error_msg). error_msg is "" on success."""
...

View File

@ -0,0 +1,394 @@
"""CalDAV sync provider — bidirectional sync with Apple Calendar (U6).
Uses the ``caldav`` library for the CalDAV protocol and ``icalendar`` for
ICS serialization/parsing. Conflict resolution is last-write-wins based on
``last_modified``; conflicts emit a ``calendar_sync_conflict`` WS notification
via the injectable ``notify_callback`` (G4).
"""
from __future__ import annotations
import asyncio
import json
import logging
import uuid
from collections.abc import Awaitable, Callable
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any
import caldav
from icalendar import Calendar, Event
from icalendar.prop import vRecur
from agentkit.calendar.db import (
DEFAULT_CALENDAR_DB_PATH,
get_event_by_external_id,
insert_event,
update_event,
)
from agentkit.calendar.models import CalendarEvent, ExternalCalendarConfig, _now_iso
from agentkit.calendar.sync.base import AbstractSyncProvider
logger = logging.getLogger(__name__)
# Async callback signature: (event_type: str, payload: dict) -> None
NotifyCallback = Callable[[str, dict[str, Any]], Awaitable[None]]
def _parse_iso(dt_str: str) -> datetime:
"""Parse ISO 8601 string to UTC-aware datetime."""
dt = datetime.fromisoformat(dt_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc)
def _to_iso_utc(dt: datetime) -> str:
"""Convert datetime to ISO 8601 UTC string."""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(timezone.utc).isoformat()
def _extract_dt(component: Any, key: str) -> tuple[str, bool]:
"""Extract date/datetime from icalendar component. Returns (iso, is_all_day)."""
prop = component.get(key)
if prop is None:
return "", False
val = prop.dt
if isinstance(val, date) and not isinstance(val, datetime):
return val.isoformat(), True
return _to_iso_utc(val), False
class CalDAVSyncProvider(AbstractSyncProvider):
"""Bidirectional CalDAV sync provider for Apple Calendar.
The ``client_factory`` parameter allows tests to inject a mock CalDAV
client without touching ``caldav.DAVClient``. When ``None``, a real
``caldav.DAVClient`` is constructed from ``config.credentials``.
"""
def __init__(
self,
db_path: str | Path | None = None,
client_factory: Callable[[ExternalCalendarConfig], Any] | None = None,
notify_callback: NotifyCallback | None = None,
) -> None:
self.db_path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
self._client_factory = client_factory
self._notify = notify_callback
# ponytail: conflicts list is in-memory only; if the process restarts
# before a sync completes, conflict history is lost. Upgrade: persist
# to a calendar_sync_conflicts table.
# ------------------------------------------------------------------
# Client construction
# ------------------------------------------------------------------
def _make_client(self, config: ExternalCalendarConfig) -> Any:
"""Build a caldav.DAVClient from config credentials."""
if self._client_factory is not None:
return self._client_factory(config)
# ponytail: credentials stored as plain JSON dict; encryption deferred.
# Upgrade: use agentkit.server.auth.crypto to encrypt at rest.
creds = json.loads(config.credentials) if config.credentials else {}
return caldav.DAVClient(
url=creds.get("url", ""),
username=creds.get("username", ""),
password=creds.get("password", ""),
)
def _get_calendar(self, config: ExternalCalendarConfig) -> Any:
"""Connect and return the first calendar from the principal."""
client = self._make_client(config)
principal = client.principal()
calendars = principal.calendars()
if not calendars:
raise RuntimeError("No CalDAV calendars found for this account")
return calendars[0]
# ------------------------------------------------------------------
# Pull
# ------------------------------------------------------------------
async def pull_changes(
self, config: ExternalCalendarConfig, since: str | None = None
) -> list[CalendarEvent]:
"""Pull remote CalDAV events into local DB.
Fetches events in a date range starting from ``since`` (or 1 year ago
if None). Matches by ``external_id`` (CalDAV UID). Creates new local
events or updates existing ones. Conflicts (both sides modified) are
resolved last-write-wins with a WS notification.
"""
# caldav is synchronous — run in a thread to avoid blocking the loop
remote_events = await asyncio.to_thread(self._fetch_remote_events, config, since)
result: list[CalendarEvent] = []
for remote in remote_events:
local = await get_event_by_external_id(
remote.external_id, "caldav", config.user_id, self.db_path
)
if local is None:
# New remote event → create local
await insert_event(remote, self.db_path)
result.append(remote)
else:
# Existing → check for conflict
resolved = await self._resolve_pull_conflict(local, remote)
if resolved is not None:
result.append(resolved)
return result
def _fetch_remote_events(
self, config: ExternalCalendarConfig, since: str | None
) -> list[CalendarEvent]:
"""Synchronous CalDAV fetch — runs in thread."""
calendar = self._get_calendar(config)
# Date range: since → now+90d (or wide default)
if since:
start_dt = _parse_iso(since)
else:
start_dt = datetime.now(timezone.utc) - timedelta(days=365)
end_dt = datetime.now(timezone.utc) + timedelta(days=90)
caldav_events = calendar.date_search(start_dt, end_dt)
events: list[CalendarEvent] = []
for ce in caldav_events:
parsed = self._parse_caldav_event(ce, config.user_id)
if parsed is not None:
events.append(parsed)
return events
def _parse_caldav_event(self, caldav_event: Any, user_id: str) -> CalendarEvent | None:
"""Convert a caldav.Event to a CalendarEvent dataclass."""
try:
ical_data = caldav_event.data
cal = Calendar.from_ical(ical_data)
except Exception as e:
logger.warning("Failed to parse CalDAV event: %s", e)
return None
for component in cal.walk("VEVENT"):
uid = str(component.get("UID", "") or "") or None
if not uid:
continue
title = str(component.get("SUMMARY", "") or "")
if not title:
continue
start_str, is_all_day = _extract_dt(component, "DTSTART")
end_str, _ = _extract_dt(component, "DTEND")
if not end_str:
end_str = start_str
description = str(component.get("DESCRIPTION", "") or "")
location = str(component.get("LOCATION", "") or "")
rrule_str: str | None = None
rrule = component.get("RRULE")
if rrule is not None:
rrule_str = rrule.to_ical().decode("utf-8")
# LAST-MODIFIED from VEVENT (for conflict resolution)
lm_prop = component.get("LAST-MODIFIED")
if lm_prop is not None:
last_modified = _to_iso_utc(lm_prop.dt)
else:
last_modified = _now_iso()
now = _now_iso()
return CalendarEvent(
id=uuid.uuid4().hex,
user_id=user_id,
title=title,
description=description,
start_time=start_str,
end_time=end_str,
is_all_day=is_all_day,
location=location,
rrule=rrule_str,
source="manual",
external_id=uid,
external_provider="caldav",
last_modified=last_modified,
created_at=now,
)
return None
async def _resolve_pull_conflict(
self, local: CalendarEvent, remote: CalendarEvent
) -> CalendarEvent | None:
"""Resolve conflict when both local and remote exist.
If remote is newer update local. If local is newer conflict
(last-write-wins keeps local, but log + notify). If equal no-op.
Returns the winning event (or None if local kept unchanged).
"""
local_lm = (
_parse_iso(local.last_modified)
if local.last_modified
else datetime.min.replace(tzinfo=timezone.utc)
)
remote_lm = (
_parse_iso(remote.last_modified)
if remote.last_modified
else datetime.min.replace(tzinfo=timezone.utc)
)
if remote_lm > local_lm:
# Remote wins → update local
fields = {
"title": remote.title,
"description": remote.description,
"start_time": remote.start_time,
"end_time": remote.end_time,
"is_all_day": remote.is_all_day,
"location": remote.location,
"rrule": remote.rrule,
"last_modified": remote.last_modified,
}
await update_event(local.id, fields, self.db_path)
return remote
if local_lm > remote_lm:
# Local wins → conflict, keep local, notify
await self._notify_conflict(local, remote, winner="local")
return None
# Equal timestamps → no change needed
return None
# ------------------------------------------------------------------
# Push
# ------------------------------------------------------------------
async def push_changes(
self, config: ExternalCalendarConfig, events: list[CalendarEvent]
) -> list[CalendarEvent]:
"""Push local events to CalDAV. Returns events with external_id set."""
result: list[CalendarEvent] = []
for event in events:
updated = await self._push_single(config, event)
result.append(updated)
return result
async def _push_single(
self, config: ExternalCalendarConfig, event: CalendarEvent
) -> CalendarEvent:
"""Push a single event to CalDAV, return event with external_id set."""
ical_bytes = self._event_to_ics(event)
# caldav is synchronous — run in thread
saved_uid = await asyncio.to_thread(self._save_remote_event, config, ical_bytes, event)
# If event had no external_id, set it from the saved UID
if not event.external_id and saved_uid:
fields = {
"external_id": saved_uid,
"external_provider": "caldav",
"last_modified": _now_iso(),
}
await update_event(event.id, fields, self.db_path)
event.external_id = saved_uid
event.external_provider = "caldav"
return event
def _save_remote_event(
self, config: ExternalCalendarConfig, ical_bytes: bytes, event: CalendarEvent
) -> str | None:
"""Synchronous CalDAV save — runs in thread. Returns remote UID."""
calendar = self._get_calendar(config)
saved = calendar.save_event(ical_bytes)
# Extract UID from saved event
try:
cal = Calendar.from_ical(saved.data)
for comp in cal.walk("VEVENT"):
uid = str(comp.get("UID", "") or "") or None
if uid:
return uid
except Exception as e:
logger.warning("Failed to extract UID from saved event: %s", e)
return event.external_id
def _event_to_ics(self, event: CalendarEvent) -> bytes:
"""Convert CalendarEvent to ICS bytes using icalendar library."""
cal = Calendar()
cal.add("prodid", "-//Fischer AgentKit//Calendar//EN")
cal.add("version", "2.0")
vevent = Event()
# Use external_id if available, else local id as UID
vevent.add("uid", event.external_id or event.id)
vevent.add("summary", event.title)
if event.start_time:
start_dt = _parse_iso(event.start_time)
if event.is_all_day:
vevent.add("dtstart", start_dt.date())
else:
vevent.add("dtstart", start_dt)
if event.end_time:
end_dt = _parse_iso(event.end_time)
if event.is_all_day:
vevent.add("dtend", end_dt.date())
else:
vevent.add("dtend", end_dt)
if event.description:
vevent.add("description", event.description)
if event.location:
vevent.add("location", event.location)
if event.rrule:
vevent.add("rrule", vRecur.from_ical(event.rrule))
# LAST-MODIFIED for conflict resolution
if event.last_modified:
vevent.add("last-modified", _parse_iso(event.last_modified))
cal.add_component(vevent)
return cal.to_ical()
# ------------------------------------------------------------------
# Test connection
# ------------------------------------------------------------------
async def test_connection(self, config: ExternalCalendarConfig) -> tuple[bool, str]:
"""Test CalDAV connectivity. Returns (ok, error_msg)."""
try:
await asyncio.to_thread(self._get_calendar, config)
return True, ""
except Exception as e:
return False, str(e)
# ------------------------------------------------------------------
# Conflict notification
# ------------------------------------------------------------------
async def _notify_conflict(
self, local: CalendarEvent, remote: CalendarEvent, winner: str
) -> None:
"""Log conflict and send WS notification via callback (G4)."""
logger.info(
"Sync conflict for event %s (external_id=%s): local_lm=%s remote_lm=%s winner=%s",
local.id,
local.external_id,
local.last_modified,
remote.last_modified,
winner,
)
if self._notify is not None:
payload = {
"event_id": local.id,
"title": local.title,
"external_id": local.external_id,
"local_last_modified": local.last_modified,
"remote_last_modified": remote.last_modified,
"winner": winner,
}
await self._notify("calendar_sync_conflict", payload)

View File

@ -0,0 +1,223 @@
"""SyncManager — orchestrates external calendar sync providers (U6, G8).
Iterates user ``ExternalCalendarConfig`` entries, dispatches to the matching
provider (CalDAV, Outlook, ), and resolves conflicts via last-write-wins
with WS notification. The manager is intended to be registered on
``app.state.sync_manager`` and started/stopped in ``app.py`` lifespan.
ponytail: wiring into ``app.py`` lifespan is deferred this module provides
the API only. Upgrade: add ``SyncManager.start()``/``stop()`` asyncio loop
in ``app.py`` next to the reminder scheduler.
"""
from __future__ import annotations
import logging
from collections.abc import Awaitable, Callable
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from agentkit.calendar.db import (
DEFAULT_CALENDAR_DB_PATH,
list_events,
list_external_configs,
update_external_config,
)
from agentkit.calendar.models import CalendarEvent, ExternalCalendarConfig, _now_iso
from agentkit.calendar.sync.base import AbstractSyncProvider
from agentkit.calendar.sync.caldav_provider import CalDAVSyncProvider
logger = logging.getLogger(__name__)
# Async callback signature: (event_type: str, payload: dict) -> None
NotifyCallback = Callable[[str, dict[str, Any]], Awaitable[None]]
class SyncManager:
"""Orchestrates all external calendar sync providers for a user.
Providers are registered by ``ExternalCalendarConfig.provider`` name.
The ``notify_callback`` is forwarded to providers for conflict WS push.
"""
def __init__(
self,
db_path: str | Path | None = None,
notify_callback: NotifyCallback | None = None,
providers: dict[str, AbstractSyncProvider] | None = None,
) -> None:
self.db_path = Path(db_path) if db_path is not None else DEFAULT_CALENDAR_DB_PATH
self._notify = notify_callback
# ponytail: provider registry is hardcoded for now; if a third provider
# (e.g. Google) is added, switch to entry-point discovery. Upgrade:
# ``importlib.metadata.entry_points(group="agentkit.sync_providers")``.
self._providers: dict[str, AbstractSyncProvider] = providers or {
"caldav": CalDAVSyncProvider(db_path=self.db_path, notify_callback=notify_callback),
}
def _get_provider(self, provider_name: str) -> AbstractSyncProvider | None:
"""Return the provider for *provider_name*, or None if unsupported."""
return self._providers.get(provider_name)
# ------------------------------------------------------------------
# Sync orchestration
# ------------------------------------------------------------------
async def sync_all(self, user_id: str) -> dict[str, Any]:
"""Sync all external calendar configs for a user.
Returns ``{"synced": N, "errors": [...]}``.
"""
configs = await list_external_configs(user_id, self.db_path)
synced = 0
errors: list[str] = []
for config in configs:
try:
await self.sync_provider(config.id)
synced += 1
except Exception as e:
errors.append(f"{config.id}: {e}")
logger.warning("Sync failed for config %s: %s", config.id, e)
return {"synced": synced, "errors": errors}
async def sync_provider(self, config_id: str) -> dict[str, Any]:
"""Sync a single external calendar config by ID.
Pulls remote changes, then pushes local changes modified since
``last_sync``. Updates ``last_sync`` on success.
Returns ``{"pulled": N, "pushed": M}``.
"""
config = await self._get_config(config_id)
if config is None:
raise ValueError(f"ExternalCalendarConfig not found: {config_id}")
provider = self._get_provider(config.provider)
if provider is None:
raise ValueError(f"Unsupported provider: {config.provider}")
since = config.last_sync
# 1. Pull remote → local (creates/updates local events, resolves conflicts)
pulled = await provider.pull_changes(config, since=since)
# 2. Push local changes → remote (events modified since last_sync)
local_events = await self._get_modified_events(config, since)
pushed = await provider.push_changes(config, local_events)
# 3. Update last_sync
now = _now_iso()
await update_external_config(config.id, {"last_sync": now}, self.db_path)
config.last_sync = now
return {"pulled": len(pulled), "pushed": len(pushed)}
async def _get_config(self, config_id: str) -> ExternalCalendarConfig | None:
"""Fetch a single ExternalCalendarConfig by ID."""
# ponytail: no db.get_external_config(id) exists; we list all for the
# user. This is O(N) over the user's configs (typically <5). Upgrade:
# add ``get_external_config(config_id)`` to db.py if this becomes hot.
# We don't know the user_id here, so scan all configs in the DB.
import aiosqlite
from agentkit.calendar.db import _row_to_external_config
async with aiosqlite.connect(str(self.db_path)) as db:
db.row_factory = aiosqlite.Row
cursor = await db.execute(
"SELECT * FROM calendar_external_configs WHERE id = ?", (config_id,)
)
row = await cursor.fetchone()
return _row_to_external_config(row) if row else None
async def _get_modified_events(
self, config: ExternalCalendarConfig, since: str | None
) -> list[CalendarEvent]:
"""Return local events for the user modified since *since*.
When *since* is None (first sync), returns all events for the user
that do not yet have an ``external_id`` (i.e. new local events to push).
When *since* is set, returns events whose ``last_modified`` >= *since*.
"""
events = await list_events(config.user_id, db_path=self.db_path)
if since is None:
# First sync: push only events without external_id
return [e for e in events if not e.external_id]
since_dt = datetime.fromisoformat(since)
if since_dt.tzinfo is None:
since_dt = since_dt.replace(tzinfo=timezone.utc)
result: list[CalendarEvent] = []
for event in events:
if not event.last_modified:
continue
event_lm = datetime.fromisoformat(event.last_modified)
if event_lm.tzinfo is None:
event_lm = event_lm.replace(tzinfo=timezone.utc)
if event_lm >= since_dt:
result.append(event)
return result
# ------------------------------------------------------------------
# Conflict resolution
# ------------------------------------------------------------------
async def resolve_conflict(
self, local_event: CalendarEvent, remote_event: CalendarEvent
) -> CalendarEvent:
"""Resolve a sync conflict using last-write-wins (G8/G4).
Compares ``last_modified`` timestamps. The newer event wins. Sends a
``calendar_sync_conflict`` WS notification via the notify callback.
Returns the winning event.
"""
local_lm = (
datetime.fromisoformat(local_event.last_modified)
if local_event.last_modified
else datetime.min.replace(tzinfo=timezone.utc)
)
if local_lm.tzinfo is None:
local_lm = local_lm.replace(tzinfo=timezone.utc)
remote_lm = (
datetime.fromisoformat(remote_event.last_modified)
if remote_event.last_modified
else datetime.min.replace(tzinfo=timezone.utc)
)
if remote_lm.tzinfo is None:
remote_lm = remote_lm.replace(tzinfo=timezone.utc)
if remote_lm > local_lm:
winner = remote_event
winner_label = "remote"
elif local_lm > remote_lm:
winner = local_event
winner_label = "local"
else:
# Equal timestamps → local wins by default
winner = local_event
winner_label = "local"
logger.info(
"Conflict resolved for event %s: winner=%s (local_lm=%s remote_lm=%s)",
local_event.id,
winner_label,
local_event.last_modified,
remote_event.last_modified,
)
# G4: WS notification
if self._notify is not None:
payload = {
"event_id": local_event.id,
"title": local_event.title,
"external_id": local_event.external_id,
"local_last_modified": local_event.last_modified,
"remote_last_modified": remote_event.last_modified,
"winner": winner_label,
}
await self._notify("calendar_sync_conflict", payload)
return winner

View File

@ -0,0 +1,398 @@
"""Tests for CalDAVSyncProvider — bidirectional Apple Calendar sync (U6).
The ``caldav`` library is not installed in the test environment, so we inject
a mock module into ``sys.modules`` before importing the provider. All CalDAV
interactions are mocked via the ``client_factory`` injection point.
"""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
# caldav is not installed in the test env — inject a mock module so that
# `import caldav` in caldav_provider.py succeeds at import time.
if "caldav" not in sys.modules: # pragma: no cover
sys.modules["caldav"] = MagicMock()
from agentkit.calendar.db import (
get_event_by_external_id,
init_calendar_db,
insert_event,
list_events,
)
from agentkit.calendar.models import CalendarEvent, ExternalCalendarConfig
from agentkit.calendar.sync.caldav_provider import CalDAVSyncProvider
USER_ID = "user-1"
# ---------------------------------------------------------------------------
# ICS + Mock helpers
# ---------------------------------------------------------------------------
def make_ics(
uid: str,
summary: str,
start: str = "20260701T100000Z",
end: str = "20260701T110000Z",
description: str = "",
location: str = "",
rrule: str | None = None,
last_modified: str = "20260601T000000Z",
) -> bytes:
"""Build a minimal valid ICS bytes payload for a single VEVENT."""
lines = [
b"BEGIN:VCALENDAR",
b"VERSION:2.0",
b"PRODID:-//Test//Test//EN",
b"BEGIN:VEVENT",
f"UID:{uid}".encode(),
f"SUMMARY:{summary}".encode(),
f"DTSTART:{start}".encode(),
f"DTEND:{end}".encode(),
]
if description:
lines.append(f"DESCRIPTION:{description}".encode())
if location:
lines.append(f"LOCATION:{location}".encode())
if rrule:
lines.append(f"RRULE:{rrule}".encode())
if last_modified:
lines.append(f"LAST-MODIFIED:{last_modified}".encode())
lines.extend([b"END:VEVENT", b"END:VCALENDAR"])
return b"\r\n".join(lines)
class MockCaldavEvent:
"""Mock caldav.Event — exposes .data returning ICS bytes."""
def __init__(self, ics_data: bytes) -> None:
self.data = ics_data
def make_mock_client(
remote_events: list[bytes],
saved_uid: str | None = None,
raise_on_connect: Exception | None = None,
) -> tuple[Any, Any]:
"""Create a mock CalDAV client and its mock calendar.
Returns ``(client, mock_calendar)``. ``mock_calendar`` is None when
``raise_on_connect`` is set (connection fails before calendar is reached).
"""
client = MagicMock()
if raise_on_connect is not None:
client.principal.side_effect = raise_on_connect
return client, None
principal = MagicMock()
calendar = MagicMock()
calendar.date_search.return_value = [MockCaldavEvent(d) for d in remote_events]
saved_ics = make_ics(saved_uid or "saved-uid", "Saved Event")
calendar.save_event.return_value = MockCaldavEvent(saved_ics)
principal.calendars.return_value = [calendar]
client.principal.return_value = principal
return client, calendar
def client_factory_from(client: Any) -> Any:
"""Wrap a mock client in a client_factory callable."""
def factory(config: ExternalCalendarConfig) -> Any:
return client
return factory
def make_config(
config_id: str = "config-1",
user_id: str = USER_ID,
provider: str = "caldav",
last_sync: str | None = None,
) -> ExternalCalendarConfig:
return ExternalCalendarConfig(
id=config_id,
user_id=user_id,
provider=provider,
credentials=json.dumps(
{"url": "https://caldav.example.com", "username": "user", "password": "pass"}
),
last_sync=last_sync,
)
def make_local_event(
event_id: str = "evt-1",
title: str = "Local Event",
external_id: str | None = None,
last_modified: str = "2026-01-01T00:00:00+00:00",
rrule: str | None = None,
) -> CalendarEvent:
return CalendarEvent(
id=event_id,
user_id=USER_ID,
title=title,
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
external_id=external_id,
external_provider="caldav" if external_id else None,
last_modified=last_modified,
created_at=last_modified,
rrule=rrule,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def calendar_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_calendar.db"
asyncio.run(init_calendar_db(path))
return path
# ---------------------------------------------------------------------------
# Pull tests
# ---------------------------------------------------------------------------
async def test_caldav_provider_pull_creates_local_events(calendar_db_path: Path) -> None:
"""Mock returns 2 events → 2 local events created with external_id set."""
ics1 = make_ics("uid-1", "Remote Event 1")
ics2 = make_ics("uid-2", "Remote Event 2")
client, _ = make_mock_client([ics1, ics2])
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
pulled = await provider.pull_changes(config)
assert len(pulled) == 2
events = await list_events(USER_ID, db_path=calendar_db_path)
assert len(events) == 2
titles = {e.title for e in events}
assert titles == {"Remote Event 1", "Remote Event 2"}
for event in events:
assert event.external_id is not None
assert event.external_provider == "caldav"
async def test_caldav_provider_pull_updates_existing_event(calendar_db_path: Path) -> None:
"""Local event exists (matched by external_id), remote is newer → local updated."""
local = make_local_event(
event_id="evt-1",
title="Old Title",
external_id="uid-1",
last_modified="2026-01-01T00:00:00+00:00",
)
await insert_event(local, calendar_db_path)
ics_remote = make_ics("uid-1", "Updated Title", last_modified="20260601T120000Z")
client, _ = make_mock_client([ics_remote])
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
pulled = await provider.pull_changes(config)
assert len(pulled) == 1
updated = await get_event_by_external_id("uid-1", "caldav", USER_ID, calendar_db_path)
assert updated is not None
assert updated.title == "Updated Title"
# ---------------------------------------------------------------------------
# Push tests
# ---------------------------------------------------------------------------
async def test_caldav_provider_push_creates_remote_event(calendar_db_path: Path) -> None:
"""Local event with no external_id → push creates remote, external_id stored."""
local = make_local_event(event_id="evt-1", title="New Local Event", external_id=None)
await insert_event(local, calendar_db_path)
client, _ = make_mock_client([], saved_uid="remote-uid-1")
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
events = await list_events(USER_ID, db_path=calendar_db_path)
pushed = await provider.push_changes(config, events)
assert len(pushed) == 1
assert pushed[0].external_id == "remote-uid-1"
assert pushed[0].external_provider == "caldav"
db_event = await get_event_by_external_id("remote-uid-1", "caldav", USER_ID, calendar_db_path)
assert db_event is not None
async def test_caldav_provider_push_updates_remote_event(calendar_db_path: Path) -> None:
"""Local event modified, has external_id → push updates remote."""
local = make_local_event(
event_id="evt-1",
title="Updated Local Event",
external_id="existing-uid",
last_modified="2026-06-01T00:00:00+00:00",
)
await insert_event(local, calendar_db_path)
client, mock_calendar = make_mock_client([], saved_uid="existing-uid")
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
events = await list_events(USER_ID, db_path=calendar_db_path)
pushed = await provider.push_changes(config, events)
assert len(pushed) == 1
assert pushed[0].external_id == "existing-uid"
# Verify save_event was called with ICS containing the existing UID
saved_ics = mock_calendar.save_event.call_args[0][0]
assert b"UID:existing-uid" in saved_ics
# ---------------------------------------------------------------------------
# Conflict tests
# ---------------------------------------------------------------------------
async def test_caldav_conflict_last_write_wins(calendar_db_path: Path) -> None:
"""Both sides modified, local is newer → local wins, local not updated."""
local = make_local_event(
event_id="evt-1",
title="Local Updated",
external_id="uid-1",
last_modified="2026-06-15T00:00:00+00:00",
)
await insert_event(local, calendar_db_path)
ics_remote = make_ics("uid-1", "Remote Older", last_modified="20260601T000000Z")
client, _ = make_mock_client([ics_remote])
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
pulled = await provider.pull_changes(config)
# Local wins → no update, pulled is empty
assert len(pulled) == 0
db_event = await get_event_by_external_id("uid-1", "caldav", USER_ID, calendar_db_path)
assert db_event is not None
assert db_event.title == "Local Updated"
async def test_caldav_conflict_sends_ws_notification(calendar_db_path: Path) -> None:
"""Conflict detected → WS callback called with calendar_sync_conflict type (G4)."""
notifications: list[tuple[str, dict[str, Any]]] = []
async def notify(event_type: str, payload: dict[str, Any]) -> None:
notifications.append((event_type, payload))
local = make_local_event(
event_id="evt-1",
title="Local Updated",
external_id="uid-1",
last_modified="2026-06-15T00:00:00+00:00",
)
await insert_event(local, calendar_db_path)
ics_remote = make_ics("uid-1", "Remote Older", last_modified="20260601T000000Z")
client, _ = make_mock_client([ics_remote])
provider = CalDAVSyncProvider(
db_path=calendar_db_path,
client_factory=client_factory_from(client),
notify_callback=notify,
)
config = make_config()
await provider.pull_changes(config)
assert len(notifications) == 1
event_type, payload = notifications[0]
assert event_type == "calendar_sync_conflict"
assert payload["event_id"] == "evt-1"
assert payload["winner"] == "local"
# ---------------------------------------------------------------------------
# RRULE roundtrip test
# ---------------------------------------------------------------------------
async def test_caldav_rrule_roundtrip(calendar_db_path: Path) -> None:
"""Event with RRULE synced → RRULE preserved in both pull and push."""
rrule = "FREQ=WEEKLY;BYDAY=MO;COUNT=4"
ics_remote = make_ics("uid-rrule", "Weekly Meeting", rrule=rrule)
client, _ = make_mock_client([ics_remote])
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
# Pull: verify RRULE preserved
pulled = await provider.pull_changes(config)
assert len(pulled) == 1
assert pulled[0].rrule is not None
assert "FREQ=WEEKLY" in pulled[0].rrule
assert "BYDAY=MO" in pulled[0].rrule
assert "COUNT=4" in pulled[0].rrule
# Push: verify RRULE in ICS sent to remote
events = await list_events(USER_ID, db_path=calendar_db_path)
client2, mock_calendar2 = make_mock_client([], saved_uid="uid-rrule")
provider2 = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client2)
)
await provider2.push_changes(config, events)
saved_ics = mock_calendar2.save_event.call_args[0][0]
assert b"RRULE:FREQ=WEEKLY" in saved_ics
assert b"BYDAY=MO" in saved_ics
assert b"COUNT=4" in saved_ics
# ---------------------------------------------------------------------------
# test_connection tests
# ---------------------------------------------------------------------------
async def test_caldav_test_connection_success(calendar_db_path: Path) -> None:
"""Mock returns calendars → test_connection() returns (True, "")."""
client, _ = make_mock_client([])
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
ok, msg = await provider.test_connection(config)
assert ok is True
assert msg == ""
async def test_caldav_test_connection_failure(calendar_db_path: Path) -> None:
"""Mock raises → test_connection() returns (False, error_msg)."""
client, _ = make_mock_client([], raise_on_connect=ConnectionError("Auth failed"))
provider = CalDAVSyncProvider(
db_path=calendar_db_path, client_factory=client_factory_from(client)
)
config = make_config()
ok, msg = await provider.test_connection(config)
assert ok is False
assert "Auth failed" in msg

View File

@ -0,0 +1,197 @@
"""Tests for SyncManager — orchestrates external calendar sync (U6, G8).
Uses a mock ``AbstractSyncProvider`` to test SyncManager in isolation,
verifying provider iteration, last_sync updates, and conflict WS push.
"""
from __future__ import annotations
import asyncio
import json
import sys
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock
import pytest
# caldav is not installed — inject mock so SyncManager's import of
# CalDAVSyncProvider (which imports caldav) does not fail.
if "caldav" not in sys.modules: # pragma: no cover
sys.modules["caldav"] = MagicMock()
from agentkit.calendar.db import (
init_calendar_db,
insert_external_config,
list_external_configs,
)
from agentkit.calendar.models import CalendarEvent, ExternalCalendarConfig
from agentkit.calendar.sync.base import AbstractSyncProvider
from agentkit.calendar.sync.manager import SyncManager
USER_ID = "user-1"
# ---------------------------------------------------------------------------
# Mock provider + helpers
# ---------------------------------------------------------------------------
class MockSyncProvider(AbstractSyncProvider):
"""In-memory mock provider that records calls for assertion."""
def __init__(self) -> None:
self.pull_calls: int = 0
self.push_calls: int = 0
self.pull_configs: list[ExternalCalendarConfig] = []
self.push_configs: list[ExternalCalendarConfig] = []
async def pull_changes(
self, config: ExternalCalendarConfig, since: str | None = None
) -> list[CalendarEvent]:
self.pull_calls += 1
self.pull_configs.append(config)
return []
async def push_changes(
self, config: ExternalCalendarConfig, events: list[CalendarEvent]
) -> list[CalendarEvent]:
self.push_calls += 1
self.push_configs.append(config)
return events
async def test_connection(self, config: ExternalCalendarConfig) -> tuple[bool, str]:
return True, ""
def make_config(
config_id: str = "config-1",
user_id: str = USER_ID,
provider: str = "caldav",
last_sync: str | None = None,
) -> ExternalCalendarConfig:
return ExternalCalendarConfig(
id=config_id,
user_id=user_id,
provider=provider,
credentials=json.dumps(
{"url": "https://caldav.example.com", "username": "user", "password": "pass"}
),
last_sync=last_sync,
)
def make_event(
event_id: str = "evt-1",
title: str = "Test Event",
last_modified: str = "2026-06-01T00:00:00+00:00",
external_id: str | None = None,
) -> CalendarEvent:
return CalendarEvent(
id=event_id,
user_id=USER_ID,
title=title,
start_time="2026-07-01T10:00:00+00:00",
end_time="2026-07-01T11:00:00+00:00",
external_id=external_id,
external_provider="caldav" if external_id else None,
last_modified=last_modified,
created_at=last_modified,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def calendar_db_path(tmp_path: Path) -> Path:
path = tmp_path / "test_calendar.db"
asyncio.run(init_calendar_db(path))
return path
# ---------------------------------------------------------------------------
# sync_all tests
# ---------------------------------------------------------------------------
async def test_sync_manager_sync_all_iterates_providers(
calendar_db_path: Path,
) -> None:
"""2 configs → both providers called (G8)."""
config1 = make_config(config_id="config-1")
config2 = make_config(config_id="config-2")
await insert_external_config(config1, calendar_db_path)
await insert_external_config(config2, calendar_db_path)
mock_provider = MockSyncProvider()
manager = SyncManager(db_path=calendar_db_path, providers={"caldav": mock_provider})
result = await manager.sync_all(USER_ID)
assert result["synced"] == 2
assert result["errors"] == []
assert mock_provider.pull_calls == 2
assert mock_provider.push_calls == 2
# ---------------------------------------------------------------------------
# sync_provider tests
# ---------------------------------------------------------------------------
async def test_sync_manager_sync_provider_updates_last_sync(
calendar_db_path: Path,
) -> None:
"""Sync completes → config.last_sync updated (G8)."""
config = make_config(config_id="config-1", last_sync=None)
await insert_external_config(config, calendar_db_path)
mock_provider = MockSyncProvider()
manager = SyncManager(db_path=calendar_db_path, providers={"caldav": mock_provider})
await manager.sync_provider("config-1")
configs = await list_external_configs(USER_ID, calendar_db_path)
assert len(configs) == 1
assert configs[0].last_sync is not None
# ---------------------------------------------------------------------------
# resolve_conflict tests
# ---------------------------------------------------------------------------
async def test_sync_manager_resolve_conflict_notifies_user(
calendar_db_path: Path,
) -> None:
"""Conflict → WS callback called with calendar_sync_conflict type (G8/G4)."""
notifications: list[tuple[str, dict[str, Any]]] = []
async def notify(event_type: str, payload: dict[str, Any]) -> None:
notifications.append((event_type, payload))
manager = SyncManager(db_path=calendar_db_path, notify_callback=notify)
local = make_event(
event_id="evt-1",
title="Local Version",
last_modified="2026-06-15T00:00:00+00:00",
external_id="uid-1",
)
remote = make_event(
event_id="evt-remote",
title="Remote Version",
last_modified="2026-06-01T00:00:00+00:00",
external_id="uid-1",
)
winner = await manager.resolve_conflict(local, remote)
assert len(notifications) == 1
event_type, payload = notifications[0]
assert event_type == "calendar_sync_conflict"
assert payload["winner"] == "local"
assert winner is local # Local is newer → local wins