fischer-agentkit/tests/integration/admin/test_security_isolation.py

484 lines
18 KiB
Python

"""Security isolation integration tests for department-scoped resources (U10).
Verifies department-based access control end-to-end through the full
request stack (route → DepartmentContext → filtering → response):
- User in dept A cannot see skills/KB bound to dept B.
- User in depts A+B sees the union of both departments' resources.
- Admin sees all resources regardless of department bindings.
- User removed from a department loses access to that department's
resources.
- Disabled department's resources are not visible to its users.
- API-key client (no user_id) sees only global resources.
- Non-admin user gets 403 on admin endpoints (a representative sample is exercised).
The tests mount a minimal FastAPI app with the ``skills`` and
``kb-management`` routers (for isolation verification) plus the
``admin_router`` (for the 403 checks). The ``get_department_context``
dependency is overridden per-test to simulate different callers.
"""
from __future__ import annotations

import sqlite3
import uuid
from contextlib import closing
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import pytest
from fastapi import FastAPI, HTTPException, Request
from fastapi.testclient import TestClient

from agentkit.server.admin.context import DepartmentContext
from agentkit.server.auth.models import init_auth_db
from agentkit.server.routes import admin as admin_routes_module
from agentkit.server.routes import kb_management as kb_routes
from agentkit.server.routes import skills as skills_routes
from agentkit.skills.base import Skill, SkillConfig
from agentkit.skills.registry import SkillRegistry
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
async def tmp_auth_db(tmp_path: Path) -> Path:
    """Provide the path of a freshly initialised auth database.

    The database file is named ``security_isolation.db``, lives inside
    pytest's per-test ``tmp_path`` and is passed to ``init_auth_db``
    before being returned.
    """
    # NOTE(review): an ``async def`` fixture under a plain ``@pytest.fixture``
    # presumably relies on the project's async pytest plugin configuration
    # (e.g. an "auto" asyncio mode) — confirm.
    database_file = tmp_path.joinpath("security_isolation.db")
    await init_auth_db(database_file)
    return database_file
@pytest.fixture
def skill_registry() -> SkillRegistry:
    """Build a ``SkillRegistry`` holding the three skills the tests rely on.

    - ``hr_skill`` — bound to department A by the ``dept_setup`` fixture.
    - ``eng_skill`` — bound to department B by the ``dept_setup`` fixture.
    - ``global_skill`` — never bound to a department (global).
    """

    def _make_skill(skill_name: str) -> Skill:
        """Create one minimal test skill whose identity equals its name."""
        skill_config = SkillConfig(
            name=skill_name,
            agent_type="test_type",
            task_mode="llm_generate",
            description=f"Test skill {skill_name}",
            prompt={"identity": skill_name, "instructions": "test"},
        )
        return Skill(config=skill_config)

    populated = SkillRegistry()
    for skill_name in ("hr_skill", "eng_skill", "global_skill"):
        populated.register(_make_skill(skill_name))
    return populated
@pytest.fixture
def kb_store():
    """Install a fresh module-level KB source store for the duration of a test.

    ``kb_routes._source_store`` is replaced with a new
    ``KnowledgeSourceStore`` and that new store is yielded to the test.
    On teardown the attribute is put back to whatever it was before (or
    removed again if it did not exist), so sources added by one test do
    not leak into tests that run later in the same process.
    """
    # Sentinel distinguishing "attribute absent" from "attribute is None".
    absent = object()
    previous_store = getattr(kb_routes, "_source_store", absent)

    fresh_store = kb_routes.KnowledgeSourceStore()
    kb_routes._source_store = fresh_store
    try:
        yield fresh_store
    finally:
        if previous_store is absent:
            # The test may itself have removed the attribute; tolerate that.
            if hasattr(kb_routes, "_source_store"):
                del kb_routes._source_store
        else:
            kb_routes._source_store = previous_store
@pytest.fixture
def app(
    tmp_auth_db: Path,
    skill_registry: SkillRegistry,
    kb_store: kb_routes.KnowledgeSourceStore,
) -> FastAPI:
    """Assemble the FastAPI app under test.

    The skills, kb-management and admin routers are mounted under
    ``/api/v1``. ``app.state`` carries the auth DB path (as a string) and
    the skill registry. Tests replace the ``get_department_context``
    override through ``app.dependency_overrides``; until they do, callers
    are treated as unauthenticated.
    """
    test_app = FastAPI()
    test_app.state.auth_db_path = str(tmp_auth_db)
    test_app.state.skill_registry = skill_registry

    mounted_routers = (
        skills_routes.router,
        kb_routes.router,
        admin_routes_module.admin_router,
    )
    for router in mounted_routers:
        test_app.include_router(router, prefix="/api/v1")

    overrides = test_app.dependency_overrides
    # Default caller: unauthenticated, for both department-scoped routers.
    for routes_module in (skills_routes, kb_routes):
        overrides[routes_module.get_department_context] = _unauthenticated_context
    # Default admin check: allowed. The admin 403 test swaps this override
    # for one that raises.
    overrides[admin_routes_module._require_admin] = lambda: _make_admin_user()
    return test_app
@pytest.fixture
def client(app: FastAPI) -> TestClient:
    """Return a ``TestClient`` wrapping the ``app`` fixture."""
    test_client = TestClient(app)
    return test_client
# ---------------------------------------------------------------------------
# Department-context dependency overrides
# ---------------------------------------------------------------------------
async def _unauthenticated_context(request: Request) -> DepartmentContext:
    """Dependency override describing a caller without any identity.

    ``request`` is accepted to match the dependency signature but is not
    read. The returned context has no user id, no departments and no
    admin flag.
    """
    anonymous_context = DepartmentContext(
        user_id=None,
        department_ids=[],
        is_admin=False,
    )
    return anonymous_context
def _ctx_for_user(
    user_id: str | None,
    department_ids: list[str],
    is_admin: bool = False,
):
    """Create a dependency override that always yields the same caller.

    Args:
        user_id: Id reported for the caller, or ``None`` for no user.
        department_ids: Departments the caller belongs to.
        is_admin: Whether the caller is flagged as admin.

    Returns:
        An async callable taking ``request`` and returning a
        ``DepartmentContext`` built from the arguments above.
    """

    async def _fixed_context(request: Request) -> DepartmentContext:
        # Copy on every call so each context gets its own list object.
        departments_snapshot = list(department_ids)
        return DepartmentContext(
            user_id=user_id,
            department_ids=departments_snapshot,
            is_admin=is_admin,
        )

    return _fixed_context
def _set_caller(
    app: FastAPI,
    user_id: str | None,
    department_ids: list[str],
    is_admin: bool = False,
) -> None:
    """Make ``app`` treat every subsequent request as coming from one caller.

    A single override built by ``_ctx_for_user`` is registered for the
    ``get_department_context`` dependency of both the skills router and
    the kb-management router.
    """
    fixed_caller = _ctx_for_user(user_id, department_ids, is_admin)
    app.dependency_overrides.update(
        {
            skills_routes.get_department_context: fixed_caller,
            kb_routes.get_department_context: fixed_caller,
        }
    )
def _make_admin_user() -> dict[str, Any]:
    """Return the user payload for the default (allowed) admin caller.

    Used by the ``app`` fixture as the stand-in result of the
    ``_require_admin`` dependency.
    """
    admin_user: dict[str, Any] = dict(
        user_id="admin-1",
        username="admin",
        role="admin",
    )
    return admin_user
def _raise_forbidden() -> dict[str, Any]:
    """Dependency override that rejects the caller as a non-admin.

    Always raises ``HTTPException`` with status 403; it never returns.
    The return annotation only mirrors the admin-user override it
    replaces.
    """
    forbidden = HTTPException(status_code=403, detail="Admin permission required")
    raise forbidden
# ---------------------------------------------------------------------------
# DB helpers (synchronous sqlite3 — no event-loop mixing with TestClient)
# ---------------------------------------------------------------------------
def _now_iso() -> str:
    """Return the current UTC time as an ISO 8601 string."""
    current_utc = datetime.now(tz=timezone.utc)
    return current_utc.isoformat()
def _create_department(db_path: Path, name: str) -> str:
    """Insert an active department row and return its generated id.

    Args:
        db_path: Path of the SQLite auth database.
        name: Department name to store.

    Returns:
        The new department id (a random UUID4 string).

    The row is written with an empty description, ``is_active = 1`` and
    the current UTC timestamp as ``created_at``.
    """
    dept_id = str(uuid.uuid4())
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the INSERT raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "INSERT INTO departments (id, name, description, is_active, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (dept_id, name, "", 1, _now_iso()),
        )
        db.commit()
    return dept_id
def _disable_department(db_path: Path, department_id: str) -> None:
    """Mark one department as inactive (``is_active = 0``).

    Args:
        db_path: Path of the SQLite auth database.
        department_id: Id of the department row to update. An id that
            matches no row leaves the table unchanged.
    """
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the UPDATE raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "UPDATE departments SET is_active = 0 WHERE id = ?",
            (department_id,),
        )
        db.commit()
def _bind_skill(db_path: Path, department_id: str, skill_name: str) -> None:
    """Bind a skill to a department in ``department_skill_bindings``.

    Args:
        db_path: Path of the SQLite auth database.
        department_id: Department receiving the binding.
        skill_name: Name of the skill being bound.

    The binding row gets a random UUID4 id and the current UTC timestamp
    as ``created_at``.
    """
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the INSERT raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "INSERT INTO department_skill_bindings (id, department_id, skill_name, created_at) "
            "VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), department_id, skill_name, _now_iso()),
        )
        db.commit()
def _bind_kb(db_path: Path, department_id: str, kb_source_id: str) -> None:
    """Bind a KB source to a department in ``department_kb_bindings``.

    Args:
        db_path: Path of the SQLite auth database.
        department_id: Department receiving the binding.
        kb_source_id: Id of the knowledge-base source being bound.

    The binding row gets a random UUID4 id and the current UTC timestamp
    as ``created_at``.
    """
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the INSERT raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "INSERT INTO department_kb_bindings (id, department_id, kb_source_id, created_at) "
            "VALUES (?, ?, ?, ?)",
            (str(uuid.uuid4()), department_id, kb_source_id, _now_iso()),
        )
        db.commit()
def _assign_user_to_department(db_path: Path, user_id: str, department_id: str) -> None:
    """Record a user's membership of a department in ``user_departments``.

    Args:
        db_path: Path of the SQLite auth database.
        user_id: Id of the user being assigned.
        department_id: Department the user joins.

    The row is stamped with the current UTC time as ``created_at``.
    """
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the INSERT raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "INSERT INTO user_departments (user_id, department_id, created_at) VALUES (?, ?, ?)",
            (user_id, department_id, _now_iso()),
        )
        db.commit()
def _remove_user_from_department(db_path: Path, user_id: str, department_id: str) -> None:
    """Delete a user's membership of a department from ``user_departments``.

    Args:
        db_path: Path of the SQLite auth database.
        user_id: Id of the user being removed.
        department_id: Department the user leaves. A (user, department)
            pair with no matching row leaves the table unchanged.
    """
    # ``with sqlite3.connect(...)`` on its own only scopes a transaction
    # and leaves the connection open; ``closing`` guarantees it is closed
    # even when the DELETE raises.
    with closing(sqlite3.connect(str(db_path))) as db:
        db.execute(
            "DELETE FROM user_departments WHERE user_id = ? AND department_id = ?",
            (user_id, department_id),
        )
        db.commit()
# ---------------------------------------------------------------------------
# Test fixture: departments A and B with skill/KB bindings
# ---------------------------------------------------------------------------
@pytest.fixture
def dept_setup(tmp_auth_db: Path, kb_store: kb_routes.KnowledgeSourceStore):
    """Create departments A and B and bind one skill and one KB source to each.

    Layout:
    - Department A ("HR"): ``hr_skill`` and the "HR KB" source.
    - Department B ("Engineering"): ``eng_skill`` and the "Engineering KB" source.
    - ``global_skill`` and the "Global KB" source are left unbound (global).

    Returns a dict with the two department ids (``dept_a``, ``dept_b``)
    and the three KB source ids (``hr_kb_id``, ``eng_kb_id``,
    ``global_kb_id``).
    """
    dept_a, dept_b = (
        _create_department(tmp_auth_db, dept_name) for dept_name in ("HR", "Engineering")
    )

    # Skill bindings; ``global_skill`` deliberately gets none.
    for owner_id, skill_name in ((dept_a, "hr_skill"), (dept_b, "eng_skill")):
        _bind_skill(tmp_auth_db, owner_id, skill_name)

    # KB sources live in the in-memory store; only their bindings hit the DB.
    hr_kb, eng_kb, global_kb = (
        kb_store.add_source(title, "local", {})
        for title in ("HR KB", "Engineering KB", "Global KB")
    )

    # KB bindings; the global source deliberately gets none.
    for owner_id, source in ((dept_a, hr_kb), (dept_b, eng_kb)):
        _bind_kb(tmp_auth_db, owner_id, source.id)

    layout = {
        "dept_a": dept_a,
        "dept_b": dept_b,
        "hr_kb_id": hr_kb.id,
        "eng_kb_id": eng_kb.id,
        "global_kb_id": global_kb.id,
    }
    return layout
# ---------------------------------------------------------------------------
# Department isolation tests
# ---------------------------------------------------------------------------
class TestDepartmentIsolation:
    """Security tests for department-based resource isolation.

    Every listing request goes through one of the two private helpers
    below, so a non-200 response fails with a clear status message
    instead of an opaque error while parsing the response body.
    """

    @staticmethod
    def _visible_skill_names(client: TestClient) -> set[str]:
        """Return the skill names listed by ``GET /api/v1/skills`` (asserts 200)."""
        resp = client.get("/api/v1/skills")
        assert resp.status_code == 200, (
            f"GET /api/v1/skills returned {resp.status_code}, expected 200"
        )
        return {s["name"] for s in resp.json()}

    @staticmethod
    def _visible_kb_ids(client: TestClient) -> set[str]:
        """Return the KB source ids listed by the kb-management API (asserts 200)."""
        resp = client.get("/api/v1/kb-management/sources")
        assert resp.status_code == 200, (
            f"GET /api/v1/kb-management/sources returned {resp.status_code}, expected 200"
        )
        return {s["id"] for s in resp.json()["sources"]}

    def test_user_cannot_see_other_department_skills(
        self, app: FastAPI, client: TestClient, dept_setup: dict
    ):
        """User in dept A cannot see skills bound to dept B."""
        _set_caller(app, user_id="alice", department_ids=[dept_setup["dept_a"]])
        names = self._visible_skill_names(client)
        # Alice sees hr_skill (dept A) + global_skill, NOT eng_skill (dept B).
        assert "hr_skill" in names
        assert "global_skill" in names
        assert "eng_skill" not in names

    def test_user_cannot_see_other_department_kb(
        self, app: FastAPI, client: TestClient, dept_setup: dict
    ):
        """User in dept A cannot see KB sources bound to dept B."""
        _set_caller(app, user_id="alice", department_ids=[dept_setup["dept_a"]])
        ids = self._visible_kb_ids(client)
        # Alice sees hr_kb (dept A) + global_kb, NOT eng_kb (dept B).
        assert dept_setup["hr_kb_id"] in ids
        assert dept_setup["global_kb_id"] in ids
        assert dept_setup["eng_kb_id"] not in ids

    def test_user_in_multiple_departments_sees_union(
        self, app: FastAPI, client: TestClient, dept_setup: dict
    ):
        """User in depts A+B sees skills from both."""
        _set_caller(
            app,
            user_id="alice",
            department_ids=[dept_setup["dept_a"], dept_setup["dept_b"]],
        )
        names = self._visible_skill_names(client)
        # Alice sees hr_skill (A) + eng_skill (B) + global_skill.
        assert names == {"hr_skill", "eng_skill", "global_skill"}

    def test_admin_sees_all_resources(self, app: FastAPI, client: TestClient, dept_setup: dict):
        """Admin user sees all resources regardless of department bindings."""
        _set_caller(
            app,
            user_id="admin-1",
            department_ids=[],
            is_admin=True,
        )
        # Admin sees all skills.
        assert self._visible_skill_names(client) == {"hr_skill", "eng_skill", "global_skill"}
        # Admin sees all KB sources.
        assert self._visible_kb_ids(client) == {
            dept_setup["hr_kb_id"],
            dept_setup["eng_kb_id"],
            dept_setup["global_kb_id"],
        }

    def test_user_removed_from_department_loses_access(
        self,
        app: FastAPI,
        client: TestClient,
        tmp_auth_db: Path,
        dept_setup: dict,
    ):
        """User removed from dept A can no longer see dept A's skills."""
        user_id = "user-removal"
        dept_a = dept_setup["dept_a"]
        # Initially assign to dept A.
        _assign_user_to_department(tmp_auth_db, user_id, dept_a)
        _set_caller(app, user_id=user_id, department_ids=[dept_a])
        assert "hr_skill" in self._visible_skill_names(client)
        # Remove from dept A. The context is a test override, so the
        # membership change has to be mirrored by hand.
        _remove_user_from_department(tmp_auth_db, user_id, dept_a)
        _set_caller(app, user_id=user_id, department_ids=[])
        names = self._visible_skill_names(client)
        assert "hr_skill" not in names
        assert "global_skill" in names

    def test_disabled_department_excluded(
        self,
        app: FastAPI,
        client: TestClient,
        tmp_auth_db: Path,
        dept_setup: dict,
    ):
        """Disabled department's resources are not visible to its users.

        The real ``get_department_context`` is expected to drop disabled
        departments (``is_active=0``) when resolving a user's memberships
        (see :mod:`agentkit.server.admin.context`). That dependency is
        overridden here, so the test disables dept A in the DB and then
        installs a context with the resulting empty department list.
        """
        user_id = "user-disabled-dept"
        dept_a = dept_setup["dept_a"]
        # Initially assign to dept A and verify access.
        _assign_user_to_department(tmp_auth_db, user_id, dept_a)
        _set_caller(app, user_id=user_id, department_ids=[dept_a])
        assert "hr_skill" in self._visible_skill_names(client)
        # Disable dept A in the DB.
        _disable_department(tmp_auth_db, dept_a)
        # Simulate the context change: the next request would find no
        # *active* departments for this user.
        _set_caller(app, user_id=user_id, department_ids=[])
        names = self._visible_skill_names(client)
        assert "hr_skill" not in names
        assert "global_skill" in names

    def test_api_key_client_sees_only_global(
        self, app: FastAPI, client: TestClient, dept_setup: dict
    ):
        """API key client (no user_id) sees only global resources."""
        # API-key client → user_id=None, department_ids=[], is_admin=False.
        _set_caller(app, user_id=None, department_ids=[], is_admin=False)
        # Skills: only global_skill.
        assert self._visible_skill_names(client) == {"global_skill"}
        # KB sources: only global_kb.
        assert self._visible_kb_ids(client) == {dept_setup["global_kb_id"]}
# ---------------------------------------------------------------------------
# Non-admin access tests
# ---------------------------------------------------------------------------
class TestNonAdminAccess:
    """Non-admin user gets 403 on admin endpoints."""

    def test_non_admin_cannot_access_admin_endpoints(self, app: FastAPI, tmp_auth_db: Path):
        """Non-admin user gets 403 on a representative sample of admin endpoints.

        Every endpoint in the sample is requested even after a failure, so
        one run reports all endpoints that did not answer 403.
        ``tmp_auth_db`` is not read directly; it is requested as a fixture
        only.
        """
        # Override _require_admin to raise 403 (simulating a non-admin caller).
        app.dependency_overrides[admin_routes_module._require_admin] = _raise_forbidden
        client = TestClient(app)

        # Minimal JSON bodies for the POST endpoints; the 403 is expected
        # before body validation runs.
        post_bodies: dict[str, dict[str, Any]] = {
            "/api/v1/admin/departments": {"name": "X"},
            "/api/v1/admin/users": {
                "username": "x",
                "email": "x@x.com",
                "password": "Pw123!",
            },
            "/api/v1/admin/llm/providers": {
                "name": "x",
                "type": "openai",
                "api_key": "sk-x",
            },
        }
        # Representative sample of admin endpoints across all resource types.
        endpoints = [
            ("GET", "/api/v1/admin/departments"),
            ("POST", "/api/v1/admin/departments"),
            ("GET", "/api/v1/admin/users"),
            ("POST", "/api/v1/admin/users"),
            ("GET", "/api/v1/admin/llm/providers"),
            ("POST", "/api/v1/admin/llm/providers"),
            ("GET", "/api/v1/admin/usage/summary"),
            ("GET", "/api/v1/admin/usage/timeseries"),
            ("GET", "/api/v1/admin/usage/by-model"),
            ("GET", "/api/v1/admin/usage/top-users"),
            ("GET", "/api/v1/admin/usage/export"),
        ]

        unexpected: list[str] = []
        for method, path in endpoints:
            # GET requests carry no body; POSTs without a listed body send {}.
            body = post_bodies.get(path, {}) if method == "POST" else None
            resp = client.request(method, path, json=body)
            if resp.status_code != 403:
                unexpected.append(f"{method} {path} returned {resp.status_code}, expected 403")
        assert not unexpected, "\n".join(unexpected)