68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
"""RRULE recurrence expansion wrapper.
|
|
|
|
Uses ``dateutil.rrule`` for RFC 5545 compliant recurrence rule expansion.
|
|
All times are UTC (see KTD-11).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from dateutil.rrule import rrulestr
|
|
|
|
|
|
def _parse_dt(dt_str: str) -> datetime:
    """Parse an ISO 8601 string into a timezone-aware UTC datetime.

    Args:
        dt_str: ISO 8601 timestamp. A trailing ``Z``/``z`` (UTC designator)
            is accepted. A value without any offset is interpreted as UTC.

    Returns:
        An aware ``datetime`` whose ``tzinfo`` is ``timezone.utc``. Inputs
        carrying another offset are converted to the same instant in UTC.

    Raises:
        ValueError: If ``dt_str`` is not a valid ISO 8601 timestamp
            (propagated from ``datetime.fromisoformat``).
    """
    # ``datetime.fromisoformat`` only understands the "Z" designator from
    # Python 3.11 onwards; rewrite it as an explicit offset so older
    # interpreters accept it as well.
    if dt_str.endswith(("Z", "z")):
        dt_str = dt_str[:-1] + "+00:00"

    dt = datetime.fromisoformat(dt_str)
    if dt.tzinfo is None:
        # Naive timestamps are defined to be UTC in this module.
        return dt.replace(tzinfo=timezone.utc)
    # Honour the documented contract: always hand back UTC, not the
    # caller-supplied offset.
    return dt.astimezone(timezone.utc)
|
|
|
|
|
|
def expand_rrule(
    rrule_str: str | None,
    dtstart: str,
    range_start: str | None = None,
    range_end: str | None = None,
) -> list[str]:
    """Expand a recurrence rule into individual occurrence start times.

    Args:
        rrule_str: RFC 5545 RRULE string (e.g. "FREQ=WEEKLY;BYDAY=MO;COUNT=4").
            If None or empty, returns ``[dtstart]`` as given; the value is
            neither parsed nor filtered against the range.
        dtstart: ISO 8601 start time of the first occurrence. A value without
            an offset is treated as UTC; a value with another offset is
            converted to UTC before the rule is built.
        range_start: Optional ISO 8601 lower bound (inclusive) for filtering.
        range_end: Optional ISO 8601 upper bound (exclusive) for filtering.

    Returns:
        List of ISO 8601 UTC strings (``YYYY-MM-DDTHH:MM:SS+00:00``) for each
        occurrence's start time within the given range, in ascending order.
        If no range is specified, returns all occurrences (bounded by COUNT
        or UNTIL in the RRULE).

    Raises:
        ValueError: If a timestamp or the rule cannot be parsed (propagated
            from ``datetime.fromisoformat`` / ``dateutil.rrule.rrulestr``).

    Note:
        Expansion stops as soon as an occurrence reaches ``range_end``, so an
        open-ended rule (no COUNT/UNTIL) is safe when ``range_end`` is given.
        Without ``range_end`` such a rule is expanded until dateutil itself
        stops, which can be very large -- callers should supply an upper
        bound for open-ended rules.
    """
    if not rrule_str:
        return [dtstart]

    # Normalise to UTC *before* formatting: the DTSTART line below carries a
    # literal "Z" suffix, so its wall-clock fields must really be UTC or the
    # whole series would be shifted by the caller's offset.
    start_dt = _parse_dt(dtstart).astimezone(timezone.utc)

    # rrulestr expects the RRULE to have a DTSTART context.
    # We prepend DTSTART to ensure the rule starts from the event's start time.
    full_rule = f"DTSTART:{start_dt.strftime('%Y%m%dT%H%M%SZ')}\nRRULE:{rrule_str}"
    rule = rrulestr(full_rule)

    lower = _parse_dt(range_start) if range_start is not None else None
    upper = _parse_dt(range_end) if range_end is not None else None

    # Half-open interval [lower, upper) -- standard date-range convention.
    # dateutil yields occurrences in ascending order, so the first one at or
    # past ``upper`` ends the scan; this keeps open-ended rules from being
    # iterated far beyond the requested window.
    occurrences = []
    for occurrence in rule:
        if upper is not None and occurrence >= upper:
            break
        if lower is None or occurrence >= lower:
            occurrences.append(occurrence)

    # Convert back to ISO 8601 UTC strings
    return [
        occurrence.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
        for occurrence in occurrences
    ]
|