"""UsageService — read-side aggregations for the usage dashboard (U7).

This module provides read-only aggregations over a :class:`UsageStore` for the
admin usage dashboard. It is intentionally a thin layer — the store already
produces :class:`UsageSummary` aggregations, and this service just shapes them
for the dashboard endpoints (timeseries, top-N, CSV/JSON export).

The service is a module-level singleton (see :func:`get_usage_service`) so
tests can inject a custom instance via :func:`set_usage_service`.
"""

from __future__ import annotations

import csv
import io
import json
import logging
from datetime import datetime
from typing import Any

from agentkit.llm.providers.usage_store import UsageStore, UsageSummary

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Column order of an exported record. Used for the CSV header so that the
# header is the same whether or not there are any records to write.
_EXPORT_FIELDS: tuple[str, ...] = (
    "timestamp",
    "agent_name",
    "model",
    "prompt_tokens",
    "completion_tokens",
    "total_tokens",
    "cost",
    "latency_ms",
    "user_id",
    "department_id",
)


def _bucket_start(ts: datetime, interval: str) -> datetime:
    """Return the start of the time bucket containing ``ts``.

    ``interval == "hour"`` truncates to the top of the hour; every other value
    (including unrecognised ones) truncates to midnight of the same day.
    ``datetime.replace`` leaves ``tzinfo`` untouched, so the result is naive
    or aware exactly as ``ts`` is.
    """
    if interval == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    # Default: day
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def _fetch_summary(
    usage_store: UsageStore,
    department_id: str | None,
    user_id: str | None,
    start_time: datetime | None,
    end_time: datetime | None,
) -> UsageSummary:
    """Run the one store query every dashboard endpoint is built on."""
    return usage_store.get_usage(
        start_time=start_time,
        end_time=end_time,
        user_id=user_id,
        department_id=department_id,
    )


def _breakdown_row(id_key: str, ident: str, data: dict[str, Any]) -> dict[str, Any]:
    """Reshape one store breakdown entry into a dashboard row.

    The store's ``total_tokens`` / ``total_cost`` / ``count`` keys become
    ``tokens`` / ``cost`` / ``requests``; ``ident`` is stored under ``id_key``.
    """
    return {
        id_key: ident,
        "tokens": data["total_tokens"],
        "cost": data["total_cost"],
        "requests": data["count"],
    }


def _top_rows(
    breakdown: dict[str, dict[str, Any]],
    id_key: str,
    limit: int,
) -> list[dict[str, Any]]:
    """Return at most ``limit`` rows of ``breakdown``, highest tokens first.

    A negative ``limit`` is treated as zero. Without the clamp,
    ``rows[:limit]`` would silently drop rows from the *end* of the list
    instead of returning nothing.
    """
    rows = [_breakdown_row(id_key, ident, data) for ident, data in breakdown.items()]
    # list.sort is stable, so entries with equal token counts keep the
    # iteration order of the store's breakdown mapping.
    rows.sort(key=lambda row: row["tokens"], reverse=True)
    return rows[: max(limit, 0)]


# ---------------------------------------------------------------------------
# Service
# ---------------------------------------------------------------------------


class UsageService:
    """Read-side aggregations for the usage dashboard.

    Every public method takes the :class:`UsageStore` to query plus the same
    optional filters (``department_id``, ``user_id``, ``start_time``,
    ``end_time``), which are forwarded unchanged to ``usage_store.get_usage``.
    """

    async def get_usage_summary(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
    ) -> dict[str, Any]:
        """Return a flat usage summary dict.

        Shape::

            {
                "total_tokens": int,
                "total_cost": float,
                "total_requests": int,
                "by_model": {model: {total_tokens, total_cost, count}, ...},
                "by_user": {user_id: {...}, ...},
                "by_department": {department_id: {...}, ...},
            }
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)
        return self._summary_to_dict(summary)

    async def get_usage_timeseries(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        interval: str = "day",
    ) -> list[dict[str, Any]]:
        """Return a time-bucketed series, ordered by bucket start.

        Each item has shape ``{timestamp, tokens, cost, requests}``, where
        ``timestamp`` is the ISO-8601 start of the bucket. ``interval`` is
        ``"hour"`` or ``"day"``; any other value is bucketed by day.

        Buckets with no activity are omitted (callers can fill gaps). Records
        whose timestamp cannot be parsed are skipped, and the number skipped
        is logged once per call.
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)

        buckets: dict[datetime, dict[str, Any]] = {}
        skipped = 0
        for rec in summary.records:
            try:
                ts = datetime.fromisoformat(rec.timestamp)
            except (TypeError, ValueError):
                # ValueError: malformed string. TypeError: None / non-string.
                # Either way one bad record must not fail the whole chart.
                skipped += 1
                continue
            totals = buckets.setdefault(
                _bucket_start(ts, interval),
                {"tokens": 0, "cost": 0.0, "requests": 0},
            )
            totals["tokens"] += rec.total_tokens
            totals["cost"] += rec.cost
            totals["requests"] += 1

        if skipped:
            logger.warning(
                "Skipped %d usage record(s) with unparseable timestamps",
                skipped,
            )

        # NOTE(review): if the store ever mixes naive and timezone-aware
        # timestamps, ordering the bucket keys raises TypeError — confirm the
        # store writes one consistent form.
        return [
            {
                "timestamp": bucket.isoformat(),
                "tokens": buckets[bucket]["tokens"],
                "cost": buckets[bucket]["cost"],
                "requests": buckets[bucket]["requests"],
            }
            for bucket in sorted(buckets)
        ]

    async def get_usage_by_model(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
    ) -> list[dict[str, Any]]:
        """Return a per-model breakdown, sorted by model name.

        Each item has shape ``{model, tokens, cost, requests}``.
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)
        return [
            _breakdown_row("model", model, data)
            for model, data in sorted(summary.by_model.items())
        ]

    async def get_top_users(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Return the top-N users by total token usage.

        Each item has shape ``{user_id, tokens, cost, requests}``. At most
        ``limit`` rows are returned; a zero or negative ``limit`` yields an
        empty list.
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)
        return _top_rows(summary.by_user, "user_id", limit)

    async def get_top_departments(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        limit: int = 10,
    ) -> list[dict[str, Any]]:
        """Return the top-N departments by total token usage.

        Each item has shape ``{department_id, tokens, cost, requests}``. At
        most ``limit`` rows are returned; a zero or negative ``limit`` yields
        an empty list.
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)
        return _top_rows(summary.by_department, "department_id", limit)

    async def export_usage(
        self,
        usage_store: UsageStore,
        department_id: str | None = None,
        user_id: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        format: str = "csv",
    ) -> str:
        """Export raw usage records as CSV or JSON.

        ``format`` is ``"csv"`` (default) or ``"json"``; any value other than
        ``"json"`` produces CSV. The CSV always starts with a header row, even
        when there are no records. A missing ``user_id`` or ``department_id``
        is exported as an empty string.
        """
        summary = _fetch_summary(usage_store, department_id, user_id, start_time, end_time)
        records = [
            {
                "timestamp": rec.timestamp,
                "agent_name": rec.agent_name,
                "model": rec.model,
                "prompt_tokens": rec.prompt_tokens,
                "completion_tokens": rec.completion_tokens,
                "total_tokens": rec.total_tokens,
                "cost": rec.cost,
                "latency_ms": rec.latency_ms,
                "user_id": rec.user_id or "",
                "department_id": rec.department_id or "",
            }
            for rec in summary.records
        ]

        if format == "json":
            return json.dumps(records, ensure_ascii=False, indent=2)

        # Default: CSV. The header comes from _EXPORT_FIELDS, so an empty
        # export still carries the column names.
        out = io.StringIO()
        writer = csv.DictWriter(out, fieldnames=_EXPORT_FIELDS)
        writer.writeheader()
        writer.writerows(records)
        return out.getvalue()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _summary_to_dict(summary: UsageSummary) -> dict[str, Any]:
        """Convert a :class:`UsageSummary` to a flat dict response.

        ``total_requests`` is the number of records in the summary. The three
        breakdowns are shallow-copied with ``dict()``, so the top-level
        mappings are new objects while the per-key entries are shared with
        ``summary``.
        """
        return {
            "total_tokens": summary.total_tokens,
            "total_cost": summary.total_cost,
            "total_requests": len(summary.records),
            "by_model": dict(summary.by_model),
            "by_user": dict(summary.by_user),
            "by_department": dict(summary.by_department),
        }


# ---------------------------------------------------------------------------
# Module-level singleton (overridable in tests via set_usage_service)
# ---------------------------------------------------------------------------

_usage_service: UsageService | None = None


def get_usage_service() -> UsageService:
    """Return the process-wide :class:`UsageService` (lazy singleton).

    The instance is created on first call and reused afterwards, unless it is
    replaced or cleared through :func:`set_usage_service`.
    """
    global _usage_service
    if _usage_service is None:
        _usage_service = UsageService()
    return _usage_service


def set_usage_service(service: UsageService | None) -> None:
    """Inject a custom :class:`UsageService` (used by tests).

    Passing ``None`` clears the singleton, so the next
    :func:`get_usage_service` call builds a fresh default instance.
    """
    global _usage_service
    _usage_service = service