"""MessageAdapter ABC 与渠道管理端点测试 (U10)。 覆盖场景: - MessageAdapter ABC 不能直接实例化 - 具体子类实现协议方法后可正常工作 - ChannelType / MessageDirection 枚举 - IncomingMessage / OutgoingMessage 数据类 - 渠道管理端点 CRUD 工作(GET/POST/GET/PUT/DELETE) """ from __future__ import annotations import pytest from fastapi import FastAPI from fastapi.testclient import TestClient from agentkit.channels import ( ChannelType, IncomingMessage, MessageAdapter, MessageDirection, OutgoingMessage, ) from agentkit.channels.secrets import KEY_SIZE, SecretsStore from agentkit.server.routes import channels as channels_routes # --------------------------------------------------------------------------- # ABC 协议测试 # --------------------------------------------------------------------------- class TestMessageAdapterAbc: """MessageAdapter 抽象基类协议。""" def test_abc_cannot_be_instantiated_directly(self): """ABC 不能直接实例化(缺少抽象方法实现)。""" with pytest.raises(TypeError): MessageAdapter() # type: ignore[abstract] def test_subclass_missing_method_cannot_instantiate(self): """子类未实现全部抽象方法时仍不能实例化。""" class PartialAdapter(MessageAdapter): async def verify_signature(self, headers, body): return True with pytest.raises(TypeError): PartialAdapter() # type: ignore[abstract] def test_concrete_subclass_works(self): """完整实现的子类可正常实例化并调用方法。""" adapter = _StubAdapter() assert isinstance(adapter, MessageAdapter) async def test_concrete_subclass_lifecycle(self): """具体子类的完整生命周期调用。""" adapter = _StubAdapter() headers = {"X-Signature": "valid"} body = b'{"msg":"hi"}' # verify_signature assert await adapter.verify_signature(headers, body) is True # receive_message msg = await adapter.receive_message(headers, body) assert isinstance(msg, IncomingMessage) assert msg.channel == ChannelType.FEISHU assert msg.content == "hi" # send_message out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="ok") assert await adapter.send_message(out) is True # close await adapter.close() assert adapter.closed is True async def test_verify_signature_failure(self): """verify_signature 返回 False 时调用方应拒绝。""" adapter = _StubAdapter() # _StubAdapter 对 "bad" 签名返回 False assert await adapter.verify_signature({"X-Signature": "bad"}, b"") is False # --------------------------------------------------------------------------- # 枚举与数据类测试 # --------------------------------------------------------------------------- class TestEnumsAndDataclasses: """枚举值与数据类字段。""" def test_channel_type_values(self): """ChannelType 包含飞书、钉钉、企微、Slack。""" assert ChannelType.FEISHU.value == "feishu" assert ChannelType.DINGTALK.value == "dingtalk" assert ChannelType.WECOM.value == "wecom" assert ChannelType.SLACK.value == "slack" def test_message_direction_values(self): """MessageDirection 包含 inbound/outbound。""" assert MessageDirection.INBOUND.value == "inbound" assert MessageDirection.OUTBOUND.value == "outbound" def test_incoming_message_defaults(self): """IncomingMessage 默认 raw_event 为空 dict,timestamp 为空。""" msg = IncomingMessage( channel=ChannelType.FEISHU, platform_message_id="m1", user_id="u1", chat_id="c1", content="hello", ) assert msg.raw_event == {} assert msg.timestamp == "" # 默认值独立性(每次实例化应得到独立 dict) msg2 = IncomingMessage( channel=ChannelType.FEISHU, platform_message_id="m2", user_id="u2", chat_id="c2", content="hi", ) msg.raw_event["k"] = "v" assert "k" not in msg2.raw_event def test_outgoing_message_optional_reply_to(self): """OutgoingMessage 的 reply_to_message_id 默认 None。""" msg = OutgoingMessage(channel=ChannelType.SLACK, chat_id="c1", content="reply") assert msg.reply_to_message_id is None # --------------------------------------------------------------------------- # 渠道管理端点 CRUD 测试 # --------------------------------------------------------------------------- @pytest.fixture def app(monkeypatch): """构造仅挂载 channels 路由的最小 FastAPI 应用。 使用确定性 master key 并清理模块级状态以保证测试隔离。 注入伪造的 admin 用户中间件,使 SYSTEM_CONFIG 权限校验通过。 """ monkeypatch.delenv("AGENTKIT_ENV", raising=False) monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False) # 注入确定性 store,避免依赖环境变量 channels_routes._secrets_store = SecretsStore(master_key=b"\x01" * KEY_SIZE) channels_routes._channels.clear() application = FastAPI() application.include_router(channels_routes.router, prefix="/api/v1") # 伪造 admin 用户,使 require_permission(SYSTEM_CONFIG) 通过 # (SYSTEM_CONFIG 为高风险权限,dev mode 下会 401,故需显式注入用户) @application.middleware("http") async def _fake_admin_auth(request, call_next): request.state.current_user = { "user_id": "u1", "username": "admin", "role": "admin", } return await call_next(request) return application @pytest.fixture def client(app): return TestClient(app) class TestChannelRoutesCrud: """渠道管理端点 CRUD。""" def test_list_channels_empty(self, client): """空状态下列出渠道返回空列表。""" resp = client.get("/api/v1/channels") assert resp.status_code == 200 data = resp.json() assert data["total"] == 0 assert data["channels"] == [] def test_create_and_get_channel(self, client): """创建渠道后可读取配置(不含凭证明文)。""" resp = client.post( "/api/v1/channels", json={ "channel_id": "feishu-prod", "channel_type": "feishu", "name": "飞书生产", "config": {"webhook_path": "/hook/feishu"}, "secrets": {"app_id": "cli_xxx", "app_secret": "topsecret"}, }, ) assert resp.status_code == 201, resp.text created = resp.json() assert created["channel_id"] == "feishu-prod" assert created["channel_type"] == "feishu" assert created["name"] == "飞书生产" # secret_keys 返回字段名,不含明文 assert set(created["secret_keys"]) == {"app_id", "app_secret"} assert "topsecret" not in resp.text # GET 单个渠道 resp = client.get("/api/v1/channels/feishu-prod") assert resp.status_code == 200 got = resp.json() assert got["channel_id"] == "feishu-prod" assert set(got["secret_keys"]) == {"app_id", "app_secret"} assert "topsecret" not in resp.text def test_create_duplicate_returns_409(self, client): """重复创建同 ID 渠道返回 409。""" payload = { "channel_id": "dingtalk-1", "channel_type": "dingtalk", "name": "钉钉", } client.post("/api/v1/channels", json=payload) resp = client.post("/api/v1/channels", json=payload) assert resp.status_code == 409 def test_create_invalid_channel_id_returns_400(self, client): """非法渠道 ID 返回 400。""" resp = client.post( "/api/v1/channels", json={ "channel_id": "Invalid ID!", "channel_type": "feishu", "name": "x", }, ) assert resp.status_code == 400 def test_get_nonexistent_returns_404(self, client): """获取不存在渠道返回 404。""" resp = client.get("/api/v1/channels/missing") assert resp.status_code == 404 def test_update_channel(self, client): """更新渠道配置与凭证。""" client.post( "/api/v1/channels", json={ "channel_id": "wecom-1", "channel_type": "wecom", "name": "企微", "secrets": {"token": "old-token"}, }, ) resp = client.put( "/api/v1/channels/wecom-1", json={ "name": "企微更新", "config": {"enabled": True}, "secrets": {"token": "new-token", "extra": "v"}, }, ) assert resp.status_code == 200, resp.text updated = resp.json() assert updated["name"] == "企微更新" assert updated["config"] == {"enabled": True} assert set(updated["secret_keys"]) == {"token", "extra"} # 凭证已在 secrets store 中加密更新(明文不可见) assert "new-token" not in resp.text def test_delete_channel(self, client): """删除渠道后不可再获取。""" client.post( "/api/v1/channels", json={ "channel_id": "slack-1", "channel_type": "slack", "name": "Slack", }, ) resp = client.delete("/api/v1/channels/slack-1") assert resp.status_code == 200 assert resp.json() == {"deleted": "slack-1"} # 再获取应 404 assert client.get("/api/v1/channels/slack-1").status_code == 404 def test_delete_nonexistent_returns_404(self, client): """删除不存在渠道返回 404。""" resp = client.delete("/api/v1/channels/missing") assert resp.status_code == 404 def test_list_after_create(self, client): """创建后列出渠道包含新建项。""" client.post( "/api/v1/channels", json={ "channel_id": "feishu-a", "channel_type": "feishu", "name": "A", }, ) client.post( "/api/v1/channels", json={ "channel_id": "dingtalk-b", "channel_type": "dingtalk", "name": "B", }, ) resp = client.get("/api/v1/channels") assert resp.status_code == 200 data = resp.json() assert data["total"] == 2 ids = {c["channel_id"] for c in data["channels"]} assert ids == {"feishu-a", "dingtalk-b"} def test_secrets_encrypted_in_store(self, client): """创建渠道后 secrets store 内部存储为密文。""" client.post( "/api/v1/channels", json={ "channel_id": "feishu-enc", "channel_type": "feishu", "name": "enc", "secrets": {"app_secret": "plaintext-value"}, }, ) store = channels_routes._get_secrets_store() entry = store._store["feishu-enc:app_secret"] # 内部存储的 value 不含明文 assert "plaintext-value" not in entry.value # 但可正确解密(store 为纯内存对象,可在新事件循环中调用) import asyncio decrypted = asyncio.run(store.get_secret("feishu-enc:app_secret")) assert decrypted == "plaintext-value" # --------------------------------------------------------------------------- # 辅助:具体子类桩 # --------------------------------------------------------------------------- class _StubAdapter(MessageAdapter): """用于测试的桩适配器。""" def __init__(self): self.closed = False async def verify_signature(self, headers: dict[str, str], body: bytes) -> bool: return headers.get("X-Signature") == "valid" async def receive_message(self, headers: dict[str, str], body: bytes) -> IncomingMessage: import json data = json.loads(body) return IncomingMessage( channel=ChannelType.FEISHU, platform_message_id="m1", user_id="u1", chat_id="c1", content=data.get("msg", ""), raw_event=data, ) async def send_message(self, message: OutgoingMessage) -> bool: return True async def close(self) -> None: self.closed = True