"""Integration tests for the admin session-management routes (U4). Per the plan (U4 admin test scenarios): - ``GET /admin/users/{id}/sessions`` as admin → returns all sessions (active + revoked) - ``GET /admin/users/{id}/sessions`` as non-admin → 403 - ``DELETE /admin/users/{id}/sessions/{sid}`` as admin → that session revoked - ``DELETE /admin/users/{id}/sessions/{sid}`` as non-admin → 403 """ from __future__ import annotations import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any import aiosqlite import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from agentkit.server.auth.denylist import InMemoryRecentlyRevoked from agentkit.server.auth.middleware import AuthMiddleware from agentkit.server.auth.models import init_auth_db from agentkit.server.auth.password import hash_password from agentkit.server.auth.session_service import SessionService, set_session_service from agentkit.server.routes import auth as auth_routes # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def jwt_secret() -> str: return "admin-integration-test-jwt-secret-32bytes!" @pytest.fixture async def tmp_auth_db(tmp_path: Path) -> Path: db_path = tmp_path / "admin_auth_integration.db" await init_auth_db(db_path) return db_path async def _insert_user( db_path: Path, *, username: str, password: str, role: str = "member", ) -> dict[str, Any]: user_id = str(uuid.uuid4()) email = f"{username}@example.com" password_hash = hash_password(password) now_iso = datetime.now(timezone.utc).isoformat() async with aiosqlite.connect(str(db_path)) as db: await db.execute( "INSERT INTO users " "(id, username, email, password_hash, role, is_active, " " is_terminal_authorized, is_server_terminal_authorized, " " created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (user_id, username, email, password_hash, role, 1, 0, 0, now_iso, now_iso), ) await db.commit() return { "id": user_id, "username": username, "email": email, "password": password, "role": role, } @pytest.fixture async def users(tmp_auth_db: Path) -> dict[str, dict[str, Any]]: """Create a member and an admin in the same DB.""" member = await _insert_user(tmp_auth_db, username="alice", password="alice-pw-12345") admin = await _insert_user( tmp_auth_db, username="admin", password="admin-pw-12345", role="admin", ) return {"member": member, "admin": admin} @pytest.fixture def auth_app(jwt_secret: str, users: dict[str, dict[str, Any]], tmp_auth_db: Path) -> FastAPI: app = FastAPI() app.state.jwt_secret = jwt_secret app.state.auth_db_path = str(tmp_auth_db) app.add_middleware(AuthMiddleware, jwt_secret=jwt_secret) app.include_router(auth_routes.router, prefix="/api/v1") app.include_router(auth_routes.admin_router, prefix="/api/v1") return app @pytest.fixture def auth_client(auth_app: FastAPI) -> TestClient: test_svc = SessionService( db_path=auth_app.state.auth_db_path, denylist=InMemoryRecentlyRevoked(), ) set_session_service(test_svc) try: yield TestClient(auth_app) finally: set_session_service(None) def _login(client: TestClient, username: str, password: str) -> dict[str, Any]: resp = client.post( "/api/v1/auth/login", json={"username": username, "password": password}, ) assert resp.status_code == 200, resp.text return resp.json() # --------------------------------------------------------------------------- # Admin: list user sessions # --------------------------------------------------------------------------- class TestAdminListUserSessions: """GET /admin/users/{user_id}/sessions.""" def test_admin_can_list_user_sessions( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): # Member logs in (creates a session). _login(auth_client, users["member"]["username"], users["member"]["password"]) # Admin logs in. admin_body = _login( auth_client, users["admin"]["username"], users["admin"]["password"], ) member_id = users["member"]["id"] resp = auth_client.get( f"/api/v1/admin/users/{member_id}/sessions", headers={"Authorization": f"Bearer {admin_body['access_token']}"}, ) assert resp.status_code == 200, resp.text sessions = resp.json() assert len(sessions) >= 1 # All sessions belong to the member. assert all(s["user_id"] == member_id for s in sessions) def test_non_admin_cannot_list_user_sessions( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): # Member logs in. member_body = _login( auth_client, users["member"]["username"], users["member"]["password"], ) # Member tries to access admin endpoint. resp = auth_client.get( f"/api/v1/admin/users/{users['member']['id']}/sessions", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) assert resp.status_code in (403, 401) def test_admin_list_unknown_user_returns_404( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): """Admin lists sessions for a non-existent user → 404 (not 500).""" admin_body = _login( auth_client, users["admin"]["username"], users["admin"]["password"], ) fake_id = str(uuid.uuid4()) resp = auth_client.get( f"/api/v1/admin/users/{fake_id}/sessions", headers={"Authorization": f"Bearer {admin_body['access_token']}"}, ) # The route 404s on unknown user — no 500 leak. assert resp.status_code == 404 # --------------------------------------------------------------------------- # Admin: revoke user session # --------------------------------------------------------------------------- class TestAdminRevokeUserSession: """DELETE /admin/users/{user_id}/sessions/{session_id}.""" def test_admin_can_revoke_user_session( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): # Member logs in (creates a session). member_body = _login( auth_client, users["member"]["username"], users["member"]["password"], ) # Get the member's session id via whoami. whoami_resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) assert whoami_resp.status_code == 200 member_sid = whoami_resp.json()["session_id"] assert member_sid is not None # Admin logs in and revokes the member's session. admin_body = _login( auth_client, users["admin"]["username"], users["admin"]["password"], ) resp = auth_client.delete( f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}", headers={"Authorization": f"Bearer {admin_body['access_token']}"}, ) assert resp.status_code == 200, resp.text assert resp.json()["revoked"] is True # Member's session should now be invalid. resp_member = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) assert resp_member.status_code == 401 def test_non_admin_cannot_revoke_user_session( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): member_body = _login( auth_client, users["member"]["username"], users["member"]["password"], ) whoami_resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) member_sid = whoami_resp.json()["session_id"] # Member tries to use the admin endpoint. resp = auth_client.delete( f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) assert resp.status_code in (403, 401) # --------------------------------------------------------------------------- # Admin: list all sessions # --------------------------------------------------------------------------- class TestAdminListAllSessions: """GET /admin/sessions.""" def test_admin_can_list_all_sessions( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): # Both users log in. _login(auth_client, users["member"]["username"], users["member"]["password"]) admin_body = _login( auth_client, users["admin"]["username"], users["admin"]["password"], ) resp = auth_client.get( "/api/v1/admin/sessions", headers={"Authorization": f"Bearer {admin_body['access_token']}"}, ) assert resp.status_code == 200, resp.text sessions = resp.json() # At least 2 sessions (member + admin). assert len(sessions) >= 2 # Sessions span both users (stronger than just count). user_ids = {s["user_id"] for s in sessions} assert users["member"]["id"] in user_ids assert users["admin"]["id"] in user_ids def test_non_admin_cannot_list_all_sessions( self, auth_client: TestClient, users: dict[str, dict[str, Any]], ): """A member must NOT be able to call GET /admin/sessions (403/401).""" member_body = _login( auth_client, users["member"]["username"], users["member"]["password"], ) resp = auth_client.get( "/api/v1/admin/sessions", headers={"Authorization": f"Bearer {member_body['access_token']}"}, ) assert resp.status_code in (403, 401)