280 lines
9.4 KiB
Python
280 lines
9.4 KiB
Python
"""U15 — ProviderConfig API Key 加密迁移测试。
|
||
|
||
覆盖:
|
||
- get_api_key plaintext 回退
|
||
- aget_api_key + secrets_store 解密
|
||
- aget_api_key dual-read 回退(解密失败 → plaintext)
|
||
- migrate_to_secrets 完整流程
|
||
- migrate_to_secrets 幂等
|
||
- LLMConfig.from_dict 解析新字段
|
||
- LLMConfig.from_dict 缺省新字段
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from unittest.mock import AsyncMock
|
||
|
||
from agentkit.channels.secrets import SecretsStore
|
||
from agentkit.llm.config import LLMConfig, ProviderConfig
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 15: get_api_key plaintext
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
def test_get_api_key_plaintext_no_store():
|
||
"""get_api_key 返回 plaintext(同步入口,不处理加密列)。"""
|
||
pconf = ProviderConfig(api_key="sk-xxx", base_url="", type="openai")
|
||
assert pconf.get_api_key() == "sk-xxx"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 16: aget_api_key + secrets_store 解密成功
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
async def test_aget_api_key_decrypts_from_store():
|
||
"""加密列 + secrets_store → 解密返回 key。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
# 先在 store 里放一个 key
|
||
await store.set_secret("llm:provider:openai:api_key", "sk-decrypted")
|
||
# 构造一个加密的 ProviderConfig(直接用真实加密流程)
|
||
pconf = ProviderConfig(
|
||
api_key="",
|
||
base_url="",
|
||
type="openai",
|
||
api_key_encrypted="", # 占位,下面重新生成
|
||
api_key_source="secrets_store",
|
||
)
|
||
# 用 store 加密 "sk-decrypted" 得到真实 encrypted 字段
|
||
entry = await store.set_secret("llm:provider:openai:api_key", "sk-decrypted")
|
||
pconf.api_key_encrypted = ProviderConfig._encode_secret_entry(
|
||
entry, "llm:provider:openai:api_key"
|
||
)
|
||
|
||
result = await pconf.aget_api_key(store)
|
||
assert result == "sk-decrypted"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 17: aget_api_key dual-read 回退
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
async def test_aget_api_key_dual_read_fallback_to_plaintext():
|
||
"""加密列存在但解密失败 → 回退 plaintext。"""
|
||
# secrets_store 返回 None(模拟 key 不存在 / 解密失败)
|
||
store = AsyncMock(spec=SecretsStore)
|
||
store.get_secret = AsyncMock(return_value=None)
|
||
|
||
pconf = ProviderConfig(
|
||
api_key="sk-plaintext",
|
||
base_url="",
|
||
type="openai",
|
||
api_key_encrypted=json.dumps(
|
||
{
|
||
"key": "llm:provider:openai:api_key",
|
||
"value": "fake",
|
||
"nonce": "fake",
|
||
"salt": "fake",
|
||
"key_id": "default",
|
||
"created_at": "",
|
||
"updated_at": "",
|
||
}
|
||
),
|
||
api_key_source="dual",
|
||
)
|
||
|
||
result = await pconf.aget_api_key(store)
|
||
assert result == "sk-plaintext"
|
||
|
||
|
||
async def test_aget_api_key_decrypt_exception_falls_back():
|
||
"""解密抛异常时回退 plaintext。"""
|
||
store = AsyncMock(spec=SecretsStore)
|
||
store.get_secret = AsyncMock(side_effect=RuntimeError("decrypt failed"))
|
||
|
||
pconf = ProviderConfig(
|
||
api_key="sk-plaintext",
|
||
base_url="",
|
||
type="openai",
|
||
api_key_encrypted=json.dumps(
|
||
{
|
||
"key": "llm:provider:openai:api_key",
|
||
"value": "fake",
|
||
"nonce": "fake",
|
||
"salt": "fake",
|
||
"key_id": "default",
|
||
"created_at": "",
|
||
"updated_at": "",
|
||
}
|
||
),
|
||
api_key_source="dual",
|
||
)
|
||
|
||
result = await pconf.aget_api_key(store)
|
||
assert result == "sk-plaintext"
|
||
|
||
|
||
async def test_aget_api_key_no_encrypted_returns_plaintext():
|
||
"""无加密列时直接返回 plaintext。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
pconf = ProviderConfig(api_key="sk-plain", base_url="", type="openai")
|
||
assert await pconf.aget_api_key(store) == "sk-plain"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 18: migrate_to_secrets 完整流程
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
async def test_migrate_to_secrets_full_flow():
|
||
"""plaintext → secrets_store 迁移:加密、验证、清空 plaintext。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
pconf = ProviderConfig(
|
||
api_key="sk-secret",
|
||
base_url="",
|
||
type="openai",
|
||
)
|
||
|
||
await pconf.migrate_to_secrets(store)
|
||
|
||
assert pconf.api_key == "" # plaintext 清空
|
||
assert pconf.api_key_encrypted is not None # 加密列写入
|
||
assert pconf.api_key_source == "secrets_store"
|
||
|
||
# 验证:通过 aget_api_key 能读回原 key
|
||
decrypted = await pconf.aget_api_key(store)
|
||
assert decrypted == "sk-secret"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 19: migrate_to_secrets 幂等
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
async def test_migrate_to_secrets_idempotent():
|
||
"""已迁移的 config 再次调用 migrate_to_secrets 是 no-op。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
pconf = ProviderConfig(api_key="sk-secret", base_url="", type="openai")
|
||
|
||
await pconf.migrate_to_secrets(store)
|
||
encrypted_after_first = pconf.api_key_encrypted
|
||
assert pconf.api_key_source == "secrets_store"
|
||
|
||
# 第二次调用 — no-op
|
||
await pconf.migrate_to_secrets(store)
|
||
assert pconf.api_key_encrypted == encrypted_after_first
|
||
assert pconf.api_key_source == "secrets_store"
|
||
assert pconf.api_key == ""
|
||
|
||
|
||
async def test_migrate_to_secrets_skips_empty_plaintext():
|
||
"""plaintext 为空时跳过迁移。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
pconf = ProviderConfig(api_key="", base_url="", type="openai")
|
||
|
||
await pconf.migrate_to_secrets(store)
|
||
|
||
assert pconf.api_key == ""
|
||
assert pconf.api_key_encrypted is None
|
||
assert pconf.api_key_source == "plaintext"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 20-21: LLMConfig.from_dict 解析新字段
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
def test_from_dict_parses_new_encrypted_fields():
|
||
"""LLMConfig.from_dict 解析 api_key_encrypted / api_key_source。"""
|
||
data = {
|
||
"providers": {
|
||
"openai": {
|
||
"api_key": "",
|
||
"base_url": "https://api.openai.com/v1",
|
||
"type": "openai",
|
||
"api_key_encrypted": '{"key":"k","value":"v","nonce":"n","salt":"s"}',
|
||
"api_key_source": "secrets_store",
|
||
},
|
||
},
|
||
}
|
||
|
||
config = LLMConfig.from_dict(data)
|
||
pconf = config.providers["openai"]
|
||
assert pconf.api_key_encrypted == '{"key":"k","value":"v","nonce":"n","salt":"s"}'
|
||
assert pconf.api_key_source == "secrets_store"
|
||
|
||
|
||
def test_from_dict_defaults_new_fields_to_plaintext():
|
||
"""LLMConfig.from_dict 缺省新字段时默认 plaintext 行为。"""
|
||
data = {
|
||
"providers": {
|
||
"openai": {
|
||
"api_key": "sk-test",
|
||
"base_url": "https://api.openai.com/v1",
|
||
"type": "openai",
|
||
},
|
||
},
|
||
}
|
||
|
||
config = LLMConfig.from_dict(data)
|
||
pconf = config.providers["openai"]
|
||
assert pconf.api_key_encrypted is None
|
||
assert pconf.api_key_source == "plaintext"
|
||
|
||
|
||
def test_from_dict_dual_source_during_migration():
|
||
"""双写窗口:api_key_source="dual" 时两个字段都保留。"""
|
||
data = {
|
||
"providers": {
|
||
"openai": {
|
||
"api_key": "sk-plaintext",
|
||
"base_url": "",
|
||
"type": "openai",
|
||
"api_key_encrypted": '{"key":"k","value":"v","nonce":"n","salt":"s"}',
|
||
"api_key_source": "dual",
|
||
},
|
||
},
|
||
}
|
||
|
||
config = LLMConfig.from_dict(data)
|
||
pconf = config.providers["openai"]
|
||
assert pconf.api_key == "sk-plaintext"
|
||
assert pconf.api_key_encrypted is not None
|
||
assert pconf.api_key_source == "dual"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 额外:_secret_key_for_type 命名空间
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
def test_secret_key_namespaced_by_type():
|
||
"""secret key 应按 provider type 命名空间隔离。"""
|
||
pconf = ProviderConfig(api_key="", base_url="", type="anthropic")
|
||
assert pconf._secret_key_for_type() == "llm:provider:anthropic:api_key"
|
||
|
||
pconf2 = ProviderConfig(api_key="", base_url="", type="gemini")
|
||
assert pconf2._secret_key_for_type() == "llm:provider:gemini:api_key"
|
||
|
||
|
||
# ----------------------------------------------------------------------
|
||
# 额外:encode/decode SecretEntry 往返
|
||
# ----------------------------------------------------------------------
|
||
|
||
|
||
async def test_encode_decode_secret_entry_roundtrip():
|
||
"""encode → decode 应保留关键字段。"""
|
||
store = SecretsStore(master_key=b"x" * 32)
|
||
entry = await store.set_secret("llm:provider:openai:api_key", "sk-xxx")
|
||
encoded = ProviderConfig._encode_secret_entry(entry, "llm:provider:openai:api_key")
|
||
decoded = ProviderConfig._decode_secret_entry(encoded)
|
||
|
||
assert decoded.key == "llm:provider:openai:api_key"
|
||
assert decoded.value == entry.value
|
||
assert decoded.nonce == entry.nonce
|
||
assert decoded.salt == entry.salt
|