307 lines
10 KiB
Python
307 lines
10 KiB
Python
"""Tests for APIKey model."""
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
from sqlalchemy import select, and_
|
|
|
|
from app.models.api_key import APIKey
|
|
|
|
|
|
class TestAPIKeyModel:
    """Persistence tests for the ``APIKey`` ORM model.

    Every test receives the ``async_session`` and ``test_user`` fixtures,
    which are defined outside this file. The assertions on row counts
    (for example ``len(keys) == 2``) assume each test starts with no API
    keys for ``test_user`` -- TODO confirm against the fixture definitions.

    Tests that read attributes from an instance after ``commit()`` first
    ``await async_session.refresh(...)``. With ``AsyncSession``'s default
    ``expire_on_commit=True`` a plain attribute read on a committed instance
    would trigger implicit I/O; refreshing explicitly keeps the tests
    independent of how the session fixture is configured and makes the
    assertions check persisted state.
    """

    @pytest.mark.asyncio
    async def test_api_key_create(self, async_session, test_user):
        """Persist a fully specified key and check every field round-trips."""
        api_key = APIKey(
            id=uuid.uuid4(),
            user_id=test_user.id,
            engine_type="chatgpt",
            encrypted_key="encrypted_test_key",
            key_hint="sk-...abc",
            key_source="user",
            status="active",
            priority=0,
        )
        async_session.add(api_key)
        await async_session.commit()
        await async_session.refresh(api_key)

        assert api_key.id is not None
        assert api_key.user_id == test_user.id
        assert api_key.engine_type == "chatgpt"
        assert api_key.encrypted_key == "encrypted_test_key"
        assert api_key.key_hint == "sk-...abc"
        assert api_key.key_source == "user"
        assert api_key.status == "active"
        assert api_key.priority == 0
        # Timestamps are not passed to the constructor; they must be filled in
        # by the time the row is reloaded.
        assert api_key.created_at is not None
        assert api_key.updated_at is not None

    @pytest.mark.asyncio
    async def test_api_key_default_values(self, async_session, test_user):
        """Check the values of fields that are omitted at construction time."""
        api_key = APIKey(
            user_id=test_user.id,
            engine_type="kimi",
            encrypted_key="encrypted_kimi_key",
            key_hint="sk-...xyz",
        )
        async_session.add(api_key)
        await async_session.commit()
        await async_session.refresh(api_key)

        assert api_key.key_source == "user"
        assert api_key.status == "active"
        assert api_key.priority == 0
        assert api_key.last_verified_at is None

    @pytest.mark.asyncio
    async def test_api_key_fields(self, async_session, test_user):
        """Persist non-default values, including ``last_verified_at``."""
        now = datetime.now(timezone.utc)
        api_key = APIKey(
            user_id=test_user.id,
            engine_type="deepseek",
            encrypted_key="encrypted_deepseek_key_data",
            key_hint="sk-...def",
            key_source="system",
            status="active",
            priority=10,
            last_verified_at=now,
        )
        async_session.add(api_key)
        await async_session.commit()
        await async_session.refresh(api_key)

        assert api_key.engine_type == "deepseek"
        assert api_key.encrypted_key == "encrypted_deepseek_key_data"
        assert api_key.key_hint == "sk-...def"
        assert api_key.key_source == "system"
        assert api_key.status == "active"
        assert api_key.priority == 10
        assert api_key.last_verified_at is not None
        # tzinfo is stripped on both sides so the comparison does not depend
        # on whether the backend hands back a naive or an aware datetime.
        # NOTE(review): this presumes the stored value is in UTC and keeps
        # microsecond precision -- confirm for the database used in tests.
        stored = api_key.last_verified_at.replace(tzinfo=None)
        assert stored == now.replace(tzinfo=None)

    @pytest.mark.asyncio
    async def test_api_key_query_by_id(self, async_session, test_user):
        """Look a key up by its primary key."""
        key_id = uuid.uuid4()
        api_key = APIKey(
            id=key_id,
            user_id=test_user.id,
            engine_type="gemini",
            encrypted_key="encrypted_gemini_key",
            key_hint="AIza...123",
        )
        async_session.add(api_key)
        await async_session.commit()

        result = await async_session.execute(
            select(APIKey).where(APIKey.id == key_id)
        )
        # scalar_one() raises unless exactly one row matches, so a separate
        # "is not None" assertion would never be the line that fails.
        fetched_key = result.scalar_one()

        assert fetched_key.id == key_id
        assert fetched_key.engine_type == "gemini"

    @pytest.mark.asyncio
    async def test_api_key_query_by_user_id(self, async_session, test_user):
        """Filter keys by owning user and expect both inserted rows."""
        key1 = APIKey(
            user_id=test_user.id,
            engine_type="chatgpt",
            encrypted_key="encrypted_key_1",
            key_hint="sk-...1",
        )
        key2 = APIKey(
            user_id=test_user.id,
            engine_type="kimi",
            encrypted_key="encrypted_key_2",
            key_hint="sk-...2",
        )
        async_session.add(key1)
        async_session.add(key2)
        await async_session.commit()

        result = await async_session.execute(
            select(APIKey).where(APIKey.user_id == test_user.id)
        )
        keys = result.scalars().all()

        assert len(keys) == 2

    @pytest.mark.asyncio
    async def test_api_key_query_by_user_and_engine(self, async_session, test_user):
        """Filter keys by user and engine type and expect a single match."""
        key1 = APIKey(
            user_id=test_user.id,
            engine_type="chatgpt",
            encrypted_key="encrypted_chatgpt_key",
            key_hint="sk-...chat",
        )
        key2 = APIKey(
            user_id=test_user.id,
            engine_type="kimi",
            encrypted_key="encrypted_kimi_key",
            key_hint="sk-...kimi",
        )
        async_session.add(key1)
        async_session.add(key2)
        await async_session.commit()

        result = await async_session.execute(
            select(APIKey).where(
                and_(
                    APIKey.user_id == test_user.id,
                    APIKey.engine_type == "chatgpt",
                )
            )
        )
        keys = result.scalars().all()

        assert len(keys) == 1
        assert keys[0].engine_type == "chatgpt"

    @pytest.mark.asyncio
    async def test_api_key_timestamps(self, async_session, test_user):
        """Check that ``created_at`` and ``updated_at`` are set to datetimes."""
        api_key = APIKey(
            user_id=test_user.id,
            engine_type="qwen",
            encrypted_key="encrypted_qwen_key",
            key_hint="sk-...qwen",
        )
        async_session.add(api_key)
        await async_session.commit()
        await async_session.refresh(api_key)

        assert api_key.created_at is not None
        assert api_key.updated_at is not None
        assert isinstance(api_key.created_at, datetime)
        assert isinstance(api_key.updated_at, datetime)

    @pytest.mark.asyncio
    async def test_api_key_update(self, async_session, test_user):
        """Modify a persisted key and check the new values after reload."""
        api_key = APIKey(
            user_id=test_user.id,
            engine_type="wenxin",
            encrypted_key="encrypted_wenxin_key",
            key_hint="sk-...wenxin",
            status="active",
            priority=0,
        )
        async_session.add(api_key)
        await async_session.commit()

        # Only attribute writes happen between the two commits; nothing is
        # read from the instance until it has been refreshed below.
        api_key.status = "invalid"
        api_key.priority = 5
        api_key.last_verified_at = datetime.now(timezone.utc)
        await async_session.commit()
        await async_session.refresh(api_key)

        assert api_key.status == "invalid"
        assert api_key.priority == 5
        assert api_key.last_verified_at is not None

    @pytest.mark.asyncio
    async def test_api_key_delete(self, async_session, test_user):
        """Delete a persisted key and check it can no longer be selected."""
        api_key = APIKey(
            user_id=test_user.id,
            engine_type="doubao",
            encrypted_key="encrypted_doubao_key",
            key_hint="sk-...doubao",
        )
        async_session.add(api_key)
        await async_session.commit()
        # Refresh before reading the generated id, as the other tests do, so
        # the read cannot trigger implicit I/O on an instance expired by
        # commit().
        await async_session.refresh(api_key)
        key_id = api_key.id

        await async_session.delete(api_key)
        await async_session.commit()

        result = await async_session.execute(
            select(APIKey).where(APIKey.id == key_id)
        )
        deleted_key = result.scalar_one_or_none()

        assert deleted_key is None

    @pytest.mark.asyncio
    async def test_api_key_status_values(self, async_session, test_user):
        """Persist one key per status string and check each value round-trips."""
        statuses = ["active", "invalid", "expired", "rate_limited", "unknown"]
        created_keys = []

        for i, status in enumerate(statuses):
            api_key = APIKey(
                user_id=test_user.id,
                engine_type=f"engine_{i}",
                encrypted_key=f"encrypted_key_{i}",
                key_hint=f"sk-...{i}",
                status=status,
            )
            async_session.add(api_key)
            created_keys.append(api_key)

        await async_session.commit()

        for key, status in zip(created_keys, statuses):
            # Reload each row so the assertion checks the stored status rather
            # than the value handed to the constructor, and so the attribute
            # read is safe if commit() expired the instance.
            await async_session.refresh(key)
            assert key.status == status

    @pytest.mark.asyncio
    async def test_api_key_priority_ordering(self, async_session, test_user):
        """Order a user's keys by descending priority."""
        keys_data = [
            {"engine_type": "engine_a", "priority": 0, "key_hint": "a..."},
            {"engine_type": "engine_b", "priority": 10, "key_hint": "b..."},
            {"engine_type": "engine_c", "priority": 5, "key_hint": "c..."},
        ]

        for data in keys_data:
            api_key = APIKey(
                user_id=test_user.id,
                engine_type=data["engine_type"],
                encrypted_key=f"encrypted_{data['engine_type']}",
                key_hint=data["key_hint"],
                priority=data["priority"],
            )
            async_session.add(api_key)

        await async_session.commit()

        result = await async_session.execute(
            select(APIKey)
            .where(APIKey.user_id == test_user.id)
            .order_by(APIKey.priority.desc())
        )
        keys = result.scalars().all()

        # One comparison pins both the order and the number of rows returned,
        # and shows the whole sequence in the failure message.
        assert [key.priority for key in keys] == [10, 5, 0]

    @pytest.mark.asyncio
    async def test_api_key_user_id_index(self, async_session, test_user):
        """Insert five keys for one user and fetch them all by ``user_id``.

        NOTE(review): despite the test name, nothing here inspects the schema
        for an index on ``user_id``; the test only exercises the lookup path.
        Add a metadata assertion if index existence must be enforced.
        """
        for i in range(5):
            api_key = APIKey(
                user_id=test_user.id,
                engine_type=f"engine_{i}",
                encrypted_key=f"encrypted_key_{i}",
                key_hint=f"hint_{i}",
            )
            async_session.add(api_key)

        await async_session.commit()

        result = await async_session.execute(
            select(APIKey).where(APIKey.user_id == test_user.id)
        )
        keys = result.scalars().all()

        assert len(keys) == 5