305 lines
12 KiB
Python
305 lines
12 KiB
Python
"""钉钉 IM 适配器单元测试 (U12)。
|
||
|
||
覆盖场景:
|
||
- 签名校验(有效/无效/过期/缺签名头)
|
||
- Token 校验(匹配/不匹配/未配置)
|
||
- 文本消息解析(含 @ 提及剥离)
|
||
- 不支持消息类型
|
||
- send_message 成功/失败
|
||
- accessToken 缓存
|
||
- senderStaffId/senderId/staffId 回退
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import time
|
||
from typing import Any
|
||
from unittest.mock import AsyncMock, MagicMock
|
||
|
||
from agentkit.channels.base import ChannelType, IncomingMessage, OutgoingMessage
|
||
from agentkit.channels.dingtalk import DingTalkMessageAdapter
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _sign(app_secret: str, timestamp_ms: int) -> str:
    """Build the value of the DingTalk ``Sign`` header.

    The signed payload is the timestamp, a newline, then the secret; it is
    authenticated with HMAC-SHA256 keyed by the secret and base64-encoded.

    Args:
        app_secret: Secret used both as the HMAC key and as the second line
            of the signed payload.
        timestamp_ms: Timestamp placed on the first line of the signed payload.

    Returns:
        The base64 text of the HMAC-SHA256 digest.
    """
    key = app_secret.encode("utf-8")
    payload = "\n".join((str(timestamp_ms), app_secret)).encode("utf-8")
    mac = hmac.new(key, payload, hashlib.sha256)
    return base64.b64encode(mac.digest()).decode("utf-8")
|
||
|
||
|
||
def _make_body(
    *,
    text: str = "hello",
    msgtype: str = "text",
    conversation_id: str = "cid1",
    sender_staff_id: str | None = "u1",
    sender_id: str | None = None,
    staff_id: str | None = None,
    msg_id: str = "m1",
    session_expired: int = 1700000000000,
) -> dict[str, Any]:
    """Build the JSON body of a fake DingTalk webhook event.

    Args:
        text: Message content, stored under ``text`` for text messages and
            under ``richText`` for every other ``msgtype``.
        msgtype: Value of the ``msgtype`` field.
        conversation_id: Value of the ``conversationId`` field.
        sender_staff_id: Value of ``senderStaffId``; the key is omitted when None.
        sender_id: Value of ``senderId``; the key is omitted when None.
        staff_id: Value of ``staffId``; the key is omitted when None.
        msg_id: Value of the ``msgId`` field.
        session_expired: Value of the ``sessionWebhookExpiredTime`` field.

    Returns:
        The event body as a plain dictionary.
    """
    payload: dict[str, Any] = {
        "conversationId": conversation_id,
        "msgId": msg_id,
        "msgtype": msgtype,
        "sessionWebhookExpiredTime": session_expired,
    }

    # Sender identifiers are optional; only the ones actually supplied appear.
    optional_ids = (
        ("senderStaffId", sender_staff_id),
        ("senderId", sender_id),
        ("staffId", staff_id),
    )
    payload.update({key: value for key, value in optional_ids if value is not None})

    content_key = "text" if msgtype == "text" else "richText"
    payload[content_key] = {"content": text}
    return payload
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Signature verification
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestSignatureVerification:
    """Signature verification of incoming DingTalk requests."""

    async def test_valid_signature(self):
        """A correct Sign + Timestamp pair yields True."""
        secret = "secret123"
        adapter = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": _sign(secret, now_ms), "Timestamp": str(now_ms)}
        verified = await adapter.verify_signature(request_headers, b"{}")
        assert verified is True

    async def test_invalid_signature(self):
        """A tampered Sign value yields False."""
        secret = "secret123"
        adapter = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": "tampered_signature", "Timestamp": str(now_ms)}
        verified = await adapter.verify_signature(request_headers, b"{}")
        assert verified is False

    async def test_expired_timestamp(self):
        """A timestamp more than one hour old yields False."""
        secret = "secret123"
        adapter = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        # 3700 seconds in the past, i.e. just beyond one hour.
        stale_ms = int((time.time() - 3700) * 1000)
        request_headers = {"Sign": _sign(secret, stale_ms), "Timestamp": str(stale_ms)}
        verified = await adapter.verify_signature(request_headers, b"{}")
        assert verified is False

    async def test_missing_signature_headers(self):
        """No Sign/Timestamp headers and no configured token yields False."""
        adapter = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        verified = await adapter.verify_signature({}, b"{}")
        assert verified is False
|
||
|
||
|
||
class TestTokenVerification:
    """Token verification."""

    async def test_token_mismatch(self):
        """With a token configured, a non-matching Token header yields False."""
        adapter = DingTalkMessageAdapter(
            app_key="k",
            app_secret="s",
            robot_code="r",
            token="abc",
        )
        verified = await adapter.verify_signature({"Token": "wrong"}, b"{}")
        assert verified is False

    async def test_token_match_without_sign(self):
        """With a token configured, a matching Token header (no Sign) yields True."""
        adapter = DingTalkMessageAdapter(
            app_key="k",
            app_secret="s",
            robot_code="r",
            token="abc",
        )
        verified = await adapter.verify_signature({"Token": "abc"}, b"{}")
        assert verified is True

    async def test_token_none_skips_token_check(self):
        """With token=None no Token header is needed; a valid signature suffices."""
        secret = "secret123"
        adapter = DingTalkMessageAdapter(
            app_key="k",
            app_secret=secret,
            robot_code="r",
            token=None,
        )
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": _sign(secret, now_ms), "Timestamp": str(now_ms)}
        verified = await adapter.verify_signature(request_headers, b"{}")
        assert verified is True
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Message parsing
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestMessageParsing:
    """Parsing of incoming webhook messages."""

    @staticmethod
    async def _receive(**body_fields: Any):
        """Feed a fake webhook body, built by ``_make_body``, to a fresh adapter.

        Args:
            **body_fields: Keyword arguments forwarded to ``_make_body``.

        Returns:
            Whatever ``receive_message`` returns for the encoded body.
        """
        adapter = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        raw = json.dumps(_make_body(**body_fields)).encode("utf-8")
        return await adapter.receive_message({}, raw)

    async def test_text_message_parsing(self):
        """A text event is parsed into an IncomingMessage."""
        msg = await self._receive(text="hello world")
        assert isinstance(msg, IncomingMessage)
        assert msg.channel == ChannelType.DINGTALK
        assert msg.content == "hello world"
        assert msg.chat_id == "cid1"
        assert msg.user_id == "u1"
        assert msg.platform_message_id == "m1"
        assert msg.timestamp == "1700000000000"

    async def test_mention_stripping(self):
        """The leading @robot mention is stripped from the content."""
        msg = await self._receive(text="@robotName hello")
        assert msg.content == "hello"

    async def test_unsupported_message_type(self):
        """A non-text message type produces the unsupported placeholder content."""
        msg = await self._receive(msgtype="image")
        assert msg.content.startswith("[unsupported message type: image]")

    async def test_senderid_fallback(self):
        """Without senderStaffId the user id falls back to senderId."""
        msg = await self._receive(sender_staff_id=None, sender_id="fallback_user")
        assert msg.user_id == "fallback_user"

    async def test_staffid_fallback(self):
        """Without senderStaffId and senderId the user id falls back to staffId."""
        msg = await self._receive(
            sender_staff_id=None,
            sender_id=None,
            staff_id="staff_user",
        )
        assert msg.user_id == "staff_user"
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# send_message
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestSendMessage:
    """Behaviour of ``send_message``."""

    @staticmethod
    def _response(status_code, *, payload=None, text=None):
        """Create a fake HTTP response.

        Args:
            status_code: Value assigned to ``status_code``.
            payload: When given, the value returned by ``json()``.
            text: When given, the value assigned to ``text``.

        Returns:
            A ``MagicMock`` carrying only the attributes that were supplied.
        """
        response = MagicMock()
        response.status_code = status_code
        if payload is not None:
            response.json.return_value = payload
        if text is not None:
            response.text = text
        return response

    @staticmethod
    def _adapter_with_client(**post_config):
        """Create an adapter whose HTTP client is an ``AsyncMock``.

        Args:
            **post_config: Keyword arguments for the ``AsyncMock`` that stands
                in for ``client.post`` (``side_effect`` or ``return_value``).

        Returns:
            An ``(adapter, client)`` pair.
        """
        adapter = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        client = AsyncMock()
        client.post = AsyncMock(**post_config)
        adapter._client = client
        return adapter, client

    @staticmethod
    def _outgoing():
        """Create the outgoing message used by every test in this class."""
        return OutgoingMessage(channel=ChannelType.DINGTALK, chat_id="c1", content="hi")

    async def test_send_message_success(self):
        """HTTP 200 yields True and the send call carries the access-token header."""
        token_resp = self._response(200, payload={"accessToken": "tok_123"})
        send_resp = self._response(200, payload={"processQueryKey": "x"})
        adapter, client = self._adapter_with_client(side_effect=[token_resp, send_resp])

        delivered = await adapter.send_message(self._outgoing())
        assert delivered is True

        # Call 0 fetched the token; call 1 is the actual send.
        second_call = client.post.call_args_list[1]
        assert "robot/oToMessages/batchSend" in second_call.args[0]
        sent_kwargs = second_call.kwargs
        assert sent_kwargs["headers"]["x-acs-dingtalk-access-token"] == "tok_123"
        assert sent_kwargs["json"]["robotCode"] == "r"

    async def test_send_message_failure(self):
        """A non-200 send response yields False."""
        token_resp = self._response(200, payload={"accessToken": "tok_x"})
        send_resp = self._response(400, text="invalid request")
        adapter, _ = self._adapter_with_client(side_effect=[token_resp, send_resp])

        delivered = await adapter.send_message(self._outgoing())
        assert delivered is False

    async def test_send_message_token_fetch_failure(self):
        """A failed accessToken fetch yields False."""
        token_resp = self._response(401, text="invalid")
        adapter, _ = self._adapter_with_client(return_value=token_resp)

        delivered = await adapter.send_message(self._outgoing())
        assert delivered is False

    async def test_access_token_caching(self):
        """Two sends within the same TTL fetch the accessToken only once."""
        token_resp = self._response(200, payload={"accessToken": "cached_tok"})
        send_resp = self._response(200, payload={"processQueryKey": "x"})
        # First send: token + send; second send: send only (token is cached).
        adapter, client = self._adapter_with_client(
            side_effect=[token_resp, send_resp, send_resp]
        )

        outgoing = self._outgoing()
        for _ in range(2):
            await adapter.send_message(outgoing)

        # 1 token call + 2 send calls = 3 (it would be 4 without caching).
        assert client.post.call_count == 3
        first_url = client.post.call_args_list[0].args[0]
        assert "accessToken" in first_url
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Resource cleanup
# ---------------------------------------------------------------------------
|
||
|
||
|
||
class TestClose:
    """Resource cleanup."""

    async def test_close_no_client_is_noop(self):
        """close() does not raise when no HTTP client was ever created."""
        unused_adapter = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        await unused_adapter.close()

    async def test_close_resets_client(self):
        """close() clears the adapter's client reference."""
        adapter = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")

        # Force the client into existence so there is something to release.
        adapter._get_client()
        assert adapter._client is not None

        await adapter.close()
        assert adapter._client is None
|