fischer-agentkit/src/agentkit/server/admin/context.py

199 lines
7.6 KiB
Python

"""Department-scoped request context (U4 — Admin Console).
This module provides the FastAPI dependency :func:`get_department_context`
that resolves the current user's department membership from the auth DB
and returns a :class:`DepartmentContext` describing which departments
the request should be scoped to.
The dependency is intentionally *per-route* (via ``Depends()``) rather
than a global middleware. This keeps the department lookup close to the
routes that actually need it (skills, KB, usage) and avoids paying the
DB round-trip for whitelisted paths (``/health``, ``/docs``, etc.).
Admin users (``role == "admin"``) bypass department filtering entirely
— :class:`DepartmentContext` is returned with ``is_admin=True`` and an
empty ``department_ids`` list, signalling to the filtering helpers in
:mod:`agentkit.server.admin.filtering` that no scoping should be
applied.
Skills/KB with NO department binding are *global* (visible to all
users, including unauthenticated API-key clients). The filtering
helpers in :mod:`agentkit.server.admin.filtering` are responsible for
preserving this global-visibility invariant.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import aiosqlite
from fastapi import HTTPException, Request
from agentkit.server.auth.models import DEFAULT_AUTH_DB_PATH
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Dataclass
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class DepartmentContext:
    """Request-scoped department context.

    Attributes:
        user_id: The authenticated user's id, or ``None`` for API-key
            clients / unauthenticated requests.
        department_ids: The user's department ids (union of all
            ``user_departments`` rows). Empty for admins (who bypass
            filtering) and for users with no department assignments.
        is_admin: ``True`` if the user has the ``admin`` role. Admins
            bypass department filtering — they see all resources.
    """

    user_id: str | None = None
    # NOTE: ``frozen=True`` only prevents rebinding this attribute; the
    # list itself can still be mutated in place, so treat it as read-only.
    department_ids: list[str] = field(default_factory=list)
    is_admin: bool = False

    @property
    def should_filter(self) -> bool:
        """Return ``True`` if the caller should be department-filtered.

        Only admins are exempt (``False``). Every non-admin caller is
        filtered — including unauthenticated / API-key callers, whose
        ``user_id`` is ``None`` and whose ``department_ids`` is empty.
        For those callers the filtering helpers are expected to return
        only the global (unbound) resources, which is how "anonymous
        callers see global resources only" is achieved.
        """
        return not self.is_admin
# ---------------------------------------------------------------------------
# DB path resolution (mirrors routes.admin._resolve_db_path)
# ---------------------------------------------------------------------------
def _resolve_db_path(request: Request) -> Path:
    """Pick the auth DB location for this request.

    Uses ``request.app.state.auth_db_path`` when that attribute exists
    and is truthy (wrapped in :class:`~pathlib.Path`); otherwise falls
    back to ``DEFAULT_AUTH_DB_PATH`` unchanged.
    """
    configured = getattr(request.app.state, "auth_db_path", None)
    if not configured:
        # Attribute missing, ``None`` or empty: use the package default.
        return DEFAULT_AUTH_DB_PATH
    return Path(configured)
# ---------------------------------------------------------------------------
# Department-id lookup
# ---------------------------------------------------------------------------
async def _fetch_user_department_ids(db_path: Path, user_id: str) -> list[str]:
    """Look up the ids of the *active* departments a user belongs to.

    Joins ``user_departments`` against ``departments`` and keeps only
    rows whose department has ``is_active = 1``, so bindings to a
    disabled department do not contribute to the user's effective
    department set.

    Args:
        db_path: Location of the SQLite auth database.
        user_id: Id matched against ``user_departments.user_id``.

    Returns:
        The matching department ids, ordered ascending by id. Empty
        when the user has no active department bindings.
    """
    query = (
        "SELECT ud.department_id "
        "FROM user_departments ud "
        "INNER JOIN departments d ON d.id = ud.department_id "
        "WHERE ud.user_id = ? AND d.is_active = 1 "
        "ORDER BY ud.department_id ASC"
    )
    params = (user_id,)
    # A fresh connection is opened per call and closed on exit.
    async with aiosqlite.connect(str(db_path)) as conn:
        result = await conn.execute(query, params)
        records = await result.fetchall()
        # Each record has a single column: the department id.
        return [record[0] for record in records]
# ---------------------------------------------------------------------------
# FastAPI dependencies
# ---------------------------------------------------------------------------
async def get_department_context(request: Request) -> DepartmentContext:
    """Build the :class:`DepartmentContext` that scopes this request.

    The caller is classified from ``request.state.current_user``
    (expected to be populated by the auth middleware), checked in this
    order:

    1. **No current user** (attribute missing or ``None``, e.g. the
       auth middleware is not installed): anonymous context
       ``DepartmentContext(None, [], is_admin=False)``.
    2. **Admin** (``role == "admin"``): ``DepartmentContext(user_id,
       [], is_admin=True)`` — admins bypass department filtering.
    3. **API-key client / unauthenticated** (falsy ``user_id``):
       anonymous context, same as case 1 — the filtering helpers are
       expected to return only global resources.
    4. **Regular user**: the user's active department ids are read
       from the auth DB and returned with ``is_admin=False``.

    Raises:
        HTTPException: 503 when the department lookup for a regular
            user raises. This is deliberately fail-closed (KTD-1):
            answering with an empty list instead would let quota
            enforcement skip its check. Cases 1-3 perform no DB lookup
            and are unaffected.
    """
    principal: dict[str, Any] | None = getattr(request.state, "current_user", None)
    if principal is None:
        return DepartmentContext(user_id=None, department_ids=[], is_admin=False)

    user_id = principal.get("user_id")

    # The admin check comes before the user_id check, so an admin is
    # recognised by role alone.
    if principal.get("role") == "admin":
        return DepartmentContext(user_id=user_id, department_ids=[], is_admin=True)

    # No usable user id: treated as an API-key / anonymous caller.
    if not user_id:
        return DepartmentContext(user_id=None, department_ids=[], is_admin=False)

    # Path resolution stays outside the ``try`` so only lookup failures
    # are converted into a 503.
    auth_db = _resolve_db_path(request)
    try:
        memberships = await _fetch_user_department_ids(auth_db, user_id)
    except Exception:
        # The traceback is logged here, so the HTTP error is raised
        # without chaining (``from None``).
        logger.exception(
            "Failed to fetch department ids for user %s — fail-closed (503)",
            user_id,
        )
        raise HTTPException(
            status_code=503,
            detail={
                "error": "department_lookup_failed",
                "detail": "Cannot verify department membership — refusing request",
            },
        ) from None
    else:
        return DepartmentContext(
            user_id=user_id,
            department_ids=memberships,
            is_admin=False,
        )
async def get_department_context_optional(request: Request) -> DepartmentContext | None:
    """Resolve the department context only when a current user exists.

    Intended for routes where auth is optional (e.g. public skill
    listing).

    Returns:
        ``None`` when ``request.state.current_user`` is missing or
        ``None``, so the route can short-circuit to "global only";
        otherwise whatever :func:`get_department_context` returns
        (including any exception it raises).
    """
    if getattr(request.state, "current_user", None) is None:
        return None
    return await get_department_context(request)