fischer-agentkit/tests/unit/channels/test_base.py

369 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""MessageAdapter ABC 与渠道管理端点测试 (U10)。
覆盖场景:
- MessageAdapter ABC 不能直接实例化
- 具体子类实现协议方法后可正常工作
- ChannelType / MessageDirection 枚举
- IncomingMessage / OutgoingMessage 数据类
- 渠道管理端点 CRUD 工作(GET/POST/GET/PUT/DELETE)
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from fastapi.testclient import TestClient
from agentkit.channels import (
ChannelType,
IncomingMessage,
MessageAdapter,
MessageDirection,
OutgoingMessage,
)
from agentkit.channels.secrets import KEY_SIZE, SecretsStore
from agentkit.server.routes import channels as channels_routes
# ---------------------------------------------------------------------------
# ABC 协议测试
# ---------------------------------------------------------------------------
class TestMessageAdapterAbc:
    """Contract checks for the MessageAdapter abstract base class."""

    def test_abc_cannot_be_instantiated_directly(self):
        """Instantiating the ABC itself raises TypeError."""
        with pytest.raises(TypeError):
            MessageAdapter()  # type: ignore[abstract]

    def test_subclass_missing_method_cannot_instantiate(self):
        """A subclass overriding only one abstract method is still abstract."""

        class SignatureOnlyAdapter(MessageAdapter):
            async def verify_signature(self, headers, body):
                return True

        with pytest.raises(TypeError):
            SignatureOnlyAdapter()  # type: ignore[abstract]

    def test_concrete_subclass_works(self):
        """A fully implemented subclass can be instantiated."""
        stub = _StubAdapter()
        assert isinstance(stub, MessageAdapter)

    async def test_concrete_subclass_lifecycle(self):
        """Walk a concrete subclass through verify, receive, send and close."""
        stub = _StubAdapter()
        request_headers = {"X-Signature": "valid"}
        payload = b'{"msg":"hi"}'

        # Signature verification accepts the "valid" header.
        verified = await stub.verify_signature(request_headers, payload)
        assert verified is True

        # The inbound payload is parsed into an IncomingMessage.
        incoming = await stub.receive_message(request_headers, payload)
        assert isinstance(incoming, IncomingMessage)
        assert incoming.channel == ChannelType.FEISHU
        assert incoming.content == "hi"

        # Sending an outbound message reports success.
        reply = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="ok")
        sent = await stub.send_message(reply)
        assert sent is True

        # Closing flips the stub's `closed` flag.
        await stub.close()
        assert stub.closed is True

    async def test_verify_signature_failure(self):
        """verify_signature returns False for a signature the stub rejects."""
        stub = _StubAdapter()
        # _StubAdapter only accepts the literal "valid" signature.
        rejected = await stub.verify_signature({"X-Signature": "bad"}, b"")
        assert rejected is False
# ---------------------------------------------------------------------------
# 枚举与数据类测试
# ---------------------------------------------------------------------------
class TestEnumsAndDataclasses:
    """Enum values and dataclass field defaults."""

    def test_channel_type_values(self):
        """ChannelType exposes Feishu, DingTalk, WeCom and Slack."""
        expected_pairs = (
            (ChannelType.FEISHU, "feishu"),
            (ChannelType.DINGTALK, "dingtalk"),
            (ChannelType.WECOM, "wecom"),
            (ChannelType.SLACK, "slack"),
        )
        for member, wire_value in expected_pairs:
            assert member.value == wire_value

    def test_message_direction_values(self):
        """MessageDirection exposes inbound and outbound."""
        expected_pairs = (
            (MessageDirection.INBOUND, "inbound"),
            (MessageDirection.OUTBOUND, "outbound"),
        )
        for member, wire_value in expected_pairs:
            assert member.value == wire_value

    def test_incoming_message_defaults(self):
        """raw_event defaults to an empty dict and timestamp to an empty string."""

        def build(suffix, text):
            # Only the required fields are supplied so the defaults apply.
            return IncomingMessage(
                channel=ChannelType.FEISHU,
                platform_message_id=f"m{suffix}",
                user_id=f"u{suffix}",
                chat_id=f"c{suffix}",
                content=text,
            )

        first = build(1, "hello")
        assert first.raw_event == {}
        assert first.timestamp == ""

        # Defaults must be independent: each instance gets its own dict.
        second = build(2, "hi")
        first.raw_event["k"] = "v"
        assert "k" not in second.raw_event

    def test_outgoing_message_optional_reply_to(self):
        """OutgoingMessage.reply_to_message_id defaults to None."""
        outgoing = OutgoingMessage(
            channel=ChannelType.SLACK,
            chat_id="c1",
            content="reply",
        )
        assert outgoing.reply_to_message_id is None
# ---------------------------------------------------------------------------
# 渠道管理端点 CRUD 测试
# ---------------------------------------------------------------------------
@pytest.fixture
def app(monkeypatch):
    """Build a minimal FastAPI application that mounts only the channels router.

    A deterministic master key is used and the routes module's module-level
    state is reset so tests stay isolated from each other. A fake admin user is
    injected through an HTTP middleware so the SYSTEM_CONFIG permission check
    passes.

    The secrets store is swapped in with ``monkeypatch.setattr`` so the
    previous module-level value is restored after the test, and the channel
    registry is cleared again on teardown so nothing leaks into other tests.

    Yields:
        The configured FastAPI application.
    """
    monkeypatch.delenv("AGENTKIT_ENV", raising=False)
    monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False)

    # Inject a deterministic store so the tests do not depend on environment
    # variables. raising=False tolerates the attribute not existing yet.
    monkeypatch.setattr(
        channels_routes,
        "_secrets_store",
        SecretsStore(master_key=b"\x01" * KEY_SIZE),
        raising=False,
    )
    channels_routes._channels.clear()

    application = FastAPI()
    application.include_router(channels_routes.router, prefix="/api/v1")

    # Fake an admin user so require_permission(SYSTEM_CONFIG) passes.
    # SYSTEM_CONFIG is a high-risk permission; in dev mode it would yield 401,
    # so a user has to be injected explicitly.
    @application.middleware("http")
    async def _fake_admin_auth(request, call_next):
        request.state.current_user = {
            "user_id": "u1",
            "username": "admin",
            "role": "admin",
        }
        return await call_next(request)

    yield application

    # Drop channels created during the test so later tests start clean.
    channels_routes._channels.clear()
@pytest.fixture
def client(app):
    """Return a TestClient bound to the application from the `app` fixture."""
    test_client = TestClient(app)
    return test_client
class TestChannelRoutesCrud:
    """CRUD behaviour of the channel management endpoints."""

    @staticmethod
    def _create_channel(client, payload):
        """POST *payload* to the create endpoint and require a 201 response.

        Checking the status here makes a broken setup step fail at the create
        call instead of surfacing later as an unrelated 404 or KeyError.

        Returns:
            The response object of the create request.
        """
        resp = client.post("/api/v1/channels", json=payload)
        assert resp.status_code == 201, resp.text
        return resp

    def test_list_channels_empty(self, client):
        """Listing channels on a clean state returns an empty list."""
        resp = client.get("/api/v1/channels")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 0
        assert data["channels"] == []

    def test_create_and_get_channel(self, client):
        """A created channel can be read back without plaintext credentials."""
        resp = self._create_channel(
            client,
            {
                "channel_id": "feishu-prod",
                "channel_type": "feishu",
                "name": "飞书生产",
                "config": {"webhook_path": "/hook/feishu"},
                "secrets": {"app_id": "cli_xxx", "app_secret": "topsecret"},
            },
        )
        created = resp.json()
        assert created["channel_id"] == "feishu-prod"
        assert created["channel_type"] == "feishu"
        assert created["name"] == "飞书生产"
        # secret_keys lists the field names only, never the plaintext values.
        assert set(created["secret_keys"]) == {"app_id", "app_secret"}
        assert "topsecret" not in resp.text

        # GET the single channel.
        resp = client.get("/api/v1/channels/feishu-prod")
        assert resp.status_code == 200
        got = resp.json()
        assert got["channel_id"] == "feishu-prod"
        assert set(got["secret_keys"]) == {"app_id", "app_secret"}
        assert "topsecret" not in resp.text

    def test_create_duplicate_returns_409(self, client):
        """Creating a channel with an already-used ID returns 409."""
        payload = {
            "channel_id": "dingtalk-1",
            "channel_type": "dingtalk",
            "name": "钉钉",
        }
        # The first create must succeed for the 409 below to mean "duplicate".
        self._create_channel(client, payload)
        resp = client.post("/api/v1/channels", json=payload)
        assert resp.status_code == 409

    def test_create_invalid_channel_id_returns_400(self, client):
        """An invalid channel ID returns 400."""
        resp = client.post(
            "/api/v1/channels",
            json={
                "channel_id": "Invalid ID!",
                "channel_type": "feishu",
                "name": "x",
            },
        )
        assert resp.status_code == 400

    def test_get_nonexistent_returns_404(self, client):
        """Fetching a channel that does not exist returns 404."""
        resp = client.get("/api/v1/channels/missing")
        assert resp.status_code == 404

    def test_update_channel(self, client):
        """Updating a channel replaces its name, config and credentials."""
        self._create_channel(
            client,
            {
                "channel_id": "wecom-1",
                "channel_type": "wecom",
                "name": "企微",
                "secrets": {"token": "old-token"},
            },
        )
        resp = client.put(
            "/api/v1/channels/wecom-1",
            json={
                "name": "企微更新",
                "config": {"enabled": True},
                "secrets": {"token": "new-token", "extra": "v"},
            },
        )
        assert resp.status_code == 200, resp.text
        updated = resp.json()
        assert updated["name"] == "企微更新"
        assert updated["config"] == {"enabled": True}
        assert set(updated["secret_keys"]) == {"token", "extra"}
        # The new credential must not appear in plaintext in the response.
        assert "new-token" not in resp.text

    def test_delete_channel(self, client):
        """A deleted channel can no longer be fetched."""
        self._create_channel(
            client,
            {
                "channel_id": "slack-1",
                "channel_type": "slack",
                "name": "Slack",
            },
        )
        resp = client.delete("/api/v1/channels/slack-1")
        assert resp.status_code == 200
        assert resp.json() == {"deleted": "slack-1"}
        # A subsequent GET must return 404.
        assert client.get("/api/v1/channels/slack-1").status_code == 404

    def test_delete_nonexistent_returns_404(self, client):
        """Deleting a channel that does not exist returns 404."""
        resp = client.delete("/api/v1/channels/missing")
        assert resp.status_code == 404

    def test_list_after_create(self, client):
        """Listing after creation includes the newly created channels."""
        self._create_channel(
            client,
            {
                "channel_id": "feishu-a",
                "channel_type": "feishu",
                "name": "A",
            },
        )
        self._create_channel(
            client,
            {
                "channel_id": "dingtalk-b",
                "channel_type": "dingtalk",
                "name": "B",
            },
        )
        resp = client.get("/api/v1/channels")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 2
        ids = {c["channel_id"] for c in data["channels"]}
        assert ids == {"feishu-a", "dingtalk-b"}

    def test_secrets_encrypted_in_store(self, client):
        """After creation the secrets store does not hold the plaintext value."""
        self._create_channel(
            client,
            {
                "channel_id": "feishu-enc",
                "channel_type": "feishu",
                "name": "enc",
                "secrets": {"app_secret": "plaintext-value"},
            },
        )
        store = channels_routes._get_secrets_store()
        entry = store._store["feishu-enc:app_secret"]
        # The stored value must not contain the plaintext.
        assert "plaintext-value" not in entry.value
        # It still decrypts correctly (the store is a pure in-memory object,
        # so it can be called from a fresh event loop).
        import asyncio

        decrypted = asyncio.run(store.get_secret("feishu-enc:app_secret"))
        assert decrypted == "plaintext-value"
# ---------------------------------------------------------------------------
# 辅助:具体子类桩
# ---------------------------------------------------------------------------
class _StubAdapter(MessageAdapter):
    """Stub adapter used by the tests in this module."""

    def __init__(self):
        # Set to True by close() so tests can observe the call.
        self.closed = False

    async def verify_signature(self, headers: dict[str, str], body: bytes) -> bool:
        """Accept the request only when X-Signature equals "valid"."""
        signature = headers.get("X-Signature")
        return signature == "valid"

    async def receive_message(self, headers: dict[str, str], body: bytes) -> IncomingMessage:
        """Parse *body* as JSON and wrap it in a fixed Feishu IncomingMessage."""
        import json

        event = json.loads(body)
        text = event.get("msg", "")
        return IncomingMessage(
            channel=ChannelType.FEISHU,
            platform_message_id="m1",
            user_id="u1",
            chat_id="c1",
            content=text,
            raw_event=event,
        )

    async def send_message(self, message: OutgoingMessage) -> bool:
        """Report success without sending anything."""
        return True

    async def close(self) -> None:
        """Record that the adapter has been closed."""
        self.closed = True