fischer-agentkit/tests/unit/channels/test_dingtalk.py

305 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""钉钉 IM 适配器单元测试 (U12)。
覆盖场景:
- 签名校验(有效/无效/过期/缺签名头)
- Token 校验(匹配/不匹配/未配置)
- 文本消息解析(含 @ 提及剥离)
- 不支持消息类型
- send_message 成功/失败
- accessToken 缓存
- senderStaffId/senderId/staffId 回退
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import time
from typing import Any
from unittest.mock import AsyncMock, MagicMock
from agentkit.channels.base import ChannelType, IncomingMessage, OutgoingMessage
from agentkit.channels.dingtalk import DingTalkMessageAdapter
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _sign(app_secret: str, timestamp_ms: int) -> str:
    """Build the value of the DingTalk ``Sign`` header.

    The value is ``base64(hmac-sha256(key=app_secret, msg="{ts}\\n{secret}"))``.

    Args:
        app_secret: Secret used both as the HMAC key and as the second line
            of the signed message.
        timestamp_ms: Timestamp placed on the first line of the signed
            message (callers in this file pass epoch milliseconds).

    Returns:
        The base64-encoded HMAC-SHA256 digest, decoded as UTF-8 text.
    """
    key = app_secret.encode("utf-8")
    message = f"{timestamp_ms}\n{app_secret}".encode("utf-8")
    mac = hmac.new(key, message, hashlib.sha256)
    return base64.b64encode(mac.digest()).decode("utf-8")
def _make_body(
    *,
    text: str = "hello",
    msgtype: str = "text",
    conversation_id: str = "cid1",
    sender_staff_id: str | None = "u1",
    sender_id: str | None = None,
    staff_id: str | None = None,
    msg_id: str = "m1",
    session_expired: int = 1700000000000,
) -> dict[str, Any]:
    """Build the JSON body of a DingTalk webhook event.

    Args:
        text: Message content, stored under ``{"content": text}``.
        msgtype: Value of the ``msgtype`` field; ``"text"`` puts the content
            under the ``text`` key, anything else under ``richText``.
        conversation_id: Value of ``conversationId``.
        sender_staff_id: Value of ``senderStaffId``; omitted when ``None``.
        sender_id: Value of ``senderId``; omitted when ``None``.
        staff_id: Value of ``staffId``; omitted when ``None``.
        msg_id: Value of ``msgId``.
        session_expired: Value of ``sessionWebhookExpiredTime``.

    Returns:
        The event body as a plain dictionary.
    """
    payload: dict[str, Any] = {
        "conversationId": conversation_id,
        "msgId": msg_id,
        "msgtype": msgtype,
        "sessionWebhookExpiredTime": session_expired,
    }
    # Optional sender identifiers are only emitted when explicitly provided;
    # the tuple order keeps the resulting key order stable.
    optional_ids = (
        ("senderStaffId", sender_staff_id),
        ("senderId", sender_id),
        ("staffId", staff_id),
    )
    payload.update({key: value for key, value in optional_ids if value is not None})
    content_key = "text" if msgtype == "text" else "richText"
    payload[content_key] = {"content": text}
    return payload
# ---------------------------------------------------------------------------
# 签名校验
# ---------------------------------------------------------------------------
class TestSignatureVerification:
    """Signature verification of the DingTalk adapter."""

    async def test_valid_signature(self):
        """A correct Sign together with its Timestamp is accepted (True)."""
        secret = "secret123"
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": _sign(secret, now_ms), "Timestamp": str(now_ms)}
        verified = await dingtalk.verify_signature(request_headers, b"{}")
        assert verified is True

    async def test_invalid_signature(self):
        """A tampered Sign is rejected (False)."""
        secret = "secret123"
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": "tampered_signature", "Timestamp": str(now_ms)}
        verified = await dingtalk.verify_signature(request_headers, b"{}")
        assert verified is False

    async def test_expired_timestamp(self):
        """A timestamp more than one hour old is rejected (False)."""
        secret = "secret123"
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret=secret, robot_code="r")
        # 3700 seconds in the past, i.e. just over one hour old.
        stale_ms = int((time.time() - 3700) * 1000)
        request_headers = {"Sign": _sign(secret, stale_ms), "Timestamp": str(stale_ms)}
        verified = await dingtalk.verify_signature(request_headers, b"{}")
        assert verified is False

    async def test_missing_signature_headers(self):
        """No Sign/Timestamp headers and no configured token is rejected (False)."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        verified = await dingtalk.verify_signature({}, b"{}")
        assert verified is False
class TestTokenVerification:
    """Token verification of the DingTalk adapter."""

    async def test_token_mismatch(self):
        """With a token configured, a non-matching Token header is rejected."""
        dingtalk = DingTalkMessageAdapter(
            app_key="k", app_secret="s", robot_code="r", token="abc"
        )
        verified = await dingtalk.verify_signature({"Token": "wrong"}, b"{}")
        assert verified is False

    async def test_token_match_without_sign(self):
        """With a token configured, a matching Token header passes without Sign."""
        dingtalk = DingTalkMessageAdapter(
            app_key="k", app_secret="s", robot_code="r", token="abc"
        )
        verified = await dingtalk.verify_signature({"Token": "abc"}, b"{}")
        assert verified is True

    async def test_token_none_skips_token_check(self):
        """With token=None no Token header is needed; the signature alone passes."""
        secret = "secret123"
        dingtalk = DingTalkMessageAdapter(
            app_key="k", app_secret=secret, robot_code="r", token=None
        )
        now_ms = int(time.time() * 1000)
        request_headers = {"Sign": _sign(secret, now_ms), "Timestamp": str(now_ms)}
        verified = await dingtalk.verify_signature(request_headers, b"{}")
        assert verified is True
# ---------------------------------------------------------------------------
# 消息解析
# ---------------------------------------------------------------------------
class TestMessageParsing:
    """Parsing of incoming webhook events."""

    @staticmethod
    async def _receive(**body_fields):
        """Feed a `_make_body(**body_fields)` event to a fresh adapter.

        Returns whatever ``receive_message`` produces for empty headers and
        the UTF-8 encoded JSON body.
        """
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        raw_body = json.dumps(_make_body(**body_fields)).encode("utf-8")
        return await dingtalk.receive_message({}, raw_body)

    async def test_text_message_parsing(self):
        """A text event is parsed into an IncomingMessage."""
        parsed = await self._receive(text="hello world")
        assert isinstance(parsed, IncomingMessage)
        assert parsed.channel == ChannelType.DINGTALK
        assert parsed.content == "hello world"
        assert parsed.chat_id == "cid1"
        assert parsed.user_id == "u1"
        assert parsed.platform_message_id == "m1"
        assert parsed.timestamp == "1700000000000"

    async def test_mention_stripping(self):
        """The leading @-mention of the robot is stripped from the content."""
        parsed = await self._receive(text="@robotName hello")
        assert parsed.content == "hello"

    async def test_unsupported_message_type(self):
        """A non-text msgtype yields the "unsupported" placeholder content."""
        parsed = await self._receive(msgtype="image")
        assert parsed.content.startswith("[unsupported message type: image]")

    async def test_senderid_fallback(self):
        """Without senderStaffId the user id falls back to senderId."""
        parsed = await self._receive(sender_staff_id=None, sender_id="fallback_user")
        assert parsed.user_id == "fallback_user"

    async def test_staffid_fallback(self):
        """Without senderStaffId and senderId the user id falls back to staffId."""
        parsed = await self._receive(
            sender_staff_id=None, sender_id=None, staff_id="staff_user"
        )
        assert parsed.user_id == "staff_user"
# ---------------------------------------------------------------------------
# send_message
# ---------------------------------------------------------------------------
class TestSendMessage:
    """Behaviour of send_message."""

    @staticmethod
    def _fake_response(status_code, *, payload=None, text=None):
        """Create a MagicMock standing in for an HTTP response.

        ``payload`` (when given) becomes the return value of ``.json()``;
        ``text`` (when given) is set as the ``.text`` attribute.
        """
        response = MagicMock()
        response.status_code = status_code
        if payload is not None:
            response.json.return_value = payload
        if text is not None:
            response.text = text
        return response

    async def test_send_message_success(self):
        """HTTP 200 returns True and the send call carries the access token header."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        token_response = self._fake_response(200, payload={"accessToken": "tok_123"})
        send_response = self._fake_response(200, payload={"processQueryKey": "x"})
        http = AsyncMock()
        http.post = AsyncMock(side_effect=[token_response, send_response])
        dingtalk._client = http

        outgoing = OutgoingMessage(channel=ChannelType.DINGTALK, chat_id="c1", content="hi")
        delivered = await dingtalk.send_message(outgoing)
        assert delivered is True

        # The second POST is the actual send request.
        send_call = http.post.call_args_list[1]
        assert "robot/oToMessages/batchSend" in send_call.args[0]
        assert send_call.kwargs["headers"]["x-acs-dingtalk-access-token"] == "tok_123"
        assert send_call.kwargs["json"]["robotCode"] == "r"

    async def test_send_message_failure(self):
        """A non-200 send response returns False."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        token_response = self._fake_response(200, payload={"accessToken": "tok_x"})
        send_response = self._fake_response(400, text="invalid request")
        http = AsyncMock()
        http.post = AsyncMock(side_effect=[token_response, send_response])
        dingtalk._client = http

        outgoing = OutgoingMessage(channel=ChannelType.DINGTALK, chat_id="c1", content="hi")
        delivered = await dingtalk.send_message(outgoing)
        assert delivered is False

    async def test_send_message_token_fetch_failure(self):
        """Failing to obtain the accessToken returns False."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        token_response = self._fake_response(401, text="invalid")
        http = AsyncMock()
        http.post = AsyncMock(return_value=token_response)
        dingtalk._client = http

        outgoing = OutgoingMessage(channel=ChannelType.DINGTALK, chat_id="c1", content="hi")
        delivered = await dingtalk.send_message(outgoing)
        assert delivered is False

    async def test_access_token_caching(self):
        """Two send_message calls within one TTL fetch the accessToken only once."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        token_response = self._fake_response(200, payload={"accessToken": "cached_tok"})
        send_response = self._fake_response(200, payload={"processQueryKey": "x"})
        http = AsyncMock()
        # First call: token + send; second call: send only (token comes from cache).
        http.post = AsyncMock(
            side_effect=[token_response, send_response, send_response]
        )
        dingtalk._client = http

        outgoing = OutgoingMessage(channel=ChannelType.DINGTALK, chat_id="c1", content="hi")
        await dingtalk.send_message(outgoing)
        await dingtalk.send_message(outgoing)

        # 1 token request + 2 send requests = 3 POSTs (it would be 4 without caching).
        assert http.post.call_count == 3
        first_url = http.post.call_args_list[0].args[0]
        assert "accessToken" in first_url
# ---------------------------------------------------------------------------
# 资源释放
# ---------------------------------------------------------------------------
class TestClose:
    """Resource release."""

    async def test_close_no_client_is_noop(self):
        """close() does not raise when no httpx client has been created."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        await dingtalk.close()

    async def test_close_resets_client(self):
        """After close() the client reference is cleared."""
        dingtalk = DingTalkMessageAdapter(app_key="k", app_secret="s", robot_code="r")
        # Force creation of the client so there is something to release.
        dingtalk._get_client()
        assert dingtalk._client is not None

        await dingtalk.close()
        assert dingtalk._client is None