fischer-agentkit/tests/integration/admin/test_user_routes.py

615 lines
23 KiB
Python

"""Integration tests for the user admin routes (U3).
Uses FastAPI TestClient with a test app that mounts only the
``admin_router`` from ``routes.admin``. The ``_require_admin`` dependency
is overridden via ``app.dependency_overrides`` so the tests don't need
real JWTs — they can simulate admin and non-admin callers directly.
The SessionService singleton is also installed against the temp DB so
that ``reset_password`` can revoke sessions end-to-end.
"""
from __future__ import annotations
import sqlite3
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import pytest
from fastapi import FastAPI, HTTPException
from fastapi.testclient import TestClient
from agentkit.server.auth.models import init_auth_db
from agentkit.server.auth.session_service import SessionService, set_session_service
from agentkit.server.routes import admin as admin_routes_module
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
async def tmp_auth_db(tmp_path: Path) -> Path:
    """Create ``admin_users.db`` under pytest's ``tmp_path`` and set it up.

    The file path is handed to ``init_auth_db`` and then returned so that
    other fixtures can point the app and services at the same database.
    """
    # NOTE(review): async fixture declared with plain ``@pytest.fixture`` —
    # presumably relies on an async pytest plugin running in "auto" mode;
    # confirm against the project's pytest configuration.
    database_file = tmp_path / "admin_users.db"
    await init_auth_db(database_file)
    return database_file
@pytest.fixture
def session_service(tmp_auth_db: Path):
    """Yield a ``SessionService`` on the temp DB, registered as the singleton.

    ``UserService.reset_password`` looks the service up through
    ``get_session_service()`` in order to revoke sessions, so the instance
    is registered with ``set_session_service`` before the test body runs.
    """
    service = SessionService(db_path=tmp_auth_db)
    set_session_service(service)
    yield service
    # Teardown: clear the registration so it does not outlive this test.
    set_session_service(None)
def _make_admin_user() -> dict[str, Any]:
    """Return a fresh dict describing the fake admin caller used by the tests."""
    return dict(user_id="admin-1", username="admin", role="admin")
@pytest.fixture
def admin_app(tmp_auth_db: Path) -> FastAPI:
    """Build a bare FastAPI app that exposes only the admin router.

    The router is mounted under ``/api/v1`` and ``app.state.auth_db_path``
    points at the temp database. ``_require_admin`` is overridden so every
    request is treated as coming from the fake admin user; a test that
    needs a non-admin caller replaces the override itself.
    """

    def _fake_admin():
        # Stand-in for the real admin check: always resolves to the fake admin.
        return _make_admin_user()

    application = FastAPI()
    application.state.auth_db_path = str(tmp_auth_db)
    application.include_router(admin_routes_module.admin_router, prefix="/api/v1")
    overrides = application.dependency_overrides
    overrides[admin_routes_module._require_admin] = _fake_admin
    return application
@pytest.fixture
def admin_client(
    admin_app: FastAPI, session_service: SessionService
) -> TestClient:
    """Return a ``TestClient`` for the admin app.

    ``session_service`` is requested purely for its side effect: it makes
    sure the SessionService singleton is installed before any request is
    issued through the returned client.
    """
    client = TestClient(admin_app)
    return client
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _create_department(client: TestClient, name: str, description: str = "") -> dict:
    """Create a department through the API and return the decoded JSON body.

    Fails the calling test (with the response text as the message) unless
    the endpoint answers 201.
    """
    payload = {"name": name, "description": description}
    response = client.post("/api/v1/admin/departments", json=payload)
    assert response.status_code == 201, response.text
    return response.json()
def _create_user(
    client: TestClient,
    *,
    username: str = "alice",
    email: str = "alice@example.com",
    password: str = "Secret123!",
    role: str = "member",
    department_ids: list[str] | None = None,
) -> dict:
    """Create a user through the API and return the decoded JSON body.

    ``department_ids`` is only included in the request when the caller
    passes a value (an empty list is still sent). Fails the calling test,
    with the response text as the message, unless the endpoint answers 201.
    """
    request_body: dict[str, Any] = dict(
        username=username,
        email=email,
        password=password,
        role=role,
    )
    if department_ids is not None:
        request_body["department_ids"] = department_ids

    response = client.post("/api/v1/admin/users", json=request_body)
    assert response.status_code == 201, response.text
    return response.json()
def _insert_session(db_path: Path, user_id: str, session_id: str | None = None) -> str:
"""Insert a minimal active auth_sessions row synchronously."""
session_id = session_id or str(uuid.uuid4())
now = datetime.now(timezone.utc).isoformat()
with sqlite3.connect(str(db_path)) as db:
db.execute(
"INSERT INTO auth_sessions "
"(id, user_id, refresh_token_hash, device_fingerprint, device_label, "
" ip, user_agent, auth_provider, created_at, last_active_at, expires_at, "
" revoked, revoked_reason, previous_session_id) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
session_id,
user_id,
f"hash-{session_id[:8]}",
"fp-test",
"Test device",
"127.0.0.1",
"test-agent",
"local",
now,
now,
now,
0,
None,
None,
),
)
db.commit()
return session_id
def _count_active_sessions(db_path: Path, user_id: str) -> int:
    """Return how many ``auth_sessions`` rows for ``user_id`` have ``revoked = 0``.

    Args:
        db_path: SQLite database file that contains the ``auth_sessions`` table.
        user_id: User whose non-revoked sessions are counted.
    """
    # The sqlite3 connection context manager does not close the connection,
    # so open/close explicitly to avoid leaking a handle per call.
    db = sqlite3.connect(str(db_path))
    try:
        cursor = db.execute(
            "SELECT COUNT(*) FROM auth_sessions WHERE user_id = ? AND revoked = 0",
            (user_id,),
        )
        return int(cursor.fetchone()[0])
    finally:
        db.close()
# ---------------------------------------------------------------------------
# POST /admin/users
# ---------------------------------------------------------------------------
class TestCreateUser:
    """Tests for ``POST /api/v1/admin/users``."""

    def test_create_returns_201_with_user_dict(self, admin_client: TestClient):
        """A complete payload yields 201 and the created user's fields."""
        payload = {
            "username": "alice",
            "email": "alice@example.com",
            "password": "Secret123!",
            "role": "member",
        }
        response = admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 201

        created = response.json()
        assert created["id"]
        assert created["username"] == "alice"
        assert created["email"] == "alice@example.com"
        assert created["role"] == "member"
        assert created["is_active"] is True
        assert created["departments"] == []
        # The stored hash must never leak into the response.
        assert "password_hash" not in created

    def test_create_with_department_ids_assigns_departments(
        self, admin_client: TestClient
    ):
        """Passing ``department_ids`` attaches those departments to the user."""
        engineering = _create_department(admin_client, "Engineering")
        payload = {
            "username": "alice",
            "email": "alice@example.com",
            "password": "Secret123!",
            "department_ids": [engineering["id"]],
        }
        response = admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 201

        departments = response.json()["departments"]
        assert len(departments) == 1
        assert departments[0]["id"] == engineering["id"]

    def test_create_duplicate_username_returns_409(self, admin_client: TestClient):
        """Re-using an existing username with a new email is answered with 409."""
        _create_user(admin_client, username="alice", email="alice@example.com")
        payload = {
            "username": "alice",
            "email": "other@example.com",
            "password": "Secret123!",
        }
        response = admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 409

    def test_create_duplicate_email_returns_409(self, admin_client: TestClient):
        """Re-using an existing email with a new username is answered with 409."""
        _create_user(admin_client, username="alice", email="alice@example.com")
        payload = {
            "username": "alice2",
            "email": "alice@example.com",
            "password": "Secret123!",
        }
        response = admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 409

    def test_create_with_nonexistent_department_returns_404(
        self, admin_client: TestClient
    ):
        """A random (unknown) department id in the payload is answered with 404."""
        unknown_department = str(uuid.uuid4())
        payload = {
            "username": "alice",
            "email": "alice@example.com",
            "password": "Secret123!",
            "department_ids": [unknown_department],
        }
        response = admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, creation is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        payload = {
            "username": "alice",
            "email": "alice@example.com",
            "password": "Secret123!",
        }
        response = non_admin_client.post("/api/v1/admin/users", json=payload)
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# GET /admin/users
# ---------------------------------------------------------------------------
class TestListUsers:
    """Tests for ``GET /api/v1/admin/users``.

    Setup requests (soft-delete, department assignment) are asserted as
    well, so a broken precondition fails at its source instead of
    surfacing as a confusing mismatch in the list assertions.
    """

    def test_list_returns_all_users(self, admin_client: TestClient):
        """Every created user shows up in the unfiltered listing."""
        _create_user(admin_client, username="alice", email="alice@example.com")
        _create_user(admin_client, username="bob", email="bob@example.com")
        resp = admin_client.get("/api/v1/admin/users")
        assert resp.status_code == 200
        names = {u["username"] for u in resp.json()}
        assert names == {"alice", "bob"}

    def test_list_excludes_inactive_when_asked(self, admin_client: TestClient):
        """``include_inactive=False`` hides soft-deleted users."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        _create_user(admin_client, username="bob", email="bob@example.com")
        # Soft-delete alice; verify the precondition actually took effect.
        deleted = admin_client.delete(f"/api/v1/admin/users/{alice['id']}")
        assert deleted.status_code == 200, deleted.text
        resp = admin_client.get(
            "/api/v1/admin/users", params={"include_inactive": False}
        )
        assert resp.status_code == 200
        names = {u["username"] for u in resp.json()}
        assert names == {"bob"}

    def test_list_filtered_by_department(self, admin_client: TestClient):
        """``department_id`` restricts the listing to that department's users."""
        eng = _create_department(admin_client, "Engineering")
        hr = _create_department(admin_client, "HR")
        alice = _create_user(
            admin_client, username="alice", email="alice@example.com"
        )
        bob = _create_user(admin_client, username="bob", email="bob@example.com")
        # Both assignments are preconditions of the filter check below.
        assigned_alice = admin_client.post(
            f"/api/v1/admin/users/{alice['id']}/departments/{eng['id']}"
        )
        assert assigned_alice.status_code == 201, assigned_alice.text
        assigned_bob = admin_client.post(
            f"/api/v1/admin/users/{bob['id']}/departments/{hr['id']}"
        )
        assert assigned_bob.status_code == 201, assigned_bob.text
        resp = admin_client.get(
            "/api/v1/admin/users", params={"department_id": eng["id"]}
        )
        assert resp.status_code == 200
        users = resp.json()
        assert len(users) == 1
        assert users[0]["username"] == "alice"

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, listing is refused."""
        admin_app.dependency_overrides[admin_routes_module._require_admin] = _raise_forbidden
        client = TestClient(admin_app)
        resp = client.get("/api/v1/admin/users")
        assert resp.status_code == 403
# ---------------------------------------------------------------------------
# GET /admin/users/{user_id}
# ---------------------------------------------------------------------------
class TestGetUser:
    """Tests for ``GET /api/v1/admin/users/{user_id}``."""

    def test_get_returns_user_with_departments(self, admin_client: TestClient):
        """The detail view includes the user's departments."""
        engineering = _create_department(admin_client, "Engineering")
        alice = _create_user(
            admin_client,
            username="alice",
            email="alice@example.com",
            department_ids=[engineering["id"]],
        )
        url = f"/api/v1/admin/users/{alice['id']}"
        response = admin_client.get(url)
        assert response.status_code == 200

        fetched = response.json()
        assert fetched["username"] == "alice"
        departments = fetched["departments"]
        assert len(departments) == 1
        assert departments[0]["name"] == "Engineering"

    def test_get_unknown_id_returns_404(self, admin_client: TestClient):
        """A random (unknown) user id is answered with 404."""
        url = f"/api/v1/admin/users/{uuid.uuid4()}"
        response = admin_client.get(url)
        assert response.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the lookup is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        url = f"/api/v1/admin/users/{uuid.uuid4()}"
        response = non_admin_client.get(url)
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# PATCH /admin/users/{user_id}
# ---------------------------------------------------------------------------
class TestUpdateUser:
    """Tests for ``PATCH /api/v1/admin/users/{user_id}``."""

    def test_update_role(self, admin_client: TestClient):
        """Patching ``role`` is reflected in the response body."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}"
        response = admin_client.patch(url, json={"role": "admin"})
        assert response.status_code == 200
        assert response.json()["role"] == "admin"

    def test_update_is_active(self, admin_client: TestClient):
        """Patching ``is_active`` to false is reflected in the response body."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}"
        response = admin_client.patch(url, json={"is_active": False})
        assert response.status_code == 200
        assert response.json()["is_active"] is False

    def test_update_terminal_authorized_flags(self, admin_client: TestClient):
        """Both terminal-authorization flags can be switched on in one patch."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}"
        changes = {
            "is_terminal_authorized": True,
            "is_server_terminal_authorized": True,
        }
        response = admin_client.patch(url, json=changes)
        assert response.status_code == 200

        updated = response.json()
        assert updated["is_terminal_authorized"] is True
        assert updated["is_server_terminal_authorized"] is True

    def test_update_unknown_id_returns_404(self, admin_client: TestClient):
        """Patching a random (unknown) user id is answered with 404."""
        url = f"/api/v1/admin/users/{uuid.uuid4()}"
        response = admin_client.patch(url, json={"role": "admin"})
        assert response.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the patch is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        url = f"/api/v1/admin/users/{uuid.uuid4()}"
        response = non_admin_client.patch(url, json={"role": "admin"})
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# DELETE /admin/users/{user_id}
# ---------------------------------------------------------------------------
class TestDeleteUser:
    """Tests for ``DELETE /api/v1/admin/users/{user_id}`` (soft delete)."""

    def test_delete_returns_200(self, admin_client: TestClient):
        """Deleting an existing user answers 200 with ``{"deleted": True}``."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        resp = admin_client.delete(f"/api/v1/admin/users/{alice['id']}")
        assert resp.status_code == 200
        assert resp.json() == {"deleted": True}

    def test_delete_is_soft(self, admin_client: TestClient, tmp_auth_db: Path):
        """After a delete the ``users`` row remains, with ``is_active`` false."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        # Check the delete itself so a failed request is not mistaken for a
        # wrong ``is_active`` value below.
        deleted = admin_client.delete(f"/api/v1/admin/users/{alice['id']}")
        assert deleted.status_code == 200, deleted.text
        # The row must still exist (soft delete). The sqlite3 connection
        # context manager does not close the connection, so close it
        # explicitly instead of leaking the handle.
        db = sqlite3.connect(str(tmp_auth_db))
        try:
            cursor = db.execute(
                "SELECT is_active FROM users WHERE id = ?", (alice["id"],)
            )
            row = cursor.fetchone()
        finally:
            db.close()
        assert row is not None
        assert bool(row[0]) is False

    def test_delete_unknown_id_returns_404(self, admin_client: TestClient):
        """Deleting a random (unknown) user id is answered with 404."""
        resp = admin_client.delete(f"/api/v1/admin/users/{uuid.uuid4()}")
        assert resp.status_code == 404

    def test_delete_already_inactive_returns_404(self, admin_client: TestClient):
        """A second delete of the same user is answered with 404."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        # First delete succeeds.
        first = admin_client.delete(f"/api/v1/admin/users/{alice['id']}")
        assert first.status_code == 200
        # Second delete on the same (now-inactive) user returns 404.
        second = admin_client.delete(f"/api/v1/admin/users/{alice['id']}")
        assert second.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the delete is refused."""
        admin_app.dependency_overrides[admin_routes_module._require_admin] = _raise_forbidden
        client = TestClient(admin_app)
        resp = client.delete(f"/api/v1/admin/users/{uuid.uuid4()}")
        assert resp.status_code == 403
# ---------------------------------------------------------------------------
# POST /admin/users/{user_id}/reset-password
# ---------------------------------------------------------------------------
class TestResetPassword:
    """Tests for ``POST /api/v1/admin/users/{user_id}/reset-password``."""

    def test_reset_returns_200(self, admin_client: TestClient):
        """Resetting an existing user's password answers 200 with ``{"reset": True}``."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}/reset-password"
        response = admin_client.post(url, json={"new_password": "NewSecret456!"})
        assert response.status_code == 200
        assert response.json() == {"reset": True}

    def test_reset_revokes_sessions(
        self, admin_client: TestClient, tmp_auth_db: Path
    ):
        """After a reset, none of the user's sessions are left non-revoked."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        user_id = alice["id"]
        # Seed two non-revoked sessions directly in the database.
        for _ in range(2):
            _insert_session(tmp_auth_db, user_id)
        assert _count_active_sessions(tmp_auth_db, user_id) == 2

        url = f"/api/v1/admin/users/{alice['id']}/reset-password"
        response = admin_client.post(url, json={"new_password": "NewSecret456!"})
        assert response.status_code == 200
        assert _count_active_sessions(tmp_auth_db, user_id) == 0

    def test_reset_unknown_id_returns_404(self, admin_client: TestClient):
        """Resetting a random (unknown) user id is answered with 404."""
        url = f"/api/v1/admin/users/{uuid.uuid4()}/reset-password"
        response = admin_client.post(url, json={"new_password": "NewSecret456!"})
        assert response.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the reset is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        url = f"/api/v1/admin/users/{uuid.uuid4()}/reset-password"
        response = non_admin_client.post(url, json={"new_password": "NewSecret456!"})
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# POST /admin/users/{user_id}/departments/{department_id}
# ---------------------------------------------------------------------------
class TestAssignDepartment:
    """Tests for ``POST /api/v1/admin/users/{user_id}/departments/{department_id}``."""

    def test_assign_returns_201(self, admin_client: TestClient):
        """Assigning an existing department answers 201 with ``{"assigned": True}``."""
        eng = _create_department(admin_client, "Engineering")
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        resp = admin_client.post(
            f"/api/v1/admin/users/{alice['id']}/departments/{eng['id']}"
        )
        assert resp.status_code == 201
        assert resp.json() == {"assigned": True}

    def test_assign_duplicate_returns_409(self, admin_client: TestClient):
        """Repeating an assignment that already exists is answered with 409."""
        eng = _create_department(admin_client, "Engineering")
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        # The first assignment is the precondition for the conflict; check it
        # so the 409 below can only come from the duplicate request.
        first = admin_client.post(
            f"/api/v1/admin/users/{alice['id']}/departments/{eng['id']}"
        )
        assert first.status_code == 201, first.text
        resp = admin_client.post(
            f"/api/v1/admin/users/{alice['id']}/departments/{eng['id']}"
        )
        assert resp.status_code == 409

    def test_assign_nonexistent_department_returns_404(
        self, admin_client: TestClient
    ):
        """Assigning a random (unknown) department id is answered with 404."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        resp = admin_client.post(
            f"/api/v1/admin/users/{alice['id']}/departments/{uuid.uuid4()}"
        )
        assert resp.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the assignment is refused."""
        admin_app.dependency_overrides[admin_routes_module._require_admin] = _raise_forbidden
        client = TestClient(admin_app)
        resp = client.post(
            f"/api/v1/admin/users/{uuid.uuid4()}/departments/{uuid.uuid4()}"
        )
        assert resp.status_code == 403
# ---------------------------------------------------------------------------
# DELETE /admin/users/{user_id}/departments/{department_id}
# ---------------------------------------------------------------------------
class TestRemoveDepartment:
    """Tests for ``DELETE /api/v1/admin/users/{user_id}/departments/{department_id}``."""

    def test_remove_returns_200(self, admin_client: TestClient):
        """Removing an existing assignment answers 200 with ``{"removed": True}``."""
        engineering = _create_department(admin_client, "Engineering")
        alice = _create_user(
            admin_client,
            username="alice",
            email="alice@example.com",
            department_ids=[engineering["id"]],
        )
        url = f"/api/v1/admin/users/{alice['id']}/departments/{engineering['id']}"
        response = admin_client.delete(url)
        assert response.status_code == 200
        assert response.json() == {"removed": True}

    def test_remove_nonexistent_assignment_returns_404(
        self, admin_client: TestClient
    ):
        """Removing a department the user was never assigned to answers 404."""
        engineering = _create_department(admin_client, "Engineering")
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}/departments/{engineering['id']}"
        response = admin_client.delete(url)
        assert response.status_code == 404

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the removal is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        url = f"/api/v1/admin/users/{uuid.uuid4()}/departments/{uuid.uuid4()}"
        response = non_admin_client.delete(url)
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# GET /admin/users/{user_id}/departments
# ---------------------------------------------------------------------------
class TestListUserDepartments:
    """Tests for ``GET /api/v1/admin/users/{user_id}/departments``."""

    def test_list_returns_departments(self, admin_client: TestClient):
        """Both departments given at creation time are listed for the user."""
        engineering = _create_department(admin_client, "Engineering")
        human_resources = _create_department(admin_client, "HR")
        alice = _create_user(
            admin_client,
            username="alice",
            email="alice@example.com",
            department_ids=[engineering["id"], human_resources["id"]],
        )
        url = f"/api/v1/admin/users/{alice['id']}/departments"
        response = admin_client.get(url)
        assert response.status_code == 200

        listed = response.json()
        assert len(listed) == 2
        listed_names = {department["name"] for department in listed}
        assert listed_names == {"Engineering", "HR"}

    def test_list_returns_empty_for_user_with_no_departments(
        self, admin_client: TestClient
    ):
        """A user created without departments gets an empty list."""
        alice = _create_user(admin_client, username="alice", email="alice@example.com")
        url = f"/api/v1/admin/users/{alice['id']}/departments"
        response = admin_client.get(url)
        assert response.status_code == 200
        assert response.json() == []

    def test_non_admin_returns_403(self, admin_app: FastAPI):
        """With the admin dependency raising 403, the listing is refused."""
        overrides = admin_app.dependency_overrides
        overrides[admin_routes_module._require_admin] = _raise_forbidden
        non_admin_client = TestClient(admin_app)
        url = f"/api/v1/admin/users/{uuid.uuid4()}/departments"
        response = non_admin_client.get(url)
        assert response.status_code == 403
# ---------------------------------------------------------------------------
# Helpers for non-admin simulation
# ---------------------------------------------------------------------------
def _raise_forbidden() -> dict[str, Any]:
    """Stand in for ``_require_admin`` when the caller is not an admin.

    Never returns: it always raises an ``HTTPException`` with status 403.
    """
    forbidden = HTTPException(status_code=403, detail="Admin permission required")
    raise forbidden