fischer-agentkit/tests/unit/channels/test_secrets.py

246 lines
9.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""SecretsStore 加密凭证存储测试 (U10 / KTD8)。
覆盖场景:
- secrets 写入后加密存储(非明文)
- secrets 读取时解密
- 不同 master key / 不同次加密产生不同密文
- 错误 master key 解密失败
- set/get/delete/list CRUD 操作
- 生产环境启动 guard
"""
from __future__ import annotations
import base64
import os
import pytest
from cryptography.exceptions import InvalidTag
from agentkit.channels.secrets import (
KEY_SIZE,
SecretsStore,
assert_production_master_key,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def master_key() -> bytes:
"""确定性的测试 master key32 字节)。"""
return b"\x01" * KEY_SIZE
@pytest.fixture
def store(master_key: bytes) -> SecretsStore:
"""使用确定性 master key 的 SecretsStore。"""
return SecretsStore(master_key=master_key)
@pytest.fixture(autouse=True)
def _clean_production_env(monkeypatch):
"""确保测试不在生产模式下运行(移除 AGENTKIT_ENV=production"""
monkeypatch.delenv("AGENTKIT_ENV", raising=False)
monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False)
# ---------------------------------------------------------------------------
# 加密 / 解密往返
# ---------------------------------------------------------------------------
class TestEncryptDecrypt:
"""加密与解密核心行为。"""
def test_roundtrip_returns_plaintext(self, store: SecretsStore):
"""加密后解密应返回原始明文。"""
plaintext = "feishu-app-secret-12345"
entry = store.encrypt(plaintext)
assert store.decrypt(entry) == plaintext
def test_encrypted_value_is_not_plaintext(self, store: SecretsStore):
"""写入后密文不应包含明文(非明文存储)。"""
plaintext = "super-secret-token"
entry = store.encrypt(plaintext)
# 密文 base64 不应包含明文
assert plaintext not in entry.value
assert plaintext not in entry.nonce
assert plaintext not in entry.salt
# 解码后的密文也不应包含明文字节
ciphertext_bytes = base64.b64decode(entry.value)
assert plaintext.encode() not in ciphertext_bytes
def test_each_encrypt_produces_different_ciphertext(self, store: SecretsStore):
"""同一明文每次加密应产生不同密文(随机 nonce+salt"""
plaintext = "same-secret"
entry1 = store.encrypt(plaintext)
entry2 = store.encrypt(plaintext)
assert entry1.value != entry2.value
assert entry1.nonce != entry2.nonce
assert entry1.salt != entry2.salt
# 两者都能正确解密
assert store.decrypt(entry1) == plaintext
assert store.decrypt(entry2) == plaintext
def test_different_master_keys_produce_different_ciphertext(self):
"""不同 master key 派生不同 per-row 密钥。"""
plaintext = "shared-secret"
store_a = SecretsStore(master_key=b"\x01" * KEY_SIZE)
store_b = SecretsStore(master_key=b"\x02" * KEY_SIZE)
entry_a = store_a.encrypt(plaintext)
# store_b 无法解密 store_a 加密的条目
with pytest.raises(InvalidTag):
store_b.decrypt(entry_a)
# store_a 自身可解密
assert store_a.decrypt(entry_a) == plaintext
def test_decrypt_with_wrong_key_fails(self, store: SecretsStore):
"""错误 master key 解密应抛出 InvalidTag。"""
entry = store.encrypt("secret-data")
wrong_store = SecretsStore(master_key=b"\xab" * KEY_SIZE)
with pytest.raises(InvalidTag):
wrong_store.decrypt(entry)
def test_entry_has_correct_metadata(self, store: SecretsStore):
"""加密条目应包含 nonce、salt、key_id 等元数据。"""
entry = store.encrypt("data")
assert entry.key == ""
assert entry.nonce
assert entry.salt
assert entry.key_id == "default"
# nonce 解码后为 12 字节
assert len(base64.b64decode(entry.nonce)) == 12
# salt 解码后为 16 字节
assert len(base64.b64decode(entry.salt)) == 16
# ---------------------------------------------------------------------------
# CRUD 异步操作
# ---------------------------------------------------------------------------
class TestSecretsCrud:
"""set/get/delete/list 异步操作。"""
async def test_set_and_get_secret(self, store: SecretsStore):
"""写入后读取应返回明文。"""
await store.set_secret("feishu:app_id", "cli_xxx")
assert await store.get_secret("feishu:app_id") == "cli_xxx"
async def test_get_nonexistent_secret_returns_none(self, store: SecretsStore):
"""读取不存在的 key 返回 None。"""
assert await store.get_secret("missing") is None
async def test_set_overwrites_existing(self, store: SecretsStore):
"""同名 key 写入应覆盖旧值。"""
await store.set_secret("k", "old")
await store.set_secret("k", "new")
assert await store.get_secret("k") == "new"
async def test_delete_secret(self, store: SecretsStore):
"""删除凭证后不可再读取。"""
await store.set_secret("k", "v")
assert await store.delete_secret("k") is True
assert await store.get_secret("k") is None
async def test_delete_nonexistent_returns_false(self, store: SecretsStore):
"""删除不存在的 key 返回 False。"""
assert await store.delete_secret("missing") is False
async def test_list_keys(self, store: SecretsStore):
"""列出全部凭证键。"""
await store.set_secret("feishu:a", "1")
await store.set_secret("feishu:b", "2")
await store.set_secret("dingtalk:c", "3")
keys = await store.list_keys()
assert set(keys) == {"feishu:a", "feishu:b", "dingtalk:c"}
async def test_list_keys_with_prefix(self, store: SecretsStore):
"""按前缀过滤凭证键。"""
await store.set_secret("feishu:a", "1")
await store.set_secret("feishu:b", "2")
await store.set_secret("dingtalk:c", "3")
keys = await store.list_keys(prefix="feishu:")
assert set(keys) == {"feishu:a", "feishu:b"}
async def test_stored_value_is_encrypted_not_plaintext(self, store: SecretsStore):
"""写入后内部存储的 value 字段应为密文,非明文。"""
plaintext = "plaintext-token"
await store.set_secret("k", plaintext)
# 直接检查内存中的 entry.value确认非明文
entry = store._store["k"]
assert plaintext not in entry.value
assert plaintext not in base64.b64decode(entry.value).decode(errors="ignore")
# ---------------------------------------------------------------------------
# 生产环境 guard
# ---------------------------------------------------------------------------
class TestProductionGuard:
"""生产环境 master key 启动 guard。"""
def test_dev_mode_allows_env_source(self, master_key: bytes, monkeypatch):
"""开发模式下 env 来源 master key 允许构造。"""
monkeypatch.delenv("AGENTKIT_ENV", raising=False)
# 不应抛异常
store = SecretsStore(master_key=master_key, key_source="env")
assert store is not None
def test_production_mode_rejects_env_source(self, master_key: bytes, monkeypatch):
"""生产模式下 env 来源 master key 应拒绝启动。"""
monkeypatch.setenv("AGENTKIT_ENV", "production")
with pytest.raises(RuntimeError, match="云 KMS"):
SecretsStore(master_key=master_key, key_source="env")
def test_production_mode_rejects_missing_key(self, monkeypatch):
"""生产模式下无 master key 应拒绝启动。"""
monkeypatch.setenv("AGENTKIT_ENV", "production")
with pytest.raises(RuntimeError, match="云 KMS"):
SecretsStore(master_key=None, key_source="env")
def test_production_mode_allows_kms_source(self, master_key: bytes, monkeypatch):
"""生产模式下 KMS 来源 master key 允许构造。"""
monkeypatch.setenv("AGENTKIT_ENV", "production")
store = SecretsStore(master_key=master_key, key_source="kms")
assert store is not None
def test_assert_production_master_key_helper(self, master_key: bytes, monkeypatch):
"""直接测试 guard 辅助函数。"""
monkeypatch.setenv("AGENTKIT_ENV", "production")
# env 来源拒绝
with pytest.raises(RuntimeError):
assert_production_master_key(master_key, source="env")
# kms 来源通过
assert_production_master_key(master_key, source="kms")
# 空密钥拒绝
with pytest.raises(RuntimeError):
assert_production_master_key(None, source="kms")
# ---------------------------------------------------------------------------
# 环境变量加载 fallback
# ---------------------------------------------------------------------------
class TestEnvLoading:
"""开发环境从环境变量加载 master key。"""
def test_loads_master_key_from_env(self, monkeypatch):
"""AGENTKIT_MASTER_KEY 环境变量应被加载。"""
key = os.urandom(KEY_SIZE)
monkeypatch.setenv("AGENTKIT_MASTER_KEY", base64.b64encode(key).decode())
store = SecretsStore() # 不显式传入 master_key
assert store._master_key == key
def test_ephemeral_key_when_no_env(self, monkeypatch):
"""无环境变量时生成临时 key开发 fallback"""
monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False)
store = SecretsStore()
assert len(store._master_key) == KEY_SIZE