"""Integration tests for the auth REST routes (U4). These tests exercise the full HTTP stack (TestClient → AuthMiddleware → route → SessionService → SQLite) end-to-end. They complement the unit tests in ``tests/unit/test_auth.py`` by covering the new endpoints introduced by the centralized-auth feature: ``/auth/whoami`` (with refresh-token cold-start), ``/auth/sessions``, ``/auth/logout-others``, ``/auth/change-password``. Per the plan (U4 test scenarios), the suite covers: - Happy paths (login, refresh, whoami, sessions, logout-others, change-password) - Error paths (wrong password, unknown user, expired token, missing auth) - Integration (multi-device sessions, session cap eviction, password change invalidating other devices) """ from __future__ import annotations import uuid from datetime import datetime, timezone from pathlib import Path from typing import Any import aiosqlite import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from agentkit.server.auth.denylist import InMemoryRecentlyRevoked from agentkit.server.auth.jwt_utils import verify_token from agentkit.server.auth.middleware import AuthMiddleware from agentkit.server.auth.models import init_auth_db from agentkit.server.auth.password import hash_password from agentkit.server.auth.session_service import SessionService, set_session_service from agentkit.server.routes import auth as auth_routes # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def jwt_secret() -> str: return "integration-test-jwt-secret-do-not-use-in-prod-32bytes" @pytest.fixture async def tmp_auth_db(tmp_path: Path) -> Path: db_path = tmp_path / "auth_integration.db" await init_auth_db(db_path) return db_path async def _insert_user( db_path: Path, *, username: str = "alice", password: str = "correct-horse-battery-staple", role: str = "member", ) -> dict[str, Any]: """Insert a user row and return the user fields + plaintext password.""" user_id = str(uuid.uuid4()) email = f"{username}@example.com" password_hash = hash_password(password) now_iso = datetime.now(timezone.utc).isoformat() async with aiosqlite.connect(str(db_path)) as db: await db.execute( "INSERT INTO users " "(id, username, email, password_hash, role, is_active, " " is_terminal_authorized, is_server_terminal_authorized, " " created_at, updated_at) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (user_id, username, email, password_hash, role, 1, 0, 0, now_iso, now_iso), ) await db.commit() return { "id": user_id, "username": username, "email": email, "password": password, "role": role, "db_path": db_path, } @pytest.fixture async def auth_db_with_user(tmp_auth_db: Path) -> dict[str, Any]: return await _insert_user(tmp_auth_db) @pytest.fixture async def auth_db_with_admin(tmp_auth_db: Path) -> dict[str, Any]: """A second DB+user with admin role (for admin-route tests).""" return await _insert_user( tmp_auth_db, username="admin", password="admin-horse-battery-staple", role="admin", ) @pytest.fixture def auth_app(jwt_secret: str, auth_db_with_user: dict[str, Any]) -> FastAPI: app = FastAPI() app.state.jwt_secret = jwt_secret app.state.auth_db_path = str(auth_db_with_user["db_path"]) app.add_middleware(AuthMiddleware, jwt_secret=jwt_secret) app.include_router(auth_routes.router, prefix="/api/v1") app.include_router(auth_routes.admin_router, prefix="/api/v1") return app @pytest.fixture def auth_client(auth_app: FastAPI) -> TestClient: """TestClient with SessionService bound to the test DB.""" test_svc = SessionService( db_path=auth_app.state.auth_db_path, denylist=InMemoryRecentlyRevoked(), ) set_session_service(test_svc) try: yield TestClient(auth_app) finally: set_session_service(None) def _login(client: TestClient, username: str, password: str) -> dict[str, Any]: """Helper: log in and return the TokenResponse body.""" resp = client.post( "/api/v1/auth/login", json={"username": username, "password": password}, ) assert resp.status_code == 200, resp.text return resp.json() # --------------------------------------------------------------------------- # Happy paths # --------------------------------------------------------------------------- class TestWhoamiColdStart: """GET /auth/whoami with access OR refresh token (cold-start).""" def test_whoami_with_access_token_returns_user( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 200, resp.text data = resp.json() assert data["user"]["username"] == auth_db_with_user["username"] # Access-token call does NOT issue a new access token. assert data["access_token"] is None assert data["session_id"] is not None assert data["session"] is not None def test_whoami_with_refresh_token_issues_new_access_token( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """Cold-start: refresh token → fresh access token returned.""" body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['refresh_token']}"}, ) assert resp.status_code == 200, resp.text data = resp.json() assert data["user"]["username"] == auth_db_with_user["username"] # Cold-start issues a fresh access token. assert data["access_token"] is not None assert data["access_token"] != body["access_token"] assert data["session_id"] is not None def test_whoami_missing_bearer_returns_401(self, auth_client: TestClient): resp = auth_client.get("/api/v1/auth/whoami") assert resp.status_code == 401 def test_whoami_invalid_token_returns_401(self, auth_client: TestClient): resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": "Bearer not.a.valid.jwt"}, ) assert resp.status_code == 401 async def test_whoami_disabled_user_returns_401( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """A disabled user (is_active=0) must not pass whoami (U1 fix).""" body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) # Disable the user directly in the DB. async with aiosqlite.connect(str(tmp_auth_db)) as db: await db.execute( "UPDATE users SET is_active = 0 WHERE id = ?", (auth_db_with_user["id"],), ) await db.commit() # whoami with the still-valid access token must now 401. resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 401 assert "disabled" in resp.json()["detail"].lower() async def test_whoami_refresh_token_does_not_leak_new_refresh_token( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """Cold-start must NOT issue a new refresh token (U1 token-amplification fix). The response only carries ``access_token``; the client keeps using its existing refresh token. """ body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['refresh_token']}"}, ) assert resp.status_code == 200, resp.text data = resp.json() assert data["access_token"] is not None # WhoamiResponse has no refresh_token field — verify it's absent. assert "refresh_token" not in data class TestWhoamiTokenHash: """GET /auth/whoami refresh-token hash verification (U7 — R9). After refresh-token rotation via ``/auth/refresh``, the old refresh token must NOT be usable for cold-start on ``/auth/whoami``. The route verifies the presented token's SHA-256 hash against the session's stored ``refresh_token_hash`` and rejects mismatches with 401 (constant-time comparison via ``hmac.compare_digest``). Note: ``create_token_pair`` does not add ``jti`` to refresh tokens, so login + refresh within the same second produce identical refresh tokens. To test the hash-mismatch path deterministically we update ``refresh_token_hash`` directly in the DB (simulating rotation to a different token). """ def test_whoami_with_valid_refresh_token_returns_200( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """A valid (non-rotated) refresh token passes the hash check.""" body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['refresh_token']}"}, ) assert resp.status_code == 200, resp.text data = resp.json() assert data["access_token"] is not None assert data["user"]["username"] == auth_db_with_user["username"] async def test_whoami_with_rotated_refresh_token_returns_401( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """After rotation, the old refresh token is rejected. We simulate rotation by overwriting ``refresh_token_hash`` in the DB with a different value (the hash of a hypothetical new token). The old token's hash no longer matches and whoami returns 401. """ body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) old_refresh = body["refresh_token"] # Decode the old token to get the session id. old_payload = verify_token( old_refresh, auth_client.app.state.jwt_secret, expected_type="refresh" ) sid = old_payload.get("sid") assert sid is not None, "refresh token must carry sid" # Simulate rotation: replace the stored hash with a different one. from agentkit.server.auth.denylist import hash_token as _hash_token new_hash = _hash_token("rotated-new-token-different-from-old") async with aiosqlite.connect(str(tmp_auth_db)) as db: await db.execute( "UPDATE auth_sessions SET refresh_token_hash = ? WHERE id = ?", (new_hash, sid), ) await db.commit() # Old refresh token must now be rejected by whoami. resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {old_refresh}"}, ) assert resp.status_code == 401 detail = resp.json()["detail"].lower() assert "rotated" in detail or "revoked" in detail async def test_whoami_with_new_refresh_token_after_rotation_returns_200( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """After rotation, the NEW refresh token works on whoami. We simulate rotation by overwriting ``refresh_token_hash`` with the hash of a known new token, then present that new token. """ body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) old_refresh = body["refresh_token"] old_payload = verify_token( old_refresh, auth_client.app.state.jwt_secret, expected_type="refresh" ) sid = old_payload.get("sid") assert sid is not None # Mint a new refresh token with a unique jti claim so it differs # from the old token (create_token_pair doesn't add jti to # refresh tokens, so we craft one directly). import jwt as _jwt new_payload = {**old_payload, "jti": str(uuid.uuid4())} new_refresh = _jwt.encode(new_payload, auth_client.app.state.jwt_secret, algorithm="HS256") if isinstance(new_refresh, bytes): new_refresh = new_refresh.decode("utf-8") # Update the stored hash to match the new token. from agentkit.server.auth.denylist import hash_token as _hash_token new_hash = _hash_token(new_refresh) async with aiosqlite.connect(str(tmp_auth_db)) as db: await db.execute( "UPDATE auth_sessions SET refresh_token_hash = ? WHERE id = ?", (new_hash, sid), ) await db.commit() # The new refresh token must pass the hash check. resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {new_refresh}"}, ) assert resp.status_code == 200, resp.text assert resp.json()["access_token"] is not None def test_whoami_with_revoked_session_refresh_token_returns_401( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """A refresh token whose session was revoked is rejected. Although the session row's ``refresh_token_hash`` is unchanged by revocation, the session-revocation check (``info.revoked``) fires first and returns 401. """ body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) # Revoke the session via DELETE /auth/sessions/{id}. sessions = auth_client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {body['access_token']}"}, ).json() sid = sessions[0]["id"] del_resp = auth_client.delete( f"/api/v1/auth/sessions/{sid}", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert del_resp.status_code == 200 # Refresh token on the revoked session → 401. resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['refresh_token']}"}, ) assert resp.status_code == 401 async def test_whoami_access_token_skips_hash_check( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """Access tokens are not subject to the refresh-token hash check. The hash check only runs when ``token_type == "refresh"``; access tokens bypass it (they have their own expiry + jti). """ body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) # Rotate the stored hash to a different value. old_refresh = body["refresh_token"] old_payload = verify_token( old_refresh, auth_client.app.state.jwt_secret, expected_type="refresh" ) sid = old_payload.get("sid") assert sid is not None from agentkit.server.auth.denylist import hash_token as _hash_token new_hash = _hash_token("some-other-token-not-the-access-token") async with aiosqlite.connect(str(tmp_auth_db)) as db: await db.execute( "UPDATE auth_sessions SET refresh_token_hash = ? WHERE id = ?", (new_hash, sid), ) await db.commit() # The original access token should still work (not yet expired). resp = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 200, resp.text # Access-token call does NOT issue a new access token. assert resp.json()["access_token"] is None class TestSessionsManagement: """GET /auth/sessions, DELETE /auth/sessions/{id}.""" def test_list_sessions_returns_current_user_sessions( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 200, resp.text sessions = resp.json() assert len(sessions) >= 1 # The current session should be marked is_current=True. assert any(s["is_current"] for s in sessions) def test_revoke_own_session( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # List sessions to get the session id. sessions = auth_client.get( "/api/v1/auth/sessions", headers={"Authorization": f"Bearer {body['access_token']}"}, ).json() sid = sessions[0]["id"] resp = auth_client.delete( f"/api/v1/auth/sessions/{sid}", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 200, resp.text assert resp.json()["revoked"] is True async def test_revoke_other_user_session_returns_404( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """A user cannot revoke another user's session (404, not 403, to avoid leaking).""" # Create a second user and log in as them. other = await _login_async_create_user(auth_client, tmp_auth_db, username="bob") # Alice tries to revoke Bob's session. body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.delete( f"/api/v1/auth/sessions/{other['session_id']}", headers={"Authorization": f"Bearer {body['access_token']}"}, ) assert resp.status_code == 404 async def _login_async_create_user( client: TestClient, db_path: Path, *, username: str, ) -> dict[str, Any]: """Insert a user directly into the DB and log in via the API. Returns the login response + the session_id extracted from the JWT. Async so it can be ``await``-ed inside pytest-asyncio tests without triggering the ``asyncio.run() cannot be called from a running event loop`` error. """ from agentkit.server.auth.jwt_utils import verify_token user = await _insert_user(db_path, username=username) body = _login(client, username, user["password"]) payload = verify_token(body["access_token"], client.app.state.jwt_secret) return {**body, "session_id": payload.get("sid"), "user_id": user["id"]} class TestLogoutOthers: """POST /auth/logout-others.""" def test_logout_others_revokes_all_other_sessions( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], tmp_auth_db: Path, ): """Login from 2 devices, logout-others from device A → B's session revoked.""" # Device A body_a = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # Device B (same user, different fingerprint header) body_b = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # A calls logout-others. resp = auth_client.post( "/api/v1/auth/logout-others", headers={"Authorization": f"Bearer {body_a['access_token']}"}, ) assert resp.status_code == 200, resp.text assert resp.json()["revoked_count"] >= 1 # B's session should now be revoked → whoami with B's refresh token → 401. resp_b = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body_b['refresh_token']}"}, ) assert resp_b.status_code == 401 # A's session should still be valid. resp_a = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body_a['access_token']}"}, ) assert resp_a.status_code == 200 class TestChangePassword: """POST /auth/change-password.""" def test_change_password_revokes_other_sessions( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """Change password → other sessions revoked, current stays alive.""" # Device A body_a = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # Device B body_b = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # A changes the password. resp = auth_client.post( "/api/v1/auth/change-password", headers={"Authorization": f"Bearer {body_a['access_token']}"}, json={ "old_password": auth_db_with_user["password"], "new_password": "new-strong-password-123", }, ) assert resp.status_code == 200, resp.text # B's session should be revoked. resp_b = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body_b['refresh_token']}"}, ) assert resp_b.status_code == 401 def test_change_password_wrong_old_returns_400( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.post( "/api/v1/auth/change-password", headers={"Authorization": f"Bearer {body['access_token']}"}, json={ "old_password": "totally-wrong", "new_password": "new-strong-password-123", }, ) assert resp.status_code == 400 def test_change_password_weak_new_returns_400( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) resp = auth_client.post( "/api/v1/auth/change-password", headers={"Authorization": f"Bearer {body['access_token']}"}, json={ "old_password": auth_db_with_user["password"], "new_password": "short", }, ) assert resp.status_code == 400 # --------------------------------------------------------------------------- # Integration: multi-device + session cap # --------------------------------------------------------------------------- class TestMultiDeviceIntegration: """Login from multiple devices → independent sessions.""" def test_multiple_logins_create_independent_sessions( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): body_a = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) body_b = _login(auth_client, auth_db_with_user["username"], auth_db_with_user["password"]) # Both tokens should work independently. resp_a = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body_a['access_token']}"}, ) resp_b = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {body_b['access_token']}"}, ) assert resp_a.status_code == 200 assert resp_b.status_code == 200 # Different session ids. assert resp_a.json()["session_id"] != resp_b.json()["session_id"] def test_session_cap_evicts_oldest( self, auth_client: TestClient, auth_db_with_user: dict[str, Any], ): """The default session cap is 10; the 11th login evicts the oldest.""" tokens = [] for _ in range(11): body = _login( auth_client, auth_db_with_user["username"], auth_db_with_user["password"], ) tokens.append(body) # The first session should have been evicted (whoami → 401). resp_first = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {tokens[0]['refresh_token']}"}, ) assert resp_first.status_code == 401 # The last session should still be valid. resp_last = auth_client.get( "/api/v1/auth/whoami", headers={"Authorization": f"Bearer {tokens[-1]['access_token']}"}, ) assert resp_last.status_code == 200