geo/backend/tests/test_services/test_api_key_manager.py

313 lines
11 KiB
Python

import pytest
from app.services.api_key_manager import APIKeyConfig, APIKeyManager, KeySource, KeyStatus
class TestAPIKeyConfig:
def test_config_creation(self):
config = APIKeyConfig(
engine_type="chatgpt",
key_source=KeySource.SYSTEM,
encrypted_key="ZW5jcnlwdGVk",
key_hint="sk-...abc",
status=KeyStatus.UNKNOWN,
priority=0,
)
assert config.engine_type == "chatgpt"
assert config.key_source == KeySource.SYSTEM
assert config.encrypted_key == "ZW5jcnlwdGVk"
assert config.key_hint == "sk-...abc"
assert config.status == KeyStatus.UNKNOWN
assert config.priority == 0
def test_config_default_values(self):
config = APIKeyConfig(
engine_type="chatgpt",
key_source=KeySource.USER,
encrypted_key="abc",
key_hint="***",
)
assert config.status == KeyStatus.UNKNOWN
assert config.priority == 0
assert config.last_verified_at is None
assert config.created_at is None
assert config.user_id is None
def test_key_source_enum_values(self):
assert KeySource.SYSTEM == "system"
assert KeySource.USER == "user"
assert KeySource.ENV == "env"
def test_key_status_enum_values(self):
assert KeyStatus.ACTIVE == "active"
assert KeyStatus.INVALID == "invalid"
assert KeyStatus.EXPIRED == "expired"
assert KeyStatus.RATE_LIMITED == "rate_limited"
assert KeyStatus.UNKNOWN == "unknown"
class TestAPIKeyManagerAddKey:
@pytest.fixture
def manager(self):
return APIKeyManager()
def test_add_system_key(self, manager):
config = manager.add_key("chatgpt", "sk-1234567890abcdef", source=KeySource.SYSTEM)
assert config.engine_type == "chatgpt"
assert config.key_source == KeySource.SYSTEM
assert config.encrypted_key != "sk-1234567890abcdef"
assert config.key_hint == "sk-...def"
def test_add_user_key(self, manager):
config = manager.add_key("chatgpt", "sk-user1234567890", source=KeySource.USER, user_id="user_001")
assert config.key_source == KeySource.USER
assert config.user_id == "user_001"
def test_add_multiple_keys_for_same_engine(self, manager):
manager.add_key("chatgpt", "sk-1111111111111111", source=KeySource.SYSTEM)
manager.add_key("chatgpt", "sk-2222222222222222", source=KeySource.USER, user_id="user_001")
keys = manager.list_keys("chatgpt")
assert len(keys) == 2
class TestAPIKeyManagerGetKey:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-system1234567890", source=KeySource.SYSTEM, priority=0)
mgr.add_key("chatgpt", "sk-user1234567890ab", source=KeySource.USER, user_id="user_001", priority=10)
return mgr
def test_get_key_decrypts_correctly(self, manager):
key = manager.get_key("chatgpt", user_id="user_001")
assert key == "sk-user1234567890ab"
def test_get_key_returns_system_key_when_no_user_id(self, manager):
key = manager.get_key("chatgpt")
assert key == "sk-system1234567890"
def test_get_key_returns_none_for_unknown_engine(self, manager):
key = manager.get_key("nonexistent")
assert key is None
def test_get_key_returns_none_for_wrong_user(self, manager):
key = manager.get_key("chatgpt", user_id="user_999")
assert key == "sk-system1234567890"
class TestAPIKeyManagerRemoveKey:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-1234567890abcdef", source=KeySource.SYSTEM)
return mgr
def test_remove_existing_key(self, manager):
keys = manager.list_keys("chatgpt")
hint = keys[0].key_hint
result = manager.remove_key("chatgpt", hint)
assert result is True
assert len(manager.list_keys("chatgpt")) == 0
def test_remove_nonexistent_key(self, manager):
result = manager.remove_key("chatgpt", "sk-...xyz")
assert result is False
class TestAPIKeyManagerListKeys:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-chatgpt12345678", source=KeySource.SYSTEM)
mgr.add_key("perplexity", "sk-perplexity12345", source=KeySource.SYSTEM)
return mgr
def test_list_keys_for_engine(self, manager):
keys = manager.list_keys("chatgpt")
assert len(keys) == 1
assert keys[0].engine_type == "chatgpt"
def test_list_all_keys(self, manager):
keys = manager.list_keys()
assert len(keys) == 2
def test_list_keys_returns_masked_hints_only(self, manager):
keys = manager.list_keys("chatgpt")
for k in keys:
assert k.encrypted_key != "sk-chatgpt12345678"
assert "..." in k.key_hint
class TestKeyEncryption:
@pytest.fixture
def manager(self):
return APIKeyManager()
def test_encrypted_key_is_not_plaintext(self, manager):
original = "sk-my-super-secret-key-123456"
config = manager.add_key("chatgpt", original, source=KeySource.SYSTEM)
assert config.encrypted_key != original
def test_encrypt_decrypt_roundtrip(self, manager):
original = "sk-roundtrip-test-key-12345"
manager.add_key("chatgpt", original, source=KeySource.SYSTEM)
decrypted = manager.get_key("chatgpt")
assert decrypted == original
def test_key_hint_masks_middle(self, manager):
config = manager.add_key("chatgpt", "sk-abcdefghijklmnop", source=KeySource.SYSTEM)
assert config.key_hint.startswith("sk-")
assert config.key_hint.endswith("nop")
assert "..." in config.key_hint
def test_short_key_hint(self, manager):
config = manager.add_key("chatgpt", "short", source=KeySource.SYSTEM)
assert config.key_hint == "***"
class TestKeyVerification:
@pytest.fixture
def manager(self):
return APIKeyManager()
@pytest.mark.asyncio
async def test_verify_valid_key(self, manager):
status = await manager.verify_key("chatgpt", "sk-valid-key-1234567890")
assert status == KeyStatus.ACTIVE
@pytest.mark.asyncio
async def test_verify_empty_key(self, manager):
status = await manager.verify_key("chatgpt", "")
assert status == KeyStatus.INVALID
@pytest.mark.asyncio
async def test_verify_short_key(self, manager):
status = await manager.verify_key("chatgpt", "short")
assert status == KeyStatus.INVALID
class TestKeyPriority:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-env-key-1234567890", source=KeySource.ENV, priority=0)
mgr.add_key("chatgpt", "sk-system-key-1234567", source=KeySource.SYSTEM, priority=5)
mgr.add_key("chatgpt", "sk-user-key-123456789", source=KeySource.USER, user_id="user_001", priority=10)
return mgr
def test_keys_sorted_by_priority(self, manager):
keys = manager.list_keys("chatgpt")
assert keys[0].priority == 10
assert keys[1].priority == 5
assert keys[2].priority == 0
def test_user_key_has_higher_priority(self, manager):
key = manager.get_key("chatgpt", user_id="user_001")
assert key == "sk-user-key-123456789"
def test_system_key_used_when_no_user_key(self, manager):
key = manager.get_key("chatgpt", user_id="user_999")
assert key == "sk-system-key-1234567"
class TestBestKeySelection:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-user-active-key-12", source=KeySource.USER, user_id="user_001", priority=10)
mgr.add_key("chatgpt", "sk-system-active-key1", source=KeySource.SYSTEM, priority=5)
mgr.add_key("chatgpt", "sk-env-active-key-1234", source=KeySource.ENV, priority=0)
return mgr
def test_get_best_key_prefers_user_key(self, manager):
key = manager.get_key("chatgpt", user_id="user_001")
assert key == "sk-user-active-key-12"
def test_get_best_key_falls_back_to_system(self, manager):
key = manager.get_key("chatgpt")
assert key == "sk-system-active-key1"
class TestKeyDegradation:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-user-key-123456789", source=KeySource.USER, user_id="user_001", priority=10)
mgr.add_key("chatgpt", "sk-system-key-1234567", source=KeySource.SYSTEM, priority=5)
return mgr
def test_degradation_to_system_key_when_user_key_invalid(self, manager):
keys = manager.list_keys("chatgpt")
for k in keys:
if k.key_source == KeySource.USER and k.user_id == "user_001":
k.status = KeyStatus.INVALID
key = manager.get_key("chatgpt", user_id="user_001")
assert key == "sk-system-key-1234567"
def test_degradation_to_env_key_when_system_key_rate_limited(self, manager):
manager.add_key("chatgpt", "sk-env-key-1234567890", source=KeySource.ENV, priority=0)
keys = manager.list_keys("chatgpt")
for k in keys:
if k.key_source == KeySource.SYSTEM:
k.status = KeyStatus.RATE_LIMITED
key = manager.get_key("chatgpt")
assert key == "sk-env-key-1234567890"
def test_returns_none_when_all_keys_unavailable(self, manager):
keys = manager.list_keys("chatgpt")
for k in keys:
k.status = KeyStatus.INVALID
key = manager.get_key("chatgpt")
assert key is None
class TestKeyExpiry:
@pytest.fixture
def manager(self):
mgr = APIKeyManager()
mgr.add_key("chatgpt", "sk-expired-key-1234567", source=KeySource.SYSTEM, priority=5)
mgr.add_key("chatgpt", "sk-active-key-12345678", source=KeySource.ENV, priority=0)
return mgr
def test_expired_key_skipped(self, manager):
keys = manager.list_keys("chatgpt")
for k in keys:
if k.key_source == KeySource.SYSTEM:
k.status = KeyStatus.EXPIRED
key = manager.get_key("chatgpt")
assert key == "sk-active-key-12345678"
def test_active_and_unknown_keys_are_usable(self, manager):
keys = manager.list_keys("chatgpt")
usable = [k for k in keys if k.status in (KeyStatus.ACTIVE, KeyStatus.UNKNOWN)]
assert len(usable) == 2
class TestLoadEnvKeys:
@pytest.fixture
def manager(self):
return APIKeyManager()
def test_load_env_keys_from_environment(self, manager, monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "sk-openai-from-env-1234567890")
monkeypatch.setenv("DEEPSEEK_API_KEY", "sk-deepseek-from-env-12345678")
manager.load_env_keys()
keys = manager.list_keys()
engine_types = {k.engine_type for k in keys}
assert "chatgpt" in engine_types
assert "deepseek" in engine_types
def test_env_keys_have_env_source(self, manager, monkeypatch):
monkeypatch.setenv("OPENAI_API_KEY", "sk-openai-from-env-1234567890")
manager.load_env_keys()
keys = manager.list_keys("chatgpt")
assert len(keys) == 1
assert keys[0].key_source == KeySource.ENV
def test_env_keys_not_loaded_when_empty(self, manager, monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("DEEPSEEK_API_KEY", raising=False)
manager.load_env_keys()
keys = manager.list_keys()
assert len(keys) == 0