"""U15 — ProviderConfig API Key 加密迁移测试。 覆盖: - get_api_key plaintext 回退 - aget_api_key + secrets_store 解密 - aget_api_key dual-read 回退(解密失败 → plaintext) - migrate_to_secrets 完整流程 - migrate_to_secrets 幂等 - LLMConfig.from_dict 解析新字段 - LLMConfig.from_dict 缺省新字段 """ from __future__ import annotations import json from unittest.mock import AsyncMock from agentkit.channels.secrets import SecretsStore from agentkit.llm.config import LLMConfig, ProviderConfig # ---------------------------------------------------------------------- # 15: get_api_key plaintext # ---------------------------------------------------------------------- def test_get_api_key_plaintext_no_store(): """无 secrets_store 时 get_api_key 返回 plaintext。""" pconf = ProviderConfig(api_key="sk-xxx", base_url="", type="openai") assert pconf.get_api_key(None) == "sk-xxx" def test_get_api_key_plaintext_with_store_sync(): """有 secrets_store 但同步调用 — 仍返回 plaintext(async 解密不可用)。""" pconf = ProviderConfig(api_key="sk-xxx", base_url="", type="openai") # 即使传 store,同步路径无法 decrypt,回退 plaintext store = object() # 任意非 None 对象 assert pconf.get_api_key(store) == "sk-xxx" # type: ignore[arg-type] # ---------------------------------------------------------------------- # 16: aget_api_key + secrets_store 解密成功 # ---------------------------------------------------------------------- async def test_aget_api_key_decrypts_from_store(): """加密列 + secrets_store → 解密返回 key。""" store = SecretsStore(master_key=b"x" * 32) # 先在 store 里放一个 key await store.set_secret("llm:provider:openai:api_key", "sk-decrypted") # 构造一个加密的 ProviderConfig(直接用真实加密流程) pconf = ProviderConfig( api_key="", base_url="", type="openai", api_key_encrypted="", # 占位,下面重新生成 api_key_source="secrets_store", ) # 用 store 加密 "sk-decrypted" 得到真实 encrypted 字段 entry = await store.set_secret("llm:provider:openai:api_key", "sk-decrypted") pconf.api_key_encrypted = ProviderConfig._encode_secret_entry( entry, "llm:provider:openai:api_key" ) result = await pconf.aget_api_key(store) assert result == "sk-decrypted" # ---------------------------------------------------------------------- # 17: aget_api_key dual-read 回退 # ---------------------------------------------------------------------- async def test_aget_api_key_dual_read_fallback_to_plaintext(): """加密列存在但解密失败 → 回退 plaintext。""" # secrets_store 返回 None(模拟 key 不存在 / 解密失败) store = AsyncMock(spec=SecretsStore) store.get_secret = AsyncMock(return_value=None) pconf = ProviderConfig( api_key="sk-plaintext", base_url="", type="openai", api_key_encrypted=json.dumps( { "key": "llm:provider:openai:api_key", "value": "fake", "nonce": "fake", "salt": "fake", "key_id": "default", "created_at": "", "updated_at": "", } ), api_key_source="dual", ) result = await pconf.aget_api_key(store) assert result == "sk-plaintext" async def test_aget_api_key_decrypt_exception_falls_back(): """解密抛异常时回退 plaintext。""" store = AsyncMock(spec=SecretsStore) store.get_secret = AsyncMock(side_effect=RuntimeError("decrypt failed")) pconf = ProviderConfig( api_key="sk-plaintext", base_url="", type="openai", api_key_encrypted=json.dumps( { "key": "llm:provider:openai:api_key", "value": "fake", "nonce": "fake", "salt": "fake", "key_id": "default", "created_at": "", "updated_at": "", } ), api_key_source="dual", ) result = await pconf.aget_api_key(store) assert result == "sk-plaintext" async def test_aget_api_key_no_encrypted_returns_plaintext(): """无加密列时直接返回 plaintext。""" store = SecretsStore(master_key=b"x" * 32) pconf = ProviderConfig(api_key="sk-plain", base_url="", type="openai") assert await pconf.aget_api_key(store) == "sk-plain" # ---------------------------------------------------------------------- # 18: migrate_to_secrets 完整流程 # ---------------------------------------------------------------------- async def test_migrate_to_secrets_full_flow(): """plaintext → secrets_store 迁移:加密、验证、清空 plaintext。""" store = SecretsStore(master_key=b"x" * 32) pconf = ProviderConfig( api_key="sk-secret", base_url="", type="openai", ) await pconf.migrate_to_secrets(store) assert pconf.api_key == "" # plaintext 清空 assert pconf.api_key_encrypted is not None # 加密列写入 assert pconf.api_key_source == "secrets_store" # 验证:通过 aget_api_key 能读回原 key decrypted = await pconf.aget_api_key(store) assert decrypted == "sk-secret" # ---------------------------------------------------------------------- # 19: migrate_to_secrets 幂等 # ---------------------------------------------------------------------- async def test_migrate_to_secrets_idempotent(): """已迁移的 config 再次调用 migrate_to_secrets 是 no-op。""" store = SecretsStore(master_key=b"x" * 32) pconf = ProviderConfig(api_key="sk-secret", base_url="", type="openai") await pconf.migrate_to_secrets(store) encrypted_after_first = pconf.api_key_encrypted assert pconf.api_key_source == "secrets_store" # 第二次调用 — no-op await pconf.migrate_to_secrets(store) assert pconf.api_key_encrypted == encrypted_after_first assert pconf.api_key_source == "secrets_store" assert pconf.api_key == "" async def test_migrate_to_secrets_skips_empty_plaintext(): """plaintext 为空时跳过迁移。""" store = SecretsStore(master_key=b"x" * 32) pconf = ProviderConfig(api_key="", base_url="", type="openai") await pconf.migrate_to_secrets(store) assert pconf.api_key == "" assert pconf.api_key_encrypted is None assert pconf.api_key_source == "plaintext" # ---------------------------------------------------------------------- # 20-21: LLMConfig.from_dict 解析新字段 # ---------------------------------------------------------------------- def test_from_dict_parses_new_encrypted_fields(): """LLMConfig.from_dict 解析 api_key_encrypted / api_key_source。""" data = { "providers": { "openai": { "api_key": "", "base_url": "https://api.openai.com/v1", "type": "openai", "api_key_encrypted": '{"key":"k","value":"v","nonce":"n","salt":"s"}', "api_key_source": "secrets_store", }, }, } config = LLMConfig.from_dict(data) pconf = config.providers["openai"] assert pconf.api_key_encrypted == '{"key":"k","value":"v","nonce":"n","salt":"s"}' assert pconf.api_key_source == "secrets_store" def test_from_dict_defaults_new_fields_to_plaintext(): """LLMConfig.from_dict 缺省新字段时默认 plaintext 行为。""" data = { "providers": { "openai": { "api_key": "sk-test", "base_url": "https://api.openai.com/v1", "type": "openai", }, }, } config = LLMConfig.from_dict(data) pconf = config.providers["openai"] assert pconf.api_key_encrypted is None assert pconf.api_key_source == "plaintext" def test_from_dict_dual_source_during_migration(): """双写窗口:api_key_source="dual" 时两个字段都保留。""" data = { "providers": { "openai": { "api_key": "sk-plaintext", "base_url": "", "type": "openai", "api_key_encrypted": '{"key":"k","value":"v","nonce":"n","salt":"s"}', "api_key_source": "dual", }, }, } config = LLMConfig.from_dict(data) pconf = config.providers["openai"] assert pconf.api_key == "sk-plaintext" assert pconf.api_key_encrypted is not None assert pconf.api_key_source == "dual" # ---------------------------------------------------------------------- # 额外:_secret_key_for_type 命名空间 # ---------------------------------------------------------------------- def test_secret_key_namespaced_by_type(): """secret key 应按 provider type 命名空间隔离。""" pconf = ProviderConfig(api_key="", base_url="", type="anthropic") assert pconf._secret_key_for_type() == "llm:provider:anthropic:api_key" pconf2 = ProviderConfig(api_key="", base_url="", type="gemini") assert pconf2._secret_key_for_type() == "llm:provider:gemini:api_key" # ---------------------------------------------------------------------- # 额外:encode/decode SecretEntry 往返 # ---------------------------------------------------------------------- async def test_encode_decode_secret_entry_roundtrip(): """encode → decode 应保留关键字段。""" store = SecretsStore(master_key=b"x" * 32) entry = await store.set_secret("llm:provider:openai:api_key", "sk-xxx") encoded = ProviderConfig._encode_secret_entry(entry, "llm:provider:openai:api_key") decoded = ProviderConfig._decode_secret_entry(encoded) assert decoded.key == "llm:provider:openai:api_key" assert decoded.value == entry.value assert decoded.nonce == entry.nonce assert decoded.salt == entry.salt