fischer-agentkit/tests/unit/channels/test_feishu.py

626 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""飞书 IM 适配器端到端测试 (U11)。
覆盖场景:
- URL 验证 challenge 事件
- 签名校验(有效/无效/过期/缺失 encrypt_key
- 加密事件 AES-256-CBC 解密
- 文本消息解析(含 @ 提及剥离)
- 不支持消息类型
- verification_token 校验
- send_message 成功/失败
- tenant_access_token 缓存
- Webhook 端点限流、签名失败、URL 验证、nonce dedup、404/400、立即 200
"""
from __future__ import annotations
import base64
import hashlib
import json
import time
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
from fastapi import FastAPI
from fastapi.testclient import TestClient
from agentkit.channels.base import ChannelType, IncomingMessage, OutgoingMessage
from agentkit.channels.feishu import (
FeishuMessageAdapter,
SignatureVerificationError,
URLVerificationChallenge,
)
from agentkit.channels.secrets import KEY_SIZE, SecretsStore
from agentkit.server.routes import channels as channels_routes
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _make_event_body(
*,
text: str = "hello",
message_type: str = "text",
event_id: str = "evt_001",
chat_id: str = "oc_test_chat",
open_id: str = "ou_test_user",
create_time: str = "1700000000000",
verification_token: str | None = "right_token",
) -> dict[str, Any]:
"""构造飞书事件 JSON 体(非加密)。"""
content: Any
if message_type == "text":
content = json.dumps({"text": text})
else:
content = json.dumps({"image_key": "img_xxx"})
event: dict[str, Any] = {
"event_id": event_id,
"event": {
"message": {
"message_id": "om_test_msg",
"chat_id": chat_id,
"message_type": message_type,
"content": content,
"create_time": create_time,
},
"sender": {"sender_id": {"open_id": open_id}},
},
}
if verification_token is not None:
event["verification_token"] = verification_token
return event
def _sign(
*, encrypt_key: str, body: bytes, timestamp: int | None = None, nonce: str = "n1"
) -> dict[str, str]:
"""构造包含正确飞书签名的 headers。"""
if timestamp is None:
timestamp = int(datetime.now(timezone.utc).timestamp())
body_str = body.decode("utf-8")
sig = hashlib.sha256(f"{timestamp}{nonce}{encrypt_key}{body_str}".encode("utf-8")).hexdigest()
return {
"X-Lark-Signature": sig,
"X-Lark-Request-Timestamp": str(timestamp),
"X-Lark-Request-Nonce": nonce,
}
def _encrypt_event(encrypt_key: str, plaintext: dict[str, Any]) -> bytes:
"""用飞书 AES-256-CBC 协议加密事件,返回包含 ``encrypt`` 字段的 JSON body。"""
key = hashlib.sha256(encrypt_key.encode("utf-8")).digest()
iv = b"\x00" * 16 # 测试用确定性 IV生产环境飞书用随机 IV
plaintext_bytes = json.dumps(plaintext).encode("utf-8")
padder = PKCS7(algorithms.AES.block_size).padder()
padded = padder.update(plaintext_bytes) + padder.finalize()
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded) + encryptor.finalize()
# 飞书协议ciphertext = IV(16B) + 密文
combined = iv + ciphertext
encrypted_b64 = base64.b64encode(combined).decode("utf-8")
return json.dumps({"encrypt": encrypted_b64}).encode("utf-8")
# ---------------------------------------------------------------------------
# FeishuMessageAdapter 单元测试
# ---------------------------------------------------------------------------
class TestUrlVerification:
"""URL 验证 challenge 流程。"""
async def test_url_verification_raises_challenge(self):
"""事件含 url_verification 字段时抛 URLVerificationChallenge。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps({"url_verification": "v1", "challenge": "abc123", "token": "xxx"}).encode(
"utf-8"
)
with pytest.raises(URLVerificationChallenge) as exc_info:
await adapter.receive_message({}, body)
assert exc_info.value.challenge == "abc123"
async def test_challenge_only_field_also_raises(self):
"""仅有 challenge 字段时也抛出 URLVerificationChallenge。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps({"challenge": "xyz"}).encode("utf-8")
with pytest.raises(URLVerificationChallenge) as exc_info:
await adapter.receive_message({}, body)
assert exc_info.value.challenge == "xyz"
class TestSignatureVerification:
"""签名校验。"""
async def test_valid_signature(self):
"""正确签名返回 True。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
assert await adapter.verify_signature(headers, body) is True
async def test_invalid_signature(self):
"""篡改签名返回 False。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
headers["X-Lark-Signature"] = "tampered_signature"
assert await adapter.verify_signature(headers, body) is False
async def test_expired_timestamp(self):
"""时间戳超过 5 分钟返回 False。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
old_ts = int(datetime.now(timezone.utc).timestamp()) - 600
headers = _sign(encrypt_key=encrypt_key, body=body, timestamp=old_ts)
assert await adapter.verify_signature(headers, body) is False
async def test_missing_encrypt_key(self):
"""适配器未配置 encrypt_key 时返回 Falsefail-closed"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key="any", body=body)
assert await adapter.verify_signature(headers, body) is False
async def test_missing_signature_header(self):
"""缺 X-Lark-Signature 头返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key="k")
body = b'{"x":1}'
assert await adapter.verify_signature({}, body) is False
async def test_lowercase_header_lookup(self):
"""header 名大小写不敏感查找。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
# 改为小写键
headers = {k.lower(): v for k, v in headers.items()}
assert await adapter.verify_signature(headers, body) is True
class TestEncryptedEventDecryption:
"""AES-256-CBC 加密事件解密。"""
async def test_decrypt_encrypted_event(self):
"""加密事件可正确解密并返回 IncomingMessage。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
plain_event = _make_event_body(text="secret hello", verification_token=None)
body = _encrypt_event(encrypt_key, plain_event)
msg = await adapter.receive_message({}, body)
assert isinstance(msg, IncomingMessage)
assert msg.channel == ChannelType.FEISHU
assert msg.content == "secret hello"
assert msg.platform_message_id == "evt_001"
assert msg.chat_id == "oc_test_chat"
assert msg.user_id == "ou_test_user"
class TestTextMessageParsing:
"""文本消息解析。"""
async def test_plain_text_message(self):
"""非加密文本事件解析为 IncomingMessage。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="hello world")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello world"
assert msg.channel == ChannelType.FEISHU
async def test_mention_stripping(self):
"""文本中的 @ 提及标记被剥离。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="@_user_1 hello world")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello world"
async def test_mention_multiple_users(self):
"""多个 @ 提及标记全部被剥离。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="@_user_1 @_user_2 hi there")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hi there"
async def test_unsupported_message_type(self):
"""非 text 类型返回 unsupported 占位内容。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(message_type="image")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content.startswith("[unsupported message type: image]")
async def test_timestamp_extracted(self):
"""create_time 转换为 timestamp 字符串。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(create_time="1700000000123")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.timestamp == "1700000000123"
class TestVerificationTokenCheck:
"""verification_token 校验。"""
async def test_token_mismatch_raises(self):
"""verification_token 不匹配抛 SignatureVerificationError。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token="wrong")).encode("utf-8")
with pytest.raises(SignatureVerificationError):
await adapter.receive_message({}, body)
async def test_token_match_passes(self):
"""verification_token 匹配则正常处理。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token="right")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello"
async def test_missing_token_raises_when_configured(self):
"""配置了 token 但事件缺失 token 字段抛 SignatureVerificationError。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token=None)).encode("utf-8")
with pytest.raises(SignatureVerificationError):
await adapter.receive_message({}, body)
class TestSendMessage:
"""send_message 行为。"""
async def test_send_message_success(self):
"""HTTP 200 + code=0 返回 True。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
# mock token 与 send_message 两次 HTTP 调用
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_123"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0, "data": {"message_id": "om_x"}}
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send])
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
result = await adapter.send_message(out)
assert result is True
# 验证 send_message 调用 URL 与 payload
second_call = mock_client.post.call_args_list[1]
assert "messages" in second_call.args[0]
assert second_call.kwargs["params"] == {"receive_id_type": "chat_id"}
assert second_call.kwargs["headers"]["Authorization"] == "Bearer tok_123"
async def test_send_message_business_failure(self):
"""HTTP 200 但 code != 0 返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_x"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 9999, "msg": "error"}
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send])
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
assert await adapter.send_message(out) is False
async def test_send_message_token_fetch_failure(self):
"""获取 token 失败返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 401
mock_response_token.json.return_value = {"code": 99991661, "msg": "invalid app"}
mock_response_token.text = "invalid"
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response_token)
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
assert await adapter.send_message(out) is False
async def test_tenant_token_caching(self):
"""同 TTL 内的两次 send_message 只拉取一次 token。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "cached_tok"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0}
mock_client = AsyncMock()
# 第一次token + send第二次仅 sendtoken 走缓存)
mock_client.post = AsyncMock(
side_effect=[mock_response_token, mock_response_send, mock_response_send]
)
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
await adapter.send_message(out)
await adapter.send_message(out)
# 仅 1 次 token 调用 + 2 次 send 调用 = 3 次(但若未缓存会是 4 次)
assert mock_client.post.call_count == 3
# 验证第一次 URL 是 tenant_token 端点
first_call_url = mock_client.post.call_args_list[0].args[0]
assert "tenant_access_token" in first_call_url
async def test_tenant_token_refresh_after_expiry(self):
"""TTL 过期后重新拉取 token。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0}
mock_client = AsyncMock()
mock_client.post = AsyncMock(
side_effect=[
mock_response_token,
mock_response_send,
mock_response_token,
mock_response_send,
]
)
adapter._client = mock_client
# 模拟 token 已过期
adapter._token_cache = ("old_tok", time.monotonic() - 1)
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
await adapter.send_message(out)
# 应该重新拉取 token
assert mock_client.post.call_count == 2
first_call_url = mock_client.post.call_args_list[0].args[0]
assert "tenant_access_token" in first_call_url
class TestClose:
"""资源释放。"""
async def test_close_no_client_is_noop(self):
"""未创建 httpx 客户端时 close 不抛异常。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
await adapter.close() # 不应抛异常
async def test_close_resets_client(self):
"""close 后客户端引用清空。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
adapter._get_client() # 触发懒加载
assert adapter._client is not None
await adapter.close()
assert adapter._client is None
# ---------------------------------------------------------------------------
# Webhook 端点测试
# ---------------------------------------------------------------------------
@pytest.fixture
def webhook_app(monkeypatch):
"""挂载 channels 路由的最小 FastAPI 应用 — 用于 webhook 测试。
清理模块级状态_channels、限流、nonce、secrets store保证测试隔离。
"""
monkeypatch.delenv("AGENTKIT_ENV", raising=False)
monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False)
channels_routes._secrets_store = SecretsStore(master_key=b"\x01" * KEY_SIZE)
channels_routes._channels.clear()
channels_routes._reset_webhook_state()
application = FastAPI()
application.include_router(channels_routes.router, prefix="/api/v1")
# webhook 端点不需要认证(外部平台调用)
@application.middleware("http")
async def _no_auth(request, call_next):
request.state.current_user = {"user_id": "anon", "role": "admin"}
return await call_next(request)
return application
@pytest.fixture
def webhook_client(webhook_app):
return TestClient(webhook_app)
def _register_feishu_channel(
client: TestClient,
*,
channel_id: str = "feishu-test",
encrypt_key: str = "test_key",
verification_token: str = "right_token",
) -> None:
"""注册一个飞书渠道(带完整凭证)。"""
client.post(
"/api/v1/channels",
json={
"channel_id": channel_id,
"channel_type": "feishu",
"name": "飞书测试",
"secrets": {
"app_id": "cli_test",
"app_secret": "secret_test",
"encrypt_key": encrypt_key,
"verification_token": verification_token,
},
},
)
class TestWebhookRateLimit:
"""Webhook 限流。"""
def test_rate_limit_blocks_after_100_requests(self, webhook_client):
"""同 IP 100 次请求放行,第 101 次 429。"""
_register_feishu_channel(webhook_client)
# 100 次快速请求 — 都会因签名失败 401但限流先于签名校验
for _ in range(100):
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code in (401, 200) # 401 = 签名失败(限流通过)
# 第 101 次 — 限流触发
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 429
class TestWebhookSignatureFailure:
"""Webhook 签名校验失败。"""
def test_missing_signature_returns_401(self, webhook_client):
"""无 X-Lark-Signature 头返回 401。"""
_register_feishu_channel(webhook_client)
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=json.dumps(_make_event_body()).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 401
class TestWebhookUrlVerification:
"""Webhook URL 验证流程。"""
def test_url_verification_returns_challenge(self, webhook_client):
"""url_verification 事件返回 {"challenge": ...}。"""
_register_feishu_channel(webhook_client)
body = json.dumps(
{"url_verification": "v1", "challenge": "verify_abc", "token": "right_token"}
).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body)
# 添加 Content-Type 以匹配 webhook 端点
headers["Content-Type"] = "application/json"
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data.get("challenge") == "verify_abc"
class TestWebhookNonceDedup:
"""Webhook nonce 去重。"""
def test_duplicate_nonce_returns_duplicate_marker(self, webhook_client):
"""相同 nonce 第二次返回 duplicate 标记。"""
_register_feishu_channel(webhook_client)
body = json.dumps(_make_event_body(text="hi")).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body, nonce="dup_nonce")
headers["Content-Type"] = "application/json"
# 第一次 — 正常处理
resp1 = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp1.status_code == 200
# 第二次 — nonce 重复
resp2 = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp2.status_code == 200
assert resp2.json().get("msg") == "duplicate"
class TestWebhookErrors:
"""Webhook 错误响应。"""
def test_unknown_channel_returns_404(self, webhook_client):
"""POST 到不存在的渠道 webhook 返回 404。"""
resp = webhook_client.post(
"/api/v1/channels/nonexistent/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 404
def test_dingtalk_channel_without_secrets_returns_500(self, webhook_client):
"""钉钉渠道未配置凭证时 webhook 返回 500U12 后钉钉受支持)。"""
# 注册一个钉钉渠道(不配置凭证)
webhook_client.post(
"/api/v1/channels",
json={
"channel_id": "dingtalk-1",
"channel_type": "dingtalk",
"name": "钉钉",
},
)
resp = webhook_client.post(
"/api/v1/channels/dingtalk-1/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 500
class TestWebhookImmediateResponse:
"""Webhook 立即响应 + 后台任务调度。"""
def test_returns_200_immediately_with_background_task(self, webhook_client):
"""webhook 立即返回 200后台任务被调度。"""
_register_feishu_channel(webhook_client)
body = json.dumps(_make_event_body(text="hello")).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body, nonce="task_nonce_001")
headers["Content-Type"] = "application/json"
# mock _process_inbound_message 防止实际执行 chat 链路
with patch.object(
channels_routes, "_process_inbound_message", new_callable=AsyncMock
) as mock_proc:
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp.status_code == 200
assert resp.json() == {"code": 0}
# 后台任务被调度TestClient 同步运行事件循环)
assert mock_proc.call_count == 1
# 验证传入的 message 内容
call_args = mock_proc.call_args
# call_args.args: (app_state, adapter, message)
passed_message = call_args.args[2]
assert passed_message.content == "hello"
assert passed_message.chat_id == "oc_test_chat"