654 lines
26 KiB
Python
654 lines
26 KiB
Python
"""Unit tests for auth.models (U1 — V2 schema + backfill).
|
|
|
|
Covers:
- ``auth_sessions`` table creation (columns + indexes)
- ``auth_meta`` table creation
- ``_SCHEMA_VERSION`` constant value
- ``auth_session_row_to_dict`` field-by-field conversion
- ``_backfill_user_sessions`` one-time migration (V1 → V2)
- ``init_auth_db`` idempotency (subsequent runs are no-ops)
- ``auth_provider`` column default value
- Index presence (verified via ``PRAGMA index_list``)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import aiosqlite
|
|
import pytest
|
|
|
|
from agentkit.server.auth.models import (
|
|
AuthSessionModel,
|
|
UserSessionModel,
|
|
_SCHEMA_VERSION,
|
|
auth_session_row_to_dict,
|
|
init_auth_db,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
async def fresh_db(tmp_path: Path) -> Path:
    """Yield-free fixture: path of a newly initialised auth DB with no rows inserted by the tests."""
    auth_db_file = tmp_path / "auth.db"
    # init_auth_db is run once here, so every test starts from an initialised database.
    await init_auth_db(auth_db_file)
    return auth_db_file
|
|
|
|
|
|
async def _insert_user(db: aiosqlite.Connection, *, user_id: str | None = None) -> str:
    """Create a bare-bones ``users`` row and hand back its id.

    A random UUID is generated when ``user_id`` is missing or empty. The
    username and e-mail are derived from the first eight characters of the id.
    The statement is executed on ``db`` but not committed here.
    """
    if not user_id:
        user_id = str(uuid.uuid4())
    short_id = user_id[:8]
    # One timestamp is shared by created_at and updated_at.
    timestamp = datetime.now(timezone.utc).isoformat()

    insert_sql = (
        "INSERT INTO users "
        "(id, username, email, password_hash, role, is_active, "
        " is_terminal_authorized, is_server_terminal_authorized, "
        " created_at, updated_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        user_id,
        f"user-{short_id}",
        f"{short_id}@example.com",
        "$2b$12$placeholder.hash.placeholder.hash.placeholder.hash",
        "member",
        1,  # is_active
        0,  # is_terminal_authorized
        0,  # is_server_terminal_authorized
        timestamp,
        timestamp,
    )
    await db.execute(insert_sql, values)
    return user_id
|
|
|
|
|
|
async def _insert_user_session(
    db: aiosqlite.Connection,
    *,
    user_id: str,
    session_id: str | None = None,
    refresh_hash: str | None = None,
    device_info: str = "{}",
    revoked: bool = False,
) -> str:
    """Write one legacy (V1) ``user_sessions`` row and return the session id.

    Missing or empty ``session_id`` / ``refresh_hash`` values are replaced by
    random ones. ``created_at`` and ``expires_at`` both receive the current UTC
    timestamp; ``revoked_at`` receives it too when ``revoked`` is true and is
    NULL otherwise. The statement is executed on ``db`` but not committed here.
    """
    if not session_id:
        session_id = str(uuid.uuid4())
    if not refresh_hash:
        refresh_hash = uuid.uuid4().hex
    timestamp = datetime.now(timezone.utc).isoformat()
    revoked_at = timestamp if revoked else None

    insert_sql = (
        "INSERT INTO user_sessions "
        "(id, user_id, refresh_token_hash, device_info, created_at, expires_at, revoked_at) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)"
    )
    values = (
        session_id,
        user_id,
        refresh_hash,
        device_info,
        timestamp,  # created_at
        timestamp,  # expires_at
        revoked_at,
    )
    await db.execute(insert_sql, values)
    return session_id
|
|
|
|
|
|
async def _list_index_names(db: aiosqlite.Connection, table: str) -> set[str]:
    """Return the set of index names defined on ``table``.

    Rows are read by position instead of by column name, so the helper works
    with whatever ``row_factory`` the caller configured (plain tuples or
    ``aiosqlite.Row``) and does not modify the connection.

    ``table`` is interpolated into the PRAGMA statement, so only pass trusted,
    hard-coded table names.
    """
    cursor = await db.execute(f"PRAGMA index_list({table})")
    # PRAGMA index_list yields (seq, name, unique, origin, partial) per index.
    return {row[1] for row in await cursor.fetchall()}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _SCHEMA_VERSION
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSchemaVersion:
    """Pins the schema version constant and the ORM table names."""

    def test_schema_version_is_v4(self):
        """The current schema version is 4 (V4 adds skill_states table)."""
        expected_version = 4
        assert _SCHEMA_VERSION == expected_version

    def test_sqlalchemy_model_table_name(self):
        """Both session models map onto their expected table names."""
        new_table = AuthSessionModel.__tablename__
        assert new_table == "auth_sessions"
        legacy_table = UserSessionModel.__tablename__
        assert legacy_table == "user_sessions"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# init_auth_db: tables + indexes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInitAuthDbTables:
    """``init_auth_db`` creates the expected tables and records the schema version."""

    @staticmethod
    async def _lookup_table(db_path: Path, table: str):
        """Return the ``sqlite_master`` row for ``table``, or ``None`` if absent."""
        async with aiosqlite.connect(str(db_path)) as conn:
            result = await conn.execute(
                f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table}'"
            )
            return await result.fetchone()

    async def test_creates_auth_sessions_table(self, fresh_db: Path):
        found = await self._lookup_table(fresh_db, "auth_sessions")
        assert found is not None

    async def test_creates_auth_meta_table(self, fresh_db: Path):
        found = await self._lookup_table(fresh_db, "auth_meta")
        assert found is not None

    async def test_creates_user_sessions_table_for_back_compat(self, fresh_db: Path):
        found = await self._lookup_table(fresh_db, "user_sessions")
        assert found is not None

    async def test_records_schema_version_in_auth_meta(self, fresh_db: Path):
        """``auth_meta`` stores the schema version as a string under ``schema_version``."""
        async with aiosqlite.connect(str(fresh_db)) as conn:
            conn.row_factory = aiosqlite.Row
            result = await conn.execute("SELECT value FROM auth_meta WHERE key='schema_version'")
            version_row = await result.fetchone()
        assert version_row is not None
        stored_version = version_row["value"]
        assert stored_version == str(_SCHEMA_VERSION)
|
|
|
|
|
|
class TestAuthSessionsIndexes:
    """Index coverage on ``auth_sessions``, verified through SQLite PRAGMAs."""

    async def test_user_id_active_index(self, fresh_db: Path):
        async with aiosqlite.connect(str(fresh_db)) as db:
            indexes = await _list_index_names(db, "auth_sessions")
        assert "idx_auth_sessions_user_id_active" in indexes

    async def test_expires_at_index(self, fresh_db: Path):
        async with aiosqlite.connect(str(fresh_db)) as db:
            indexes = await _list_index_names(db, "auth_sessions")
        assert "idx_auth_sessions_expires_at" in indexes

    async def test_auth_provider_index(self, fresh_db: Path):
        async with aiosqlite.connect(str(fresh_db)) as db:
            indexes = await _list_index_names(db, "auth_sessions")
        assert "idx_auth_sessions_auth_provider" in indexes

    async def test_refresh_token_hash_unique_index(self, fresh_db: Path):
        """``refresh_token_hash`` is covered by a UNIQUE index.

        SQLite exposes column-level UNIQUE constraints through
        ``PRAGMA index_list`` as auto-generated indexes (named like
        ``sqlite_autoindex_<table>_1``) whose ``unique`` flag is set;
        ``PRAGMA index_info`` then reports the columns each index covers.
        """
        async with aiosqlite.connect(str(fresh_db)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("PRAGMA index_list(auth_sessions)")
            index_entries = await cursor.fetchall()

            unique_columns: set[str] = set()
            for entry in index_entries:
                # Skip ordinary indexes: a non-unique index on the column must
                # not be enough to satisfy a test about uniqueness.
                if not entry["unique"]:
                    continue
                col_cursor = await db.execute(f"PRAGMA index_info({entry['name']})")
                for col_row in await col_cursor.fetchall():
                    unique_columns.add(col_row["name"])

        assert "refresh_token_hash" in unique_columns
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# auth_sessions columns
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAuthSessionsColumns:
    """Column set and column defaults of the ``auth_sessions`` table."""

    async def test_required_columns_present(self, fresh_db: Path):
        """All expected columns exist on ``auth_sessions``."""
        required = {
            "id",
            "user_id",
            "refresh_token_hash",
            "device_fingerprint",
            "device_label",
            "ip",
            "user_agent",
            "auth_provider",
            "created_at",
            "last_active_at",
            "expires_at",
            "revoked",
            "revoked_reason",
            "previous_session_id",
        }
        async with aiosqlite.connect(str(fresh_db)) as db:
            cursor = await db.execute("PRAGMA table_info(auth_sessions)")
            # PRAGMA table_info rows are (cid, name, type, ...); index 1 is the name.
            cols = {row[1] for row in await cursor.fetchall()}
        missing = required - cols
        # Name the missing columns so a failure is actionable.
        assert not missing, f"auth_sessions is missing columns: {sorted(missing)}"

    async def test_auth_provider_default_is_local(self, fresh_db: Path):
        """Insert a row without auth_provider → column should default to 'local'."""
        user_id = str(uuid.uuid4())
        sid = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await db.execute(
                "INSERT INTO auth_sessions "
                "(id, user_id, refresh_token_hash, created_at, last_active_at, expires_at, "
                " revoked) "
                "VALUES (?, ?, ?, ?, ?, ?, 0)",
                (
                    sid,
                    user_id,
                    "deadbeef",
                    "2026-01-01T00:00:00+00:00",
                    "2026-01-01T00:00:00+00:00",
                    "2026-12-31T00:00:00+00:00",
                ),
            )
            await db.commit()
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT auth_provider FROM auth_sessions WHERE id=?", (sid,))
            row = await cursor.fetchone()
        assert row is not None
        assert row["auth_provider"] == "local"

    async def test_revoked_default_is_false(self, fresh_db: Path):
        """Insert a row without revoked → column should default to a falsy value."""
        user_id = str(uuid.uuid4())
        sid = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await db.execute(
                "INSERT INTO auth_sessions "
                "(id, user_id, refresh_token_hash, created_at, last_active_at, expires_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (
                    sid,
                    user_id,
                    "beefdead",
                    "2026-01-01T00:00:00+00:00",
                    "2026-01-01T00:00:00+00:00",
                    "2026-12-31T00:00:00+00:00",
                ),
            )
            await db.commit()
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT revoked FROM auth_sessions WHERE id=?", (sid,))
            row = await cursor.fetchone()
        # Guard first so a missing row fails as an assertion, not a TypeError.
        assert row is not None
        assert bool(row["revoked"]) is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# auth_session_row_to_dict
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAuthSessionRowToDict:
    """Conversion of an ``auth_sessions`` row into a plain dict."""

    async def test_converts_all_fields(self, fresh_db: Path):
        """A fully populated row round-trips through ``auth_session_row_to_dict``."""
        owner_id = str(uuid.uuid4())
        session_id = str(uuid.uuid4())
        insert_sql = (
            "INSERT INTO auth_sessions "
            "(id, user_id, refresh_token_hash, device_fingerprint, device_label, ip, "
            " user_agent, auth_provider, created_at, last_active_at, expires_at, "
            " revoked, revoked_reason, previous_session_id) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        )
        insert_values = (
            session_id,
            owner_id,
            "abc123",
            "fp-xyz",
            "macOS Chrome 119",
            "10.0.0.1",
            "Mozilla/5.0",
            "local",
            "2026-01-01T00:00:00+00:00",
            "2026-06-20T00:00:00+00:00",
            "2027-01-01T00:00:00+00:00",
            1,
            "user_terminated",
            "old-sid",
        )
        async with aiosqlite.connect(str(fresh_db)) as conn:
            await _insert_user(conn, user_id=owner_id)
            await conn.execute(insert_sql, insert_values)
            await conn.commit()
            conn.row_factory = aiosqlite.Row
            result = await conn.execute("SELECT * FROM auth_sessions WHERE id=?", (session_id,))
            stored = await result.fetchone()

        converted = auth_session_row_to_dict(stored)
        assert converted["id"] == session_id
        assert converted["user_id"] == owner_id
        assert converted["device_fingerprint"] == "fp-xyz"
        assert converted["device_label"] == "macOS Chrome 119"
        assert converted["ip"] == "10.0.0.1"
        assert converted["user_agent"] == "Mozilla/5.0"
        assert converted["auth_provider"] == "local"
        # Identity check: the stored integer 1 must come back as the bool True.
        assert converted["revoked"] is True
        assert converted["revoked_reason"] == "user_terminated"
        assert converted["previous_session_id"] == "old-sid"

    async def test_normalizes_revoked_to_bool(self, fresh_db: Path):
        """DB stores 0/1; helper should return Python bool."""
        owner_id = str(uuid.uuid4())
        session_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as conn:
            await _insert_user(conn, user_id=owner_id)
            await conn.execute(
                "INSERT INTO auth_sessions "
                "(id, user_id, refresh_token_hash, created_at, last_active_at, expires_at, "
                " revoked) "
                "VALUES (?, ?, ?, ?, ?, ?, 0)",
                (session_id, owner_id, "x", "t", "t", "t"),
            )
            await conn.commit()
            conn.row_factory = aiosqlite.Row
            result = await conn.execute("SELECT * FROM auth_sessions WHERE id=?", (session_id,))
            stored = await result.fetchone()

        converted = auth_session_row_to_dict(stored)
        revoked_flag = converted["revoked"]
        assert isinstance(revoked_flag, bool)
        assert revoked_flag is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _backfill_user_sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBackfillUserSessions:
    """One-time V1 → V2 backfill of ``user_sessions`` rows into ``auth_sessions``.

    Each test seeds legacy rows, rewinds the database to a "not yet migrated"
    state, re-runs ``init_auth_db`` and inspects what landed in
    ``auth_sessions`` / ``auth_meta``.
    """

    # auth_meta key these tests delete to force a re-run and read back to
    # confirm the backfill was recorded.
    _BACKFILL_MARKER_KEY = "backfill_user_sessions_v1_to_v2"

    async def _rewind_to_pre_backfill(self, db: aiosqlite.Connection) -> None:
        """Simulate a V1 install that has not been migrated yet.

        Empties ``auth_sessions`` and removes the backfill marker so the next
        ``init_auth_db`` call runs the migration again, then commits every
        pending write on ``db`` (including rows the test inserted beforehand).
        """
        await db.execute("DELETE FROM auth_sessions")
        await db.execute("DELETE FROM auth_meta WHERE key=?", (self._BACKFILL_MARKER_KEY,))
        await db.commit()

    async def _fetch_backfill_marker(self, db_path: Path):
        """Return the ``auth_meta`` row holding the backfill marker, or ``None``."""
        async with aiosqlite.connect(str(db_path)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT value FROM auth_meta WHERE key=?", (self._BACKFILL_MARKER_KEY,)
            )
            return await cursor.fetchone()

    @staticmethod
    async def _fetch_session_by_hash(db_path: Path, refresh_hash: str):
        """Return the ``auth_sessions`` row with ``refresh_hash``, or ``None``."""
        async with aiosqlite.connect(str(db_path)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM auth_sessions WHERE refresh_token_hash=?", (refresh_hash,)
            )
            return await cursor.fetchone()

    async def test_backfills_non_revoked_v1_rows(self, fresh_db: Path):
        """On a fresh V1 install, all non-revoked rows are migrated to auth_sessions."""
        user_id = str(uuid.uuid4())
        tauri_device = json.dumps(
            {
                "fingerprint": "mac-tauri",
                "label": "macOS Tauri 1.0",
                "ip": "192.168.1.10",
                "user_agent": "Tauri/1.0",
            }
        )
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="hash1", device_info="{}"
            )
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="hash2", device_info=tauri_device
            )
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="hash3", revoked=True
            )
            await self._rewind_to_pre_backfill(db)

        # Re-init to trigger the migration.
        await init_auth_db(fresh_db)

        async with aiosqlite.connect(str(fresh_db)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM auth_sessions ORDER BY refresh_token_hash")
            rows = await cursor.fetchall()

        # Only the 2 non-revoked rows should have been backfilled.
        assert len(rows) == 2
        assert {row["refresh_token_hash"] for row in rows} == {"hash1", "hash2"}

    async def test_backfill_preserves_original_id(self, fresh_db: Path):
        """Backfilled rows reuse the V1 id so legacy clients match."""
        user_id = str(uuid.uuid4())
        original_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(
                db, user_id=user_id, session_id=original_id, refresh_hash="hash-original"
            )
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        row = await self._fetch_session_by_hash(fresh_db, "hash-original")
        assert row is not None
        assert row["id"] == original_id

    async def test_backfill_does_not_touch_revoked_v1_rows(self, fresh_db: Path):
        """A revoked V1 row produces no ``auth_sessions`` row."""
        user_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="revoked-hash", revoked=True
            )
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        async with aiosqlite.connect(str(fresh_db)) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT COUNT(*) AS c FROM auth_sessions WHERE refresh_token_hash='revoked-hash'"
            )
            row = await cursor.fetchone()
        assert row["c"] == 0

    async def test_backfill_copies_device_info_fields(self, fresh_db: Path):
        """Keys of the V1 ``device_info`` JSON land in the dedicated V2 columns."""
        user_id = str(uuid.uuid4())
        device_info = json.dumps(
            {
                "fingerprint": "win-tauri-abc",
                "label": "Windows Tauri 1.0",
                "ip": "10.0.0.5",
                "user_agent": "Tauri/2.0",
            }
        )
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="hash-with-device", device_info=device_info
            )
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        row = await self._fetch_session_by_hash(fresh_db, "hash-with-device")
        # Guard first so a missing row fails as an assertion, not a TypeError.
        assert row is not None
        assert row["device_fingerprint"] == "win-tauri-abc"
        assert row["device_label"] == "Windows Tauri 1.0"
        assert row["ip"] == "10.0.0.5"
        assert row["user_agent"] == "Tauri/2.0"

    async def test_backfill_handles_malformed_device_info(self, fresh_db: Path):
        """Malformed JSON in device_info → fall back to defaults."""
        user_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(
                db, user_id=user_id, refresh_hash="bad-json", device_info="not-json{"
            )
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        row = await self._fetch_session_by_hash(fresh_db, "bad-json")
        assert row is not None
        assert row["device_fingerprint"] == "unknown"
        assert row["device_label"] == "Unknown device"

    async def test_backfill_marks_done_in_auth_meta(self, fresh_db: Path):
        """After a backfill, the auth_meta marker is set so it doesn't run again."""
        user_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            await _insert_user_session(db, user_id=user_id, refresh_hash="h1")
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        marker = await self._fetch_backfill_marker(fresh_db)
        assert marker is not None
        assert marker["value"] == "done"

    async def test_backfill_is_idempotent(self, fresh_db: Path):
        """Running init twice does NOT duplicate auth_sessions rows."""
        user_id = str(uuid.uuid4())
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            for i in range(3):
                await _insert_user_session(db, user_id=user_id, refresh_hash=f"hash{i}")
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)
        await init_auth_db(fresh_db)  # second run should be a no-op

        async with aiosqlite.connect(str(fresh_db)) as db:
            cursor = await db.execute("SELECT COUNT(*) AS c FROM auth_sessions")
            row = await cursor.fetchone()
        assert row[0] == 3

    async def test_fresh_install_marks_backfill_done_without_rows(self, fresh_db: Path):
        """A fresh V2 install (no V1 rows) still marks the backfill as done."""
        # The fresh_db fixture already ran init_auth_db once; running it again
        # should be a no-op (idempotent).
        await init_auth_db(fresh_db)

        marker = await self._fetch_backfill_marker(fresh_db)
        assert marker is not None
        assert marker["value"] == "done"

    async def test_backfill_preserves_expires_at(self, fresh_db: Path):
        """The V1 ``expires_at`` value is carried over unchanged."""
        user_id = str(uuid.uuid4())
        original_exp = "2027-06-20T12:34:56+00:00"
        async with aiosqlite.connect(str(fresh_db)) as db:
            await _insert_user(db, user_id=user_id)
            # Inserted by hand because _insert_user_session always uses "now"
            # for expires_at.
            await db.execute(
                "INSERT INTO user_sessions "
                "(id, user_id, refresh_token_hash, device_info, created_at, expires_at) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                (
                    str(uuid.uuid4()),
                    user_id,
                    "exp-hash",
                    "{}",
                    "2026-01-01T00:00:00+00:00",
                    original_exp,
                ),
            )
            await self._rewind_to_pre_backfill(db)

        await init_auth_db(fresh_db)

        row = await self._fetch_session_by_hash(fresh_db, "exp-hash")
        # Guard first so a missing row fails as an assertion, not a TypeError.
        assert row is not None
        assert row["expires_at"] == original_exp
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Active-session query pattern (covers the cap-count and list-active paths)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestActiveSessionQueries:
    """Exercises the query shape used to count and list a user's active sessions."""

    async def test_query_active_sessions_for_user(self, fresh_db: Path):
        """The (user_id, revoked, expires_at) index supports the active-session query."""
        owner_id = str(uuid.uuid4())
        future = "2027-12-31T00:00:00+00:00"
        past = "2020-01-01T00:00:00+00:00"
        cutoff = "2026-06-20T00:00:00+00:00"

        # Non-revoked sessions: "a" and "b" are still valid, "c" has expired.
        live_candidates = (("a", future), ("b", future), ("c", past))

        async with aiosqlite.connect(str(fresh_db)) as conn:
            await _insert_user(conn, user_id=owner_id)
            for token_hash, expiry in live_candidates:
                new_id = str(uuid.uuid4())
                await conn.execute(
                    "INSERT INTO auth_sessions "
                    "(id, user_id, refresh_token_hash, created_at, last_active_at, expires_at, "
                    " revoked) VALUES (?, ?, ?, ?, ?, ?, 0)",
                    (new_id, owner_id, token_hash, "t", "t", expiry),
                )
            # Revoked session "d": unexpired, but must not count as active.
            revoked_id = str(uuid.uuid4())
            await conn.execute(
                "INSERT INTO auth_sessions "
                "(id, user_id, refresh_token_hash, created_at, last_active_at, expires_at, "
                " revoked, revoked_reason) "
                "VALUES (?, ?, ?, ?, ?, ?, 1, 'user_terminated')",
                (revoked_id, owner_id, "d", "t", "t", future),
            )
            await conn.commit()

            conn.row_factory = aiosqlite.Row
            result = await conn.execute(
                "SELECT refresh_token_hash FROM auth_sessions "
                "WHERE user_id = ? AND revoked = 0 AND expires_at > ?",
                (owner_id, cutoff),
            )
            active_rows = await result.fetchall()

        active_hashes = set()
        for active in active_rows:
            active_hashes.add(active["refresh_token_hash"])
        # 2 active, 1 expired, 1 revoked → only the 2 active ones match.
        assert active_hashes == {"a", "b"}
|