fischer-agentkit/tests/integration/admin/test_department_isolation.py

505 lines
18 KiB
Python

"""Integration tests for department-scoped isolation (U4).
Verifies that:
- A user belonging to department A sees only:
- skills bound to department A, AND
- skills with NO department binding (global skills).
- A user belonging to both A and B sees skills from A + B + global.
- A user with NO departments sees only global skills.
- An admin user bypasses filtering entirely (sees everything).
- KB source filtering follows the same rules.
- Removing a user from a department immediately revokes visibility
of that department's bound resources.
The tests mount a minimal FastAPI app with the ``skills`` and
``kb-management`` routers, plus a real auth DB (init_auth_db) so the
``department_skill_bindings`` / ``department_kb_bindings`` tables
exist. The ``get_department_context`` dependency is overridden per
test to simulate different callers (admin, user-in-A, user-in-B,
user-in-both, user-in-none).
"""
from __future__ import annotations
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
import pytest
from fastapi import FastAPI, Request
from fastapi.testclient import TestClient
from agentkit.server.admin.context import DepartmentContext
from agentkit.server.auth.models import init_auth_db
from agentkit.server.routes import kb_management as kb_routes
from agentkit.server.routes import skills as skills_routes
from agentkit.skills.base import Skill, SkillConfig
from agentkit.skills.registry import SkillRegistry
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
async def tmp_auth_db(tmp_path: Path) -> Path:
db_path = tmp_path / "dept_isolation.db"
await init_auth_db(db_path)
return db_path
@pytest.fixture
def skill_registry() -> SkillRegistry:
"""A SkillRegistry pre-loaded with three test skills.
- ``hr_skill`` — will be bound to department A.
- ``dev_skill`` — will be bound to department B.
- ``global_skill`` — has NO department binding (global).
"""
registry = SkillRegistry()
for name in ("hr_skill", "dev_skill", "global_skill"):
config = SkillConfig(
name=name,
agent_type="test_type",
task_mode="llm_generate",
description=f"Test skill {name}",
prompt={"identity": name, "instructions": "test"},
)
registry.register(Skill(config=config))
return registry
@pytest.fixture
def kb_store():
"""Reset the module-level KB source store singleton.
The KB routes use a module-level ``_source_store`` singleton. We
reset it before each test so tests don't leak state.
"""
kb_routes._source_store = kb_routes.KnowledgeSourceStore()
return kb_routes._source_store
@pytest.fixture
def app(
tmp_auth_db: Path,
skill_registry: SkillRegistry,
kb_store: kb_routes.KnowledgeSourceStore,
) -> FastAPI:
"""A minimal FastAPI app with skills + kb-management routers.
The ``get_department_context`` dependency is overridden per-test
via ``app.dependency_overrides``. The default override is "no
current user" (unauthenticated) — individual tests install their
own override.
"""
application = FastAPI()
application.state.auth_db_path = str(tmp_auth_db)
application.state.skill_registry = skill_registry
# KB routes read from the module-level _source_store, not app.state,
# so we don't need to install it on app.state.
application.include_router(skills_routes.router, prefix="/api/v1")
application.include_router(kb_routes.router, prefix="/api/v1")
# Default: unauthenticated caller (no current_user on request.state).
# Tests override this via app.dependency_overrides.
application.dependency_overrides[
skills_routes.get_department_context
] = _unauthenticated_context
application.dependency_overrides[
kb_routes.get_department_context
] = _unauthenticated_context
return application
@pytest.fixture
def client(app: FastAPI) -> TestClient:
return TestClient(app)
# ---------------------------------------------------------------------------
# Department-context dependency overrides
# ---------------------------------------------------------------------------
async def _unauthenticated_context(request: Request) -> DepartmentContext:
"""Simulate an unauthenticated caller (no current_user)."""
return DepartmentContext(user_id=None, department_ids=[], is_admin=False)
def _ctx_for_user(
user_id: str | None,
department_ids: list[str],
is_admin: bool = False,
):
"""Build a dependency override returning a fixed DepartmentContext."""
async def _override(request: Request) -> DepartmentContext:
return DepartmentContext(
user_id=user_id,
department_ids=list(department_ids),
is_admin=is_admin,
)
return _override
def _set_caller(
app: FastAPI,
user_id: str | None,
department_ids: list[str],
is_admin: bool = False,
) -> None:
"""Install the dependency overrides for both routers."""
override = _ctx_for_user(user_id, department_ids, is_admin)
app.dependency_overrides[skills_routes.get_department_context] = override
app.dependency_overrides[kb_routes.get_department_context] = override
# ---------------------------------------------------------------------------
# DB helpers (synchronous sqlite3 — no event-loop mixing with TestClient)
# ---------------------------------------------------------------------------
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _create_department(db_path: Path, name: str) -> str:
dept_id = str(uuid.uuid4())
with sqlite3.connect(str(db_path)) as db:
db.execute(
"INSERT INTO departments (id, name, description, is_active, created_at) "
"VALUES (?, ?, ?, ?, ?)",
(dept_id, name, "", 1, _now_iso()),
)
db.commit()
return dept_id
def _bind_skill(db_path: Path, department_id: str, skill_name: str) -> None:
with sqlite3.connect(str(db_path)) as db:
db.execute(
"INSERT INTO department_skill_bindings (id, department_id, skill_name, created_at) "
"VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), department_id, skill_name, _now_iso()),
)
db.commit()
def _bind_kb(db_path: Path, department_id: str, kb_source_id: str) -> None:
with sqlite3.connect(str(db_path)) as db:
db.execute(
"INSERT INTO department_kb_bindings (id, department_id, kb_source_id, created_at) "
"VALUES (?, ?, ?, ?)",
(str(uuid.uuid4()), department_id, kb_source_id, _now_iso()),
)
db.commit()
def _assign_user_to_department(
db_path: Path, user_id: str, department_id: str
) -> None:
with sqlite3.connect(str(db_path)) as db:
db.execute(
"INSERT INTO user_departments (user_id, department_id, created_at) "
"VALUES (?, ?, ?)",
(user_id, department_id, _now_iso()),
)
db.commit()
def _remove_user_from_department(
db_path: Path, user_id: str, department_id: str
) -> None:
with sqlite3.connect(str(db_path)) as db:
db.execute(
"DELETE FROM user_departments WHERE user_id = ? AND department_id = ?",
(user_id, department_id),
)
db.commit()
# ---------------------------------------------------------------------------
# Test fixtures: departments A and B with skill/KB bindings
# ---------------------------------------------------------------------------
@pytest.fixture
def dept_setup(tmp_auth_db: Path, kb_store: kb_routes.KnowledgeSourceStore):
"""Create departments A and B, bind skills and KB sources.
Layout:
- Department A: bound to ``hr_skill`` and KB source ``hr_kb``
- Department B: bound to ``dev_skill`` and KB source ``dev_kb``
- ``global_skill`` and ``global_kb`` have NO bindings (global)
"""
dept_a = _create_department(tmp_auth_db, "HR")
dept_b = _create_department(tmp_auth_db, "Dev")
_bind_skill(tmp_auth_db, dept_a, "hr_skill")
_bind_skill(tmp_auth_db, dept_b, "dev_skill")
# global_skill intentionally has no binding.
# Create KB sources in the in-memory store.
hr_kb = kb_store.add_source("HR KB", "local", {})
dev_kb = kb_store.add_source("Dev KB", "local", {})
global_kb = kb_store.add_source("Global KB", "local", {})
_bind_kb(tmp_auth_db, dept_a, hr_kb.id)
_bind_kb(tmp_auth_db, dept_b, dev_kb.id)
# global_kb intentionally has no binding.
return {
"dept_a": dept_a,
"dept_b": dept_b,
"hr_kb_id": hr_kb.id,
"dev_kb_id": dev_kb.id,
"global_kb_id": global_kb.id,
}
# ---------------------------------------------------------------------------
# Skill isolation tests
# ---------------------------------------------------------------------------
class TestSkillIsolation:
"""GET /api/v1/skills must respect department bindings."""
def test_user_in_dept_a_sees_hr_and_global_not_dev(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(app, user_id="user-a", department_ids=[dept_setup["dept_a"]])
resp = client.get("/api/v1/skills")
assert resp.status_code == 200
names = {s["name"] for s in resp.json()}
assert "hr_skill" in names
assert "global_skill" in names
assert "dev_skill" not in names
def test_admin_sees_all_skills(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(
app,
user_id="admin-1",
department_ids=[],
is_admin=True,
)
resp = client.get("/api/v1/skills")
assert resp.status_code == 200
names = {s["name"] for s in resp.json()}
assert names == {"hr_skill", "dev_skill", "global_skill"}
def test_user_with_no_departments_sees_only_global(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(app, user_id="lonely-user", department_ids=[])
resp = client.get("/api/v1/skills")
assert resp.status_code == 200
names = {s["name"] for s in resp.json()}
assert names == {"global_skill"}
def test_user_in_both_departments_sees_all_bound_plus_global(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(
app,
user_id="user-ab",
department_ids=[dept_setup["dept_a"], dept_setup["dept_b"]],
)
resp = client.get("/api/v1/skills")
assert resp.status_code == 200
names = {s["name"] for s in resp.json()}
assert names == {"hr_skill", "dev_skill", "global_skill"}
def test_unauthenticated_caller_sees_only_global(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
# Default override is unauthenticated.
resp = client.get("/api/v1/skills")
assert resp.status_code == 200
names = {s["name"] for s in resp.json()}
assert names == {"global_skill"}
def test_user_removed_from_dept_a_loses_hr_skill(
self,
app: FastAPI,
client: TestClient,
tmp_auth_db: Path,
dept_setup: dict,
):
"""After removal, the user no longer sees hr_skill."""
user_id = "user-removal"
# Initially assign to dept A.
_assign_user_to_department(tmp_auth_db, user_id, dept_setup["dept_a"])
_set_caller(app, user_id=user_id, department_ids=[dept_setup["dept_a"]])
resp = client.get("/api/v1/skills")
names = {s["name"] for s in resp.json()}
assert "hr_skill" in names
# Remove from dept A — simulate the context change by updating
# the override (in production, the next request's
# get_department_context would re-query user_departments and
# find no rows).
_remove_user_from_department(tmp_auth_db, user_id, dept_setup["dept_a"])
_set_caller(app, user_id=user_id, department_ids=[])
resp = client.get("/api/v1/skills")
names = {s["name"] for s in resp.json()}
assert "hr_skill" not in names
assert "global_skill" in names
# ---------------------------------------------------------------------------
# KB source isolation tests
# ---------------------------------------------------------------------------
class TestKbSourceIsolation:
"""GET /api/v1/kb-management/sources must respect department bindings."""
def test_user_in_dept_a_sees_hr_and_global_kb(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(app, user_id="user-a", department_ids=[dept_setup["dept_a"]])
resp = client.get("/api/v1/kb-management/sources")
assert resp.status_code == 200
ids = {s["id"] for s in resp.json()["sources"]}
assert dept_setup["hr_kb_id"] in ids
assert dept_setup["global_kb_id"] in ids
assert dept_setup["dev_kb_id"] not in ids
def test_admin_sees_all_kb_sources(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(
app,
user_id="admin-1",
department_ids=[],
is_admin=True,
)
resp = client.get("/api/v1/kb-management/sources")
assert resp.status_code == 200
ids = {s["id"] for s in resp.json()["sources"]}
assert ids == {
dept_setup["hr_kb_id"],
dept_setup["dev_kb_id"],
dept_setup["global_kb_id"],
}
def test_user_with_no_departments_sees_only_global_kb(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(app, user_id="lonely-user", department_ids=[])
resp = client.get("/api/v1/kb-management/sources")
assert resp.status_code == 200
ids = {s["id"] for s in resp.json()["sources"]}
assert ids == {dept_setup["global_kb_id"]}
def test_user_in_both_departments_sees_all_kb(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
_set_caller(
app,
user_id="user-ab",
department_ids=[dept_setup["dept_a"], dept_setup["dept_b"]],
)
resp = client.get("/api/v1/kb-management/sources")
assert resp.status_code == 200
ids = {s["id"] for s in resp.json()["sources"]}
assert ids == {
dept_setup["hr_kb_id"],
dept_setup["dev_kb_id"],
dept_setup["global_kb_id"],
}
def test_unauthenticated_caller_sees_only_global_kb(
self, app: FastAPI, client: TestClient, dept_setup: dict
):
resp = client.get("/api/v1/kb-management/sources")
assert resp.status_code == 200
ids = {s["id"] for s in resp.json()["sources"]}
assert ids == {dept_setup["global_kb_id"]}
# ---------------------------------------------------------------------------
# KB document isolation tests
# ---------------------------------------------------------------------------
class TestKbDocumentIsolation:
"""GET /api/v1/kb-management/documents must respect department bindings."""
@pytest.fixture
def docs_setup(self, dept_setup: dict, kb_store: kb_routes.KnowledgeSourceStore):
"""Add one document to each KB source."""
for source_id_key, filename in [
("hr_kb_id", "hr_doc.txt"),
("dev_kb_id", "dev_doc.txt"),
("global_kb_id", "global_doc.txt"),
]:
source_id = dept_setup[source_id_key]
doc = kb_routes.UploadedDocument(
document_id=str(uuid.uuid4()),
filename=filename,
source_id=source_id,
chunks=1,
status="indexed",
)
kb_store.add_document(doc)
return dept_setup
def test_user_in_dept_a_sees_only_hr_and_global_docs(
self, app: FastAPI, client: TestClient, docs_setup: dict
):
_set_caller(app, user_id="user-a", department_ids=[docs_setup["dept_a"]])
resp = client.get("/api/v1/kb-management/documents")
assert resp.status_code == 200
filenames = {d["filename"] for d in resp.json()["documents"]}
assert "hr_doc.txt" in filenames
assert "global_doc.txt" in filenames
assert "dev_doc.txt" not in filenames
def test_admin_sees_all_documents(
self, app: FastAPI, client: TestClient, docs_setup: dict
):
_set_caller(
app,
user_id="admin-1",
department_ids=[],
is_admin=True,
)
resp = client.get("/api/v1/kb-management/documents")
assert resp.status_code == 200
filenames = {d["filename"] for d in resp.json()["documents"]}
assert filenames == {"hr_doc.txt", "dev_doc.txt", "global_doc.txt"}
def test_user_in_dept_a_filtering_by_dev_source_returns_empty(
self, app: FastAPI, client: TestClient, docs_setup: dict
):
"""A user in dept A asking for dept B's source_id gets nothing."""
_set_caller(app, user_id="user-a", department_ids=[docs_setup["dept_a"]])
resp = client.get(
"/api/v1/kb-management/documents",
params={"source_id": docs_setup["dev_kb_id"]},
)
assert resp.status_code == 200
assert resp.json()["documents"] == []
def test_user_in_dept_a_can_query_own_source(
self, app: FastAPI, client: TestClient, docs_setup: dict
):
_set_caller(app, user_id="user-a", department_ids=[docs_setup["dept_a"]])
resp = client.get(
"/api/v1/kb-management/documents",
params={"source_id": docs_setup["hr_kb_id"]},
)
assert resp.status_code == 200
filenames = {d["filename"] for d in resp.json()["documents"]}
assert filenames == {"hr_doc.txt"}