"""Tests for single-key and dual-key (api_key + secret_key) credential handling.

Covers the ``KeyCredentials`` value object, ``APIKeyManager`` storage and
retrieval of both credential formats, and how ``WenxinAdapter`` resolves its
credentials from a key manager or from directly supplied values.
"""
import pytest

from app.services.api_key_manager import APIKeyManager, KeySource


class TestKeyCredentials:
    """Behaviour of the ``KeyCredentials`` value object."""

    def test_key_credentials_single_key(self):
        """With only an api_key, secret_key is None and is left out of to_dict()."""
        from app.services.api_key_manager import KeyCredentials

        single = KeyCredentials(api_key="test-api-key-123")

        assert single.api_key == "test-api-key-123"
        assert single.secret_key is None
        assert single.to_dict() == {"api_key": "test-api-key-123"}

    def test_key_credentials_dual_keys(self):
        """With both keys supplied, both are exposed and serialized by to_dict()."""
        from app.services.api_key_manager import KeyCredentials

        pair = KeyCredentials(
            api_key="test-api-key-123",
            secret_key="test-secret-key-456",
        )

        assert pair.api_key == "test-api-key-123"
        assert pair.secret_key == "test-secret-key-456"
        assert pair.to_dict() == {
            "api_key": "test-api-key-123",
            "secret_key": "test-secret-key-456",
        }


class TestAPIKeyManagerDualKeys:
    """``APIKeyManager`` handling of dict (dual-key) and string (single-key) input."""

    @pytest.fixture
    def manager(self):
        """Provide a newly constructed ``APIKeyManager`` for each test."""
        return APIKeyManager()

    def test_add_dual_keys_dict_for_wenxin(self, manager):
        """Adding a dual-key dict for wenxin returns a config with engine and source."""
        wenxin_keys = {
            "api_key": "baidu-api-key-xxx",
            "secret_key": "baidu-secret-key-yyy",
        }

        key_config = manager.add_key("wenxin", wenxin_keys, source=KeySource.USER)

        assert key_config.engine_type == "wenxin"
        assert key_config.key_source == KeySource.USER

    def test_add_dual_keys_and_retrieve(self, manager):
        """A dual-key dict that was added comes back from get_key() as a dict."""
        wenxin_keys = {
            "api_key": "baidu-api-key-xxx",
            "secret_key": "baidu-secret-key-yyy",
        }
        manager.add_key("wenxin", wenxin_keys, source=KeySource.SYSTEM)

        fetched = manager.get_key("wenxin")

        assert fetched is not None
        assert isinstance(fetched, dict)
        assert fetched["api_key"] == "baidu-api-key-xxx"
        assert fetched["secret_key"] == "baidu-secret-key-yyy"

    def test_get_credentials_returns_complete_credentials(self, manager):
        """get_credentials() exposes both api_key and secret_key for a dual-key entry."""
        wenxin_keys = {
            "api_key": "baidu-api-key-xxx",
            "secret_key": "baidu-secret-key-yyy",
        }
        manager.add_key("wenxin", wenxin_keys, source=KeySource.SYSTEM)

        resolved = manager.get_credentials("wenxin")

        assert resolved is not None
        assert resolved.api_key == "baidu-api-key-xxx"
        assert resolved.secret_key == "baidu-secret-key-yyy"

    def test_get_credentials_with_single_key(self, manager):
        """get_credentials() on a single-key entry has secret_key None (existing logic)."""
        manager.add_key(
            "chatgpt",
            "sk-chatgpt-key-1234567890",
            source=KeySource.SYSTEM,
        )

        resolved = manager.get_credentials("chatgpt")

        assert resolved is not None
        assert resolved.api_key == "sk-chatgpt-key-1234567890"
        assert resolved.secret_key is None

    def test_get_credentials_returns_none_for_unknown_engine(self, manager):
        """get_credentials() returns None for an engine with no registered key."""
        resolved = manager.get_credentials("unknown_engine")

        assert resolved is None

    def test_add_single_key_string_still_works(self, manager):
        """A plain string key is still accepted and returned unchanged by get_key()."""
        key_config = manager.add_key(
            "chatgpt",
            "sk-test-key-1234567890",
            source=KeySource.SYSTEM,
        )
        assert key_config.engine_type == "chatgpt"

        stored_key = manager.get_key("chatgpt")
        assert stored_key == "sk-test-key-1234567890"

    def test_add_dual_keys_string_auto_wrap(self, manager):
        """A plain string added for wenxin is exposed as api_key with no secret_key."""
        manager.add_key("wenxin", "single-key-value", source=KeySource.SYSTEM)

        resolved = manager.get_credentials("wenxin")

        assert resolved is not None
        assert resolved.api_key == "single-key-value"
        assert resolved.secret_key is None

    def test_get_all_keys_returns_dict_for_dual_key_engine(self, manager):
        """get_all_keys() returns a mapping holding both keys of a dual-key entry."""
        wenxin_keys = {"api_key": "api-key-xxx", "secret_key": "secret-key-yyy"}
        manager.add_key("wenxin", wenxin_keys, source=KeySource.SYSTEM)

        all_keys = manager.get_all_keys("wenxin")

        assert all_keys is not None
        assert "api_key" in all_keys
        assert "secret_key" in all_keys
        assert all_keys["api_key"] == "api-key-xxx"
        assert all_keys["secret_key"] == "secret-key-yyy"

    def test_env_mapping_includes_dual_keys(self):
        """The environment-variable mapping has an entry for wenxin."""
        key_manager = APIKeyManager()

        assert "wenxin" in key_manager._ENV_MAPPING
        assert key_manager._ENV_MAPPING["wenxin"] == "BAIDU_QIANFAN_API_KEY"

    def test_env_loading_for_dual_key_engine(self, manager, monkeypatch):
        """load_env_keys() picks up the wenxin api_key from the environment."""
        monkeypatch.setenv("BAIDU_QIANFAN_API_KEY", "env-api-key-xxx")
        monkeypatch.setenv("BAIDU_SECRET_KEY", "env-secret-key-yyy")

        manager.load_env_keys()
        resolved = manager.get_credentials("wenxin")

        assert resolved is not None
        assert resolved.api_key == "env-api-key-xxx"
        # NOTE(review): BAIDU_SECRET_KEY is set above but secret_key is not
        # asserted here -- confirm whether load_env_keys() is expected to
        # populate it before adding a check.


class TestWenxinAdapterDualKeys:
    """How ``WenxinAdapter`` resolves its api_key / secret_key."""

    def test_wenxin_adapter_gets_dual_keys_from_manager(self):
        """The adapter reads both keys from the supplied key manager."""
        from app.services.ai_engine.wenxin import WenxinAdapter

        key_manager = APIKeyManager()
        wenxin_keys = {
            "api_key": "test-api-key-123",
            "secret_key": "test-secret-key-456",
        }
        key_manager.add_key("wenxin", wenxin_keys, source=KeySource.SYSTEM)

        wenxin = WenxinAdapter(key_manager=key_manager)

        assert wenxin.api_key == "test-api-key-123"
        assert wenxin.secret_key == "test-secret-key-456"

    def test_wenxin_adapter_with_single_key_fallback(self):
        """With only a single key registered, the adapter's secret_key is empty."""
        from app.services.ai_engine.wenxin import WenxinAdapter

        key_manager = APIKeyManager()
        key_manager.add_key("wenxin", "single-key-only", source=KeySource.SYSTEM)

        wenxin = WenxinAdapter(key_manager=key_manager)

        assert wenxin.api_key == "single-key-only"
        assert wenxin.secret_key == ""

    def test_wenxin_adapter_direct_credentials_take_precedence(self):
        """Directly passed credentials win over those held by the key manager."""
        from app.services.ai_engine.wenxin import WenxinAdapter

        key_manager = APIKeyManager()
        key_manager.add_key(
            "wenxin",
            {"api_key": "manager-api", "secret_key": "manager-secret"},
            source=KeySource.SYSTEM,
        )

        wenxin = WenxinAdapter(
            api_key="direct-api",
            secret_key="direct-secret",
            key_manager=key_manager,
        )

        assert wenxin.api_key == "direct-api"
        assert wenxin.secret_key == "direct-secret"