"""SecretsStore 加密凭证存储测试 (U10 / KTD8)。 覆盖场景: - secrets 写入后加密存储(非明文) - secrets 读取时解密 - 不同 master key / 不同次加密产生不同密文 - 错误 master key 解密失败 - set/get/delete/list CRUD 操作 - 生产环境启动 guard """ from __future__ import annotations import base64 import os import pytest from cryptography.exceptions import InvalidTag from agentkit.channels.secrets import ( KEY_SIZE, SecretsStore, assert_production_master_key, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def master_key() -> bytes: """确定性的测试 master key(32 字节)。""" return b"\x01" * KEY_SIZE @pytest.fixture def store(master_key: bytes) -> SecretsStore: """使用确定性 master key 的 SecretsStore。""" return SecretsStore(master_key=master_key) @pytest.fixture(autouse=True) def _clean_production_env(monkeypatch): """确保测试不在生产模式下运行(移除 AGENTKIT_ENV=production)。""" monkeypatch.delenv("AGENTKIT_ENV", raising=False) monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False) # --------------------------------------------------------------------------- # 加密 / 解密往返 # --------------------------------------------------------------------------- class TestEncryptDecrypt: """加密与解密核心行为。""" def test_roundtrip_returns_plaintext(self, store: SecretsStore): """加密后解密应返回原始明文。""" plaintext = "feishu-app-secret-12345" entry = store.encrypt(plaintext) assert store.decrypt(entry) == plaintext def test_encrypted_value_is_not_plaintext(self, store: SecretsStore): """写入后密文不应包含明文(非明文存储)。""" plaintext = "super-secret-token" entry = store.encrypt(plaintext) # 密文 base64 不应包含明文 assert plaintext not in entry.value assert plaintext not in entry.nonce assert plaintext not in entry.salt # 解码后的密文也不应包含明文字节 ciphertext_bytes = base64.b64decode(entry.value) assert plaintext.encode() not in ciphertext_bytes def test_each_encrypt_produces_different_ciphertext(self, store: SecretsStore): """同一明文每次加密应产生不同密文(随机 nonce+salt)。""" plaintext = "same-secret" entry1 = store.encrypt(plaintext) entry2 = store.encrypt(plaintext) assert entry1.value != entry2.value assert entry1.nonce != entry2.nonce assert entry1.salt != entry2.salt # 两者都能正确解密 assert store.decrypt(entry1) == plaintext assert store.decrypt(entry2) == plaintext def test_different_master_keys_produce_different_ciphertext(self): """不同 master key 派生不同 per-row 密钥。""" plaintext = "shared-secret" store_a = SecretsStore(master_key=b"\x01" * KEY_SIZE) store_b = SecretsStore(master_key=b"\x02" * KEY_SIZE) entry_a = store_a.encrypt(plaintext) # store_b 无法解密 store_a 加密的条目 with pytest.raises(InvalidTag): store_b.decrypt(entry_a) # store_a 自身可解密 assert store_a.decrypt(entry_a) == plaintext def test_decrypt_with_wrong_key_fails(self, store: SecretsStore): """错误 master key 解密应抛出 InvalidTag。""" entry = store.encrypt("secret-data") wrong_store = SecretsStore(master_key=b"\xab" * KEY_SIZE) with pytest.raises(InvalidTag): wrong_store.decrypt(entry) def test_entry_has_correct_metadata(self, store: SecretsStore): """加密条目应包含 nonce、salt、key_id 等元数据。""" entry = store.encrypt("data") assert entry.key == "" assert entry.nonce assert entry.salt assert entry.key_id == "default" # nonce 解码后为 12 字节 assert len(base64.b64decode(entry.nonce)) == 12 # salt 解码后为 16 字节 assert len(base64.b64decode(entry.salt)) == 16 # --------------------------------------------------------------------------- # CRUD 异步操作 # --------------------------------------------------------------------------- class TestSecretsCrud: """set/get/delete/list 异步操作。""" async def test_set_and_get_secret(self, store: SecretsStore): """写入后读取应返回明文。""" await store.set_secret("feishu:app_id", "cli_xxx") assert await store.get_secret("feishu:app_id") == "cli_xxx" async def test_get_nonexistent_secret_returns_none(self, store: SecretsStore): """读取不存在的 key 返回 None。""" assert await store.get_secret("missing") is None async def test_set_overwrites_existing(self, store: SecretsStore): """同名 key 写入应覆盖旧值。""" await store.set_secret("k", "old") await store.set_secret("k", "new") assert await store.get_secret("k") == "new" async def test_delete_secret(self, store: SecretsStore): """删除凭证后不可再读取。""" await store.set_secret("k", "v") assert await store.delete_secret("k") is True assert await store.get_secret("k") is None async def test_delete_nonexistent_returns_false(self, store: SecretsStore): """删除不存在的 key 返回 False。""" assert await store.delete_secret("missing") is False async def test_list_keys(self, store: SecretsStore): """列出全部凭证键。""" await store.set_secret("feishu:a", "1") await store.set_secret("feishu:b", "2") await store.set_secret("dingtalk:c", "3") keys = await store.list_keys() assert set(keys) == {"feishu:a", "feishu:b", "dingtalk:c"} async def test_list_keys_with_prefix(self, store: SecretsStore): """按前缀过滤凭证键。""" await store.set_secret("feishu:a", "1") await store.set_secret("feishu:b", "2") await store.set_secret("dingtalk:c", "3") keys = await store.list_keys(prefix="feishu:") assert set(keys) == {"feishu:a", "feishu:b"} async def test_stored_value_is_encrypted_not_plaintext(self, store: SecretsStore): """写入后内部存储的 value 字段应为密文,非明文。""" plaintext = "plaintext-token" await store.set_secret("k", plaintext) # 直接检查内存中的 entry.value,确认非明文 entry = store._store["k"] assert plaintext not in entry.value assert plaintext not in base64.b64decode(entry.value).decode(errors="ignore") # --------------------------------------------------------------------------- # 生产环境 guard # --------------------------------------------------------------------------- class TestProductionGuard: """生产环境 master key 启动 guard。""" def test_dev_mode_allows_env_source(self, master_key: bytes, monkeypatch): """开发模式下 env 来源 master key 允许构造。""" monkeypatch.delenv("AGENTKIT_ENV", raising=False) # 不应抛异常 store = SecretsStore(master_key=master_key, key_source="env") assert store is not None def test_production_mode_rejects_env_source(self, master_key: bytes, monkeypatch): """生产模式下 env 来源 master key 应拒绝启动。""" monkeypatch.setenv("AGENTKIT_ENV", "production") with pytest.raises(RuntimeError, match="云 KMS"): SecretsStore(master_key=master_key, key_source="env") def test_production_mode_rejects_missing_key(self, monkeypatch): """生产模式下无 master key 应拒绝启动。""" monkeypatch.setenv("AGENTKIT_ENV", "production") with pytest.raises(RuntimeError, match="云 KMS"): SecretsStore(master_key=None, key_source="env") def test_production_mode_allows_kms_source(self, master_key: bytes, monkeypatch): """生产模式下 KMS 来源 master key 允许构造。""" monkeypatch.setenv("AGENTKIT_ENV", "production") store = SecretsStore(master_key=master_key, key_source="kms") assert store is not None def test_assert_production_master_key_helper(self, master_key: bytes, monkeypatch): """直接测试 guard 辅助函数。""" monkeypatch.setenv("AGENTKIT_ENV", "production") # env 来源拒绝 with pytest.raises(RuntimeError): assert_production_master_key(master_key, source="env") # kms 来源通过 assert_production_master_key(master_key, source="kms") # 空密钥拒绝 with pytest.raises(RuntimeError): assert_production_master_key(None, source="kms") # --------------------------------------------------------------------------- # 环境变量加载 fallback # --------------------------------------------------------------------------- class TestEnvLoading: """开发环境从环境变量加载 master key。""" def test_loads_master_key_from_env(self, monkeypatch): """AGENTKIT_MASTER_KEY 环境变量应被加载。""" key = os.urandom(KEY_SIZE) monkeypatch.setenv("AGENTKIT_MASTER_KEY", base64.b64encode(key).decode()) store = SecretsStore() # 不显式传入 master_key assert store._master_key == key def test_ephemeral_key_when_no_env(self, monkeypatch): """无环境变量时生成临时 key(开发 fallback)。""" monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False) store = SecretsStore() assert len(store._master_key) == KEY_SIZE