276 lines
8.9 KiB
Python
276 lines
8.9 KiB
Python
"""Integration tests for the admin session-management routes (U4).
|
|
|
|
Per the plan (U4 admin test scenarios):

- ``GET /admin/users/{id}/sessions`` as admin → returns all sessions (active + revoked)
- ``GET /admin/users/{id}/sessions`` as non-admin → 403
- ``DELETE /admin/users/{id}/sessions/{sid}`` as admin → that session revoked
- ``DELETE /admin/users/{id}/sessions/{sid}`` as non-admin → 403

Beyond the plan scenarios, this module also checks that an admin can list
every session via ``GET /admin/sessions``.
|
|
"""
|
|
|
|
from __future__ import annotations

import uuid
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import aiosqlite
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient

from agentkit.server.auth.denylist import InMemoryRecentlyRevoked
from agentkit.server.auth.middleware import AuthMiddleware
from agentkit.server.auth.models import init_auth_db
from agentkit.server.auth.password import hash_password
from agentkit.server.auth.session_service import SessionService, set_session_service
from agentkit.server.routes import auth as auth_routes
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def jwt_secret() -> str:
    """Return the fixed JWT secret string used by the fixtures in this module."""
    secret = "admin-integration-test-jwt-secret-32bytes!"
    return secret
|
|
|
|
|
|
@pytest.fixture
async def tmp_auth_db(tmp_path: Path) -> Path:
    """Initialise an auth database file under ``tmp_path`` and return its path.

    The file is named ``admin_auth_integration.db`` and is set up by
    ``init_auth_db`` before the path is handed to the test.
    """
    database_file = tmp_path.joinpath("admin_auth_integration.db")
    await init_auth_db(database_file)
    return database_file
|
|
|
|
|
|
async def _insert_user(
    db_path: Path,
    *,
    username: str,
    password: str,
    role: str = "member",
) -> dict[str, Any]:
    """Insert a single user row straight into the auth DB and describe it.

    The row gets a fresh UUID, an ``<username>@example.com`` address, the
    hash of *password*, ``is_active = 1`` and both terminal-authorisation
    flags set to ``0``. ``created_at`` and ``updated_at`` share one UTC
    ISO-8601 timestamp.

    Args:
        db_path: Path of the SQLite auth database to write to.
        username: Login name for the new user.
        password: Plain-text password; stored hashed, returned as given.
        role: Value for the ``role`` column (defaults to ``"member"``).

    Returns:
        A dict with ``id``, ``username``, ``email``, ``password`` (plain
        text, so tests can log in) and ``role``.
    """
    record: dict[str, Any] = {
        "id": str(uuid.uuid4()),
        "username": username,
        "email": f"{username}@example.com",
        "password": password,
        "role": role,
    }
    hashed = hash_password(password)
    timestamp = datetime.now(timezone.utc).isoformat()

    insert_sql = (
        "INSERT INTO users "
        "(id, username, email, password_hash, role, is_active, "
        " is_terminal_authorized, is_server_terminal_authorized, "
        " created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    # Positional values line up with the column list above.
    row = (
        record["id"],
        username,
        record["email"],
        hashed,
        role,
        1,
        0,
        0,
        timestamp,
        timestamp,
    )

    async with aiosqlite.connect(str(db_path)) as conn:
        await conn.execute(insert_sql, row)
        await conn.commit()

    return record
|
|
|
|
|
|
@pytest.fixture
async def users(tmp_auth_db: Path) -> dict[str, dict[str, Any]]:
    """Seed the auth DB with one member (``alice``) and one admin (``admin``).

    Returns:
        A mapping with keys ``"member"`` and ``"admin"``, each holding the
        dict returned by ``_insert_user`` for that account.
    """
    created: dict[str, dict[str, Any]] = {}
    created["member"] = await _insert_user(
        tmp_auth_db,
        username="alice",
        password="alice-pw-12345",
    )
    created["admin"] = await _insert_user(
        tmp_auth_db,
        username="admin",
        password="admin-pw-12345",
        role="admin",
    )
    return created
|
|
|
|
|
|
@pytest.fixture
def auth_app(jwt_secret: str, users: dict[str, dict[str, Any]], tmp_auth_db: Path) -> FastAPI:
    """Assemble a FastAPI app exposing the auth and admin routers under ``/api/v1``.

    ``users`` is not referenced in the body; requesting it makes pytest run
    that fixture (and so seed the accounts) before the app is built.
    """
    application = FastAPI()

    # Settings stored on app.state: the JWT secret and the auth DB location.
    application.state.jwt_secret = jwt_secret
    application.state.auth_db_path = str(tmp_auth_db)

    application.add_middleware(AuthMiddleware, jwt_secret=jwt_secret)

    for router in (auth_routes.router, auth_routes.admin_router):
        application.include_router(router, prefix="/api/v1")

    return application
|
|
|
|
|
|
@pytest.fixture
def auth_client(auth_app: FastAPI) -> Iterator[TestClient]:
    """Yield a ``TestClient`` for ``auth_app`` with a test session service installed.

    A ``SessionService`` bound to the app's auth DB path and a fresh
    in-memory denylist is registered through ``set_session_service`` for the
    duration of the test.

    Yields:
        A ``TestClient`` wrapping ``auth_app``.
    """
    test_svc = SessionService(
        db_path=auth_app.state.auth_db_path,
        denylist=InMemoryRecentlyRevoked(),
    )
    set_session_service(test_svc)
    try:
        yield TestClient(auth_app)
    finally:
        # Always clear the registered service, even when the test fails, so
        # it does not stay installed after this test finishes.
        set_session_service(None)
|
|
|
|
|
|
def _login(client: TestClient, username: str, password: str) -> dict[str, Any]:
    """Log in through ``POST /api/v1/auth/login`` and return the JSON body.

    Fails the calling test (with the response text as the message) unless
    the login responds with HTTP 200.
    """
    credentials = {"username": username, "password": password}
    response = client.post("/api/v1/auth/login", json=credentials)
    assert response.status_code == 200, response.text
    return response.json()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: list user sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAdminListUserSessions:
    """GET /admin/users/{user_id}/sessions."""

    def test_admin_can_list_user_sessions(
        self,
        auth_client: TestClient,
        users: dict[str, dict[str, Any]],
    ):
        """An admin token can list the sessions belonging to another user."""
        # Member logs in (creates a session).
        _login(auth_client, users["member"]["username"], users["member"]["password"])
        # Admin logs in.
        admin_body = _login(
            auth_client,
            users["admin"]["username"],
            users["admin"]["password"],
        )
        member_id = users["member"]["id"]

        resp = auth_client.get(
            f"/api/v1/admin/users/{member_id}/sessions",
            headers={"Authorization": f"Bearer {admin_body['access_token']}"},
        )
        assert resp.status_code == 200, resp.text
        sessions = resp.json()
        assert len(sessions) >= 1
        # All sessions belong to the member.
        assert all(s["user_id"] == member_id for s in sessions)

    def test_non_admin_cannot_list_user_sessions(
        self,
        auth_client: TestClient,
        users: dict[str, dict[str, Any]],
    ):
        """A member token is refused by the admin listing endpoint with 403."""
        # Member logs in.
        member_body = _login(
            auth_client,
            users["member"]["username"],
            users["member"]["password"],
        )
        # Member tries to access admin endpoint.
        resp = auth_client.get(
            f"/api/v1/admin/users/{users['member']['id']}/sessions",
            headers={"Authorization": f"Bearer {member_body['access_token']}"},
        )
        # The member holds a valid token, so the refusal must be an
        # authorisation failure (403). Accepting 401 here would let a broken
        # authentication path pass as "access denied".
        assert resp.status_code == 403, resp.text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: revoke user session
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAdminRevokeUserSession:
    """DELETE /admin/users/{user_id}/sessions/{session_id}."""

    def test_admin_can_revoke_user_session(
        self,
        auth_client: TestClient,
        users: dict[str, dict[str, Any]],
    ):
        """An admin can revoke a member's session, after which its token is rejected."""
        # Member logs in (creates a session).
        member_body = _login(
            auth_client,
            users["member"]["username"],
            users["member"]["password"],
        )
        member_headers = {"Authorization": f"Bearer {member_body['access_token']}"}

        # Get the member's session id via whoami.
        whoami_resp = auth_client.get("/api/v1/auth/whoami", headers=member_headers)
        assert whoami_resp.status_code == 200, whoami_resp.text
        member_sid = whoami_resp.json()["session_id"]
        assert member_sid is not None

        # Admin logs in and revokes the member's session.
        admin_body = _login(
            auth_client,
            users["admin"]["username"],
            users["admin"]["password"],
        )
        resp = auth_client.delete(
            f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}",
            headers={"Authorization": f"Bearer {admin_body['access_token']}"},
        )
        assert resp.status_code == 200, resp.text
        assert resp.json()["revoked"] is True

        # Member's session should now be invalid.
        resp_member = auth_client.get("/api/v1/auth/whoami", headers=member_headers)
        assert resp_member.status_code == 401

    def test_non_admin_cannot_revoke_user_session(
        self,
        auth_client: TestClient,
        users: dict[str, dict[str, Any]],
    ):
        """A member token cannot revoke sessions through the admin endpoint."""
        member_body = _login(
            auth_client,
            users["member"]["username"],
            users["member"]["password"],
        )
        member_headers = {"Authorization": f"Bearer {member_body['access_token']}"}

        whoami_resp = auth_client.get("/api/v1/auth/whoami", headers=member_headers)
        # Without these checks a missing session id would turn the URL below
        # into ".../sessions/None" and the test would pass vacuously.
        assert whoami_resp.status_code == 200, whoami_resp.text
        member_sid = whoami_resp.json()["session_id"]
        assert member_sid is not None

        # Member tries to use the admin endpoint.
        resp = auth_client.delete(
            f"/api/v1/admin/users/{users['member']['id']}/sessions/{member_sid}",
            headers=member_headers,
        )
        # The member is authenticated, so the refusal must be 403 rather
        # than 401 (which would indicate a broken token check).
        assert resp.status_code == 403, resp.text

        # The rejected request must not have revoked anything: the member's
        # token still works.
        still_valid = auth_client.get("/api/v1/auth/whoami", headers=member_headers)
        assert still_valid.status_code == 200, still_valid.text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Admin: list all sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAdminListAllSessions:
    """GET /admin/sessions."""

    def test_admin_can_list_all_sessions(
        self,
        auth_client: TestClient,
        users: dict[str, dict[str, Any]],
    ):
        """With a member and an admin logged in, the admin sees at least two sessions."""
        member = users["member"]
        admin = users["admin"]

        # Log both accounts in so that each has a session.
        _login(auth_client, member["username"], member["password"])
        admin_login = _login(auth_client, admin["username"], admin["password"])

        admin_headers = {"Authorization": f"Bearer {admin_login['access_token']}"}
        response = auth_client.get("/api/v1/admin/sessions", headers=admin_headers)

        assert response.status_code == 200, response.text
        listed = response.json()
        # One session per login above: member + admin.
        assert len(listed) >= 2
|