fischer-agentkit/tests/integration/auth/test_admin_routes.py

316 lines
10 KiB
Python

"""Integration tests for the admin session-management routes (U4).
Per the plan (U4 admin test scenarios):
- ``GET /admin/users/{id}/sessions`` as admin → returns all sessions (active + revoked)
- ``GET /admin/users/{id}/sessions`` as non-admin → 403
- ``DELETE /admin/users/{id}/sessions/{sid}`` as admin → that session revoked
- ``DELETE /admin/users/{id}/sessions/{sid}`` as non-admin → 403
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import aiosqlite
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from agentkit.server.auth.denylist import InMemoryRecentlyRevoked
from agentkit.server.auth.middleware import AuthMiddleware
from agentkit.server.auth.models import init_auth_db
from agentkit.server.auth.password import hash_password
from agentkit.server.auth.session_service import SessionService, set_session_service
from agentkit.server.routes import auth as auth_routes
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def jwt_secret() -> str:
return "admin-integration-test-jwt-secret-32bytes!"
@pytest.fixture
async def tmp_auth_db(tmp_path: Path) -> Path:
db_path = tmp_path / "admin_auth_integration.db"
await init_auth_db(db_path)
return db_path
async def _insert_user(
db_path: Path,
*,
username: str,
password: str,
role: str = "member",
) -> dict[str, Any]:
user_id = str(uuid.uuid4())
email = f"{username}@example.com"
password_hash = hash_password(password)
now_iso = datetime.now(timezone.utc).isoformat()
async with aiosqlite.connect(str(db_path)) as db:
await db.execute(
"INSERT INTO users "
"(id, username, email, password_hash, role, is_active, "
" is_terminal_authorized, is_server_terminal_authorized, "
" created_at, updated_at) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, username, email, password_hash, role, 1, 0, 0, now_iso, now_iso),
)
await db.commit()
return {
"id": user_id,
"username": username,
"email": email,
"password": password,
"role": role,
}
@pytest.fixture
async def users(tmp_auth_db: Path) -> dict[str, dict[str, Any]]:
"""Create a member and an admin in the same DB."""
member = await _insert_user(tmp_auth_db, username="alice", password="alice-pw-12345")
admin = await _insert_user(
tmp_auth_db,
username="admin",
password="admin-pw-12345",
role="admin",
)
return {"member": member, "admin": admin}
@pytest.fixture
def auth_app(jwt_secret: str, users: dict[str, dict[str, Any]], tmp_auth_db: Path) -> FastAPI:
app = FastAPI()
app.state.jwt_secret = jwt_secret
app.state.auth_db_path = str(tmp_auth_db)
app.add_middleware(AuthMiddleware, jwt_secret=jwt_secret)
app.include_router(auth_routes.router, prefix="/api/v1")
app.include_router(auth_routes.admin_router, prefix="/api/v1")
return app
@pytest.fixture
def auth_client(auth_app: FastAPI) -> TestClient:
test_svc = SessionService(
db_path=auth_app.state.auth_db_path,
denylist=InMemoryRecentlyRevoked(),
)
set_session_service(test_svc)
try:
yield TestClient(auth_app)
finally:
set_session_service(None)
def _login(client: TestClient, username: str, password: str) -> dict[str, Any]:
resp = client.post(
"/api/v1/auth/login",
json={"username": username, "password": password},
)
assert resp.status_code == 200, resp.text
return resp.json()
# ---------------------------------------------------------------------------
# Admin: list user sessions
# ---------------------------------------------------------------------------
class TestAdminListUserSessions:
"""GET /admin/users/{user_id}/sessions."""
def test_admin_can_list_user_sessions(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
# Member logs in (creates a session).
_login(auth_client, users["member"]["username"], users["member"]["password"])
# Admin logs in.
admin_body = _login(
auth_client,
users["admin"]["username"],
users["admin"]["password"],
)
member_id = users["member"]["id"]
resp = auth_client.get(
f"/api/v1/admin/users/{member_id}/sessions",
headers={"Authorization": f"Bearer {admin_body['access_token']}"},
)
assert resp.status_code == 200, resp.text
sessions = resp.json()
assert len(sessions) >= 1
# All sessions belong to the member.
assert all(s["user_id"] == member_id for s in sessions)
def test_non_admin_cannot_list_user_sessions(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
# Member logs in.
member_body = _login(
auth_client,
users["member"]["username"],
users["member"]["password"],
)
# Member tries to access admin endpoint.
resp = auth_client.get(
f"/api/v1/admin/users/{users['member']['id']}/sessions",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
assert resp.status_code in (403, 401)
def test_admin_list_unknown_user_returns_404(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
"""Admin lists sessions for a non-existent user → 404 (not 500)."""
admin_body = _login(
auth_client,
users["admin"]["username"],
users["admin"]["password"],
)
fake_id = str(uuid.uuid4())
resp = auth_client.get(
f"/api/v1/admin/users/{fake_id}/sessions",
headers={"Authorization": f"Bearer {admin_body['access_token']}"},
)
# The route 404s on unknown user — no 500 leak.
assert resp.status_code == 404
# ---------------------------------------------------------------------------
# Admin: revoke user session
# ---------------------------------------------------------------------------
class TestAdminRevokeUserSession:
"""DELETE /admin/users/{user_id}/sessions/{session_id}."""
def test_admin_can_revoke_user_session(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
# Member logs in (creates a session).
member_body = _login(
auth_client,
users["member"]["username"],
users["member"]["password"],
)
# Get the member's session id via whoami.
whoami_resp = auth_client.get(
"/api/v1/auth/whoami",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
assert whoami_resp.status_code == 200
member_sid = whoami_resp.json()["session_id"]
assert member_sid is not None
# Admin logs in and revokes the member's session.
admin_body = _login(
auth_client,
users["admin"]["username"],
users["admin"]["password"],
)
resp = auth_client.delete(
f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}",
headers={"Authorization": f"Bearer {admin_body['access_token']}"},
)
assert resp.status_code == 200, resp.text
assert resp.json()["revoked"] is True
# Member's session should now be invalid.
resp_member = auth_client.get(
"/api/v1/auth/whoami",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
assert resp_member.status_code == 401
def test_non_admin_cannot_revoke_user_session(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
member_body = _login(
auth_client,
users["member"]["username"],
users["member"]["password"],
)
whoami_resp = auth_client.get(
"/api/v1/auth/whoami",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
member_sid = whoami_resp.json()["session_id"]
# Member tries to use the admin endpoint.
resp = auth_client.delete(
f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
assert resp.status_code in (403, 401)
# ---------------------------------------------------------------------------
# Admin: list all sessions
# ---------------------------------------------------------------------------
class TestAdminListAllSessions:
"""GET /admin/sessions."""
def test_admin_can_list_all_sessions(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
# Both users log in.
_login(auth_client, users["member"]["username"], users["member"]["password"])
admin_body = _login(
auth_client,
users["admin"]["username"],
users["admin"]["password"],
)
resp = auth_client.get(
"/api/v1/admin/sessions",
headers={"Authorization": f"Bearer {admin_body['access_token']}"},
)
assert resp.status_code == 200, resp.text
sessions = resp.json()
# At least 2 sessions (member + admin).
assert len(sessions) >= 2
# Sessions span both users (stronger than just count).
user_ids = {s["user_id"] for s in sessions}
assert users["member"]["id"] in user_ids
assert users["admin"]["id"] in user_ids
def test_non_admin_cannot_list_all_sessions(
self,
auth_client: TestClient,
users: dict[str, dict[str, Any]],
):
"""A member must NOT be able to call GET /admin/sessions (403/401)."""
member_body = _login(
auth_client,
users["member"]["username"],
users["member"]["password"],
)
resp = auth_client.get(
"/api/v1/admin/sessions",
headers={"Authorization": f"Bearer {member_body['access_token']}"},
)
assert resp.status_code in (403, 401)