feat(channels): U11 — Feishu IM adapter end-to-end (webhook + signature + AES-CBC decrypt + chat integration)

This commit is contained in:
chiguyong 2026-06-25 20:24:21 +08:00
parent 5572387c01
commit 4b58e8f661
3 changed files with 1208 additions and 11 deletions

View File

@ -0,0 +1,364 @@
"""飞书 IM 适配器 (U11)。
实现 :class:`MessageAdapter` 协议对接飞书开放平台事件订阅 webhook
关键设计决策
- 事件加密使用 AES-256-CBC飞书官方协议 secrets store AES-256-GCM 不同
- 签名校验 fail-closed``encrypt_key`` 缺失或签名头缺失一律返回 False
- ``tenant_access_token`` 简单 TTL 缓存5 分钟过期后重新拉取
- httpx 客户端懒构造避免未使用的适配器持有连接池
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import logging
import re
import time
from datetime import datetime, timezone
from typing import Any
import httpx
from agentkit.channels.base import (
ChannelType,
IncomingMessage,
MessageAdapter,
OutgoingMessage,
)
logger = logging.getLogger(__name__)
# 签名时间戳允许的最大偏移(秒)— 与飞书官方文档保持一致
_SIGNATURE_MAX_AGE_SECONDS = 300
# tenant_access_token 缓存 TTL— 飞书 token 实际有效期为 2h留 5min 余量
_TOKEN_CACHE_TTL = 300.0
# 飞书开放平台 API 端点
_TENANT_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
_SEND_MESSAGE_URL = "https://open.feishu.cn/open-apis/im/v1/messages"
# @用户 提及标记正则 — 飞书文本消息中提及用户格式为 "@_user_N"
_MENTION_RE = re.compile(r"@_user_\d+\s*")
class URLVerificationChallenge(Exception):
"""飞书 URL 验证事件 — webhook 端点需返回 ``{"challenge": ...}`` 响应。
飞书在配置 webhook 时会发送一次 ``url_verification`` 事件要求服务端
原样返回 ``challenge`` 字段以验证 URL 可达
"""
def __init__(self, challenge: str) -> None:
super().__init__(f"URL verification challenge: {challenge}")
self.challenge = challenge
class SignatureVerificationError(Exception):
"""事件 ``verification_token`` 校验失败 — 拒绝处理。"""
class FeishuMessageAdapter(MessageAdapter):
"""飞书 IM 适配器。
生命周期
``__init__`` :meth:`verify_signature` :meth:`receive_message`
:meth:`send_message` :meth:`close`
Args:
app_id: 飞书应用 App ID
app_secret: 飞书应用 App Secret
encrypt_key: 事件订阅加密密钥可选 启用加密订阅时必填
verification_token: 事件订阅 Verification Token可选 用于校验事件来源
"""
def __init__(
self,
app_id: str,
app_secret: str,
encrypt_key: str | None = None,
verification_token: str | None = None,
) -> None:
self.app_id = app_id
self.app_secret = app_secret
self.encrypt_key = encrypt_key
self.verification_token = verification_token
# 懒加载 httpx 客户端 — 避免未使用的适配器持有连接池
self._client: httpx.AsyncClient | None = None
# ponytail: 简单 TTL 缓存 (token, expiry)。天花板:单实例内存;
# 升级路径Redis 缓存共享给多实例。
self._token_cache: tuple[str, float] | None = None
# ------------------------------------------------------------------
# httpx 客户端懒加载
# ------------------------------------------------------------------
def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=10.0)
return self._client
# ------------------------------------------------------------------
# 签名验证
# ------------------------------------------------------------------
async def verify_signature(self, headers: dict[str, str], body: bytes) -> bool:
"""验证飞书 webhook 签名。
fail-closed``encrypt_key`` 缺失或签名头缺失一律返回 False
时间戳超过 5 分钟视为重放攻击拒绝
Args:
headers: HTTP 请求头键大小写不敏感查找
body: 原始请求体字节
Returns:
True 表示签名校验通过
"""
if not self.encrypt_key:
logger.warning("飞书适配器未配置 encrypt_key — 拒绝所有 webhook 请求")
return False
signature = _header_get(headers, "X-Lark-Signature")
if not signature:
return False
timestamp_str = _header_get(headers, "X-Lark-Request-Timestamp")
nonce = _header_get(headers, "X-Lark-Request-Nonce")
if not timestamp_str or not nonce:
return False
# 时间戳重放保护
try:
ts = int(timestamp_str)
except ValueError:
return False
now = datetime.now(timezone.utc).timestamp()
if abs(now - ts) > _SIGNATURE_MAX_AGE_SECONDS:
logger.warning("飞书 webhook 时间戳超出 %ds 窗口 — 拒绝", _SIGNATURE_MAX_AGE_SECONDS)
return False
# 计算签名sha256(timestamp + nonce + encrypt_key + body)
body_str = body.decode("utf-8")
expected = hashlib.sha256(
f"{timestamp_str}{nonce}{self.encrypt_key}{body_str}".encode("utf-8")
).hexdigest()
return hmac.compare_digest(signature, expected)
# ------------------------------------------------------------------
# 消息接收 / 解析
# ------------------------------------------------------------------
async def receive_message(self, headers: dict[str, str], body: bytes) -> IncomingMessage:
"""解析飞书 webhook 事件为标准化 :class:`IncomingMessage`。
Raises:
URLVerificationChallenge: 事件为 URL 验证请求
SignatureVerificationError: ``verification_token`` 不匹配
ValueError: 事件结构无法解析
"""
try:
data: dict[str, Any] = json.loads(body)
except json.JSONDecodeError as exc:
raise ValueError(f"飞书事件 body 不是合法 JSON: {exc}") from exc
# URL 验证流程 — 飞书配置 webhook 时发送
if "url_verification" in data or "challenge" in data:
raise URLVerificationChallenge(data.get("challenge", ""))
# 加密事件 — AES-256-CBC 解密
if "encrypt" in data:
data = self._decrypt_event(data["encrypt"])
# 校验 verification_token
if self.verification_token is not None:
token = data.get("verification_token") or data.get("header", {}).get("token")
if not token:
raise SignatureVerificationError("事件缺少 verification_token 字段")
if not hmac.compare_digest(token, self.verification_token):
raise SignatureVerificationError("verification_token 不匹配")
event_id = data.get("event_id") or data.get("header", {}).get("event_id", "")
event = data.get("event", {})
message = event.get("message", {})
sender = event.get("sender", {}).get("sender_id", {})
chat_id = message.get("chat_id", "")
open_id = sender.get("open_id", "")
create_time = message.get("create_time", "")
timestamp = str(create_time) if create_time else ""
content = self._extract_content(message)
return IncomingMessage(
channel=ChannelType.FEISHU,
platform_message_id=event_id,
user_id=open_id,
chat_id=chat_id,
content=content,
raw_event=data,
timestamp=timestamp,
)
def _decrypt_event(self, encrypt_b64: str) -> dict[str, Any]:
"""AES-256-CBC 解密飞书加密事件。
飞书协议
key = sha256(encrypt_key).digest()[:32]
ciphertext = IV(16B) + 密文
plaintext = AES-256-CBC 解密后去除 PKCS7 padding
"""
if not self.encrypt_key:
raise SignatureVerificationError("加密事件但未配置 encrypt_key")
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
key = hashlib.sha256(self.encrypt_key.encode("utf-8")).digest()
ciphertext = base64.b64decode(encrypt_b64)
if len(ciphertext) < 17: # IV(16) + 至少 1 字节密文
raise ValueError("加密事件密文长度不足")
iv = ciphertext[:16]
encrypted = ciphertext[16:]
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
decryptor = cipher.decryptor()
padded = decryptor.update(encrypted) + decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
plaintext = unpadder.update(padded) + unpadder.finalize()
return json.loads(plaintext.decode("utf-8"))
def _extract_content(self, message: dict[str, Any]) -> str:
"""从飞书 message 字段提取文本内容。
- text 类型解析 ``content`` JSON 中的 ``text`` 字段剥离 @ 提及标记
- 其他类型返回 ``[unsupported message type: {type}]``
"""
message_type = message.get("message_type", "")
content_raw = message.get("content", "{}")
if message_type == "text":
try:
content_obj = (
json.loads(content_raw) if isinstance(content_raw, str) else content_raw
)
text = content_obj.get("text", "") if isinstance(content_obj, dict) else ""
except (json.JSONDecodeError, TypeError):
return ""
# 剥离 @ 提及标记(如 "@_user_1 hello" → "hello"
return _MENTION_RE.sub("", text).strip()
return f"[unsupported message type: {message_type}]"
# ------------------------------------------------------------------
# 消息发送
# ------------------------------------------------------------------
async def send_message(self, message: OutgoingMessage) -> bool:
"""向飞书发送文本消息。
Returns:
True 表示 HTTP 200 且响应 ``code == 0``
"""
try:
token = await self._get_tenant_access_token()
if not token:
return False
client = self._get_client()
payload = {
"receive_id": message.chat_id,
"msg_type": "text",
"content": json.dumps({"text": message.content}),
}
resp = await client.post(
_SEND_MESSAGE_URL,
params={"receive_id_type": "chat_id"},
json=payload,
headers={"Authorization": f"Bearer {token}"},
)
if resp.status_code != 200:
logger.error("飞书 send_message HTTP %d: %s", resp.status_code, resp.text[:200])
return False
data = resp.json()
if data.get("code") != 0:
logger.error(
"飞书 send_message 业务失败 code=%s msg=%s",
data.get("code"),
data.get("msg", "")[:200],
)
return False
return True
except httpx.HTTPError as exc:
logger.error("飞书 send_message 网络错误: %s", exc)
return False
async def _get_tenant_access_token(self) -> str | None:
"""获取并缓存 ``tenant_access_token``。"""
# 命中缓存
if self._token_cache is not None:
token, expiry = self._token_cache
if time.monotonic() < expiry:
return token
try:
client = self._get_client()
resp = await client.post(
_TENANT_TOKEN_URL,
json={"app_id": self.app_id, "app_secret": self.app_secret},
)
if resp.status_code != 200:
logger.error("飞书 tenant_token HTTP %d: %s", resp.status_code, resp.text[:200])
return None
data = resp.json()
if data.get("code") != 0:
logger.error(
"飞书 tenant_token 业务失败 code=%s msg=%s",
data.get("code"),
data.get("msg", "")[:200],
)
return None
token = data.get("tenant_access_token", "")
if not token:
return None
self._token_cache = (token, time.monotonic() + _TOKEN_CACHE_TTL)
return token
except httpx.HTTPError as exc:
logger.error("飞书 tenant_token 网络错误: %s", exc)
return None
# ------------------------------------------------------------------
# 资源释放
# ------------------------------------------------------------------
async def close(self) -> None:
"""关闭 httpx 客户端(如已创建)。"""
if self._client is not None:
await self._client.aclose()
self._client = None
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _header_get(headers: dict[str, str], name: str) -> str | None:
"""大小写不敏感的 header 查找。"""
# 直接命中
if name in headers:
return headers[name]
lower = name.lower()
for k, v in headers.items():
if k.lower() == lower:
return v
return None

View File

@ -1,4 +1,4 @@
"""渠道管理端点 — 消息渠道配置的 CRUD """渠道管理端点 — 消息渠道配置的 CRUD + Webhook 入站处理
端点 端点
- ``GET /channels`` 列出已配置渠道 - ``GET /channels`` 列出已配置渠道
@ -6,25 +6,28 @@
- ``GET /channels/{id}`` 获取渠道配置不返回凭证明文 - ``GET /channels/{id}`` 获取渠道配置不返回凭证明文
- ``PUT /channels/{id}`` 更新渠道配置 - ``PUT /channels/{id}`` 更新渠道配置
- ``DELETE /channels/{id}`` 删除渠道 - ``DELETE /channels/{id}`` 删除渠道
- ``POST /channels/{id}/webhook`` 平台 webhook 入站端点
凭证通过 :class:`SecretsStore` 加密存储响应中绝不返回凭证明文 凭证通过 :class:`SecretsStore` 加密存储响应中绝不返回凭证明文
仅返回 secret 字段名列表以便前端管理 仅返回 secret 字段名列表以便前端管理
Webhook 入站端点接收平台消息 fail-closed 行为Redis 不可用或 Webhook 入站端点的 fail-closed 行为签名校验失败返回 401限流超限
签名校验失败时返回 503不可跳过 nonce dedup 直接处理消息具体 返回 429nonce dedup 用于防重放重复事件返回 200飞书要求 3s 内响应
webhook 端点由各平台适配器子模块实现本模块仅提供配置管理
""" """
from __future__ import annotations from __future__ import annotations
import asyncio
import logging import logging
import re import re
import time
from typing import Any from typing import Any
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException, Request
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from agentkit.channels.base import ChannelType from agentkit.channels.base import ChannelType, MessageAdapter, OutgoingMessage
from agentkit.channels.feishu import FeishuMessageAdapter, URLVerificationChallenge
from agentkit.channels.secrets import SecretsStore from agentkit.channels.secrets import SecretsStore
from agentkit.server.auth.dependencies import require_permission from agentkit.server.auth.dependencies import require_permission
from agentkit.server.auth.permissions import Permission from agentkit.server.auth.permissions import Permission
@ -64,6 +67,57 @@ def _reset_secrets_store() -> None:
_channels: dict[str, dict[str, Any]] = {} _channels: dict[str, dict[str, Any]] = {}
# ---------------------------------------------------------------------------
# Webhook 限流 + Nonce dedup模块级单例
#
# ponytail: 单进程内存实现。天花板:多 worker / 多实例部署下状态不共享,
# 限流与 nonce dedup 仅在单进程内有效。升级路径:
# - 限流Redis 滑动窗口INCR + EXPIRE 或 ZSET
# - Nonce dedupRedis SETNX with TTL
# ---------------------------------------------------------------------------
# IP -> 请求时间戳列表(滑动窗口)
_rate_limits: dict[str, list[float]] = {}
_RATE_LIMIT_WINDOW = 60.0 # 窗口大小(秒)
_RATE_LIMIT_MAX = 100 # 窗口内最大请求数
# nonce -> 过期时间戳(与飞书签名时间戳窗口一致)
_seen_nonces: dict[str, float] = {}
_NONCE_TTL = 300.0
def _check_rate_limit(client_ip: str) -> bool:
"""滑动窗口限流。返回 True 表示放行False 表示超限。"""
now = time.monotonic()
timestamps = _rate_limits.get(client_ip, [])
cutoff = now - _RATE_LIMIT_WINDOW
timestamps = [t for t in timestamps if t > cutoff]
if len(timestamps) >= _RATE_LIMIT_MAX:
_rate_limits[client_ip] = timestamps
return False
timestamps.append(now)
_rate_limits[client_ip] = timestamps
return True
def _check_nonce_dedup(nonce: str) -> bool:
"""Nonce 去重。返回 True 表示新 nonce应处理False 表示重复(跳过)。"""
now = time.monotonic()
# 惰性清理过期项
expired = [k for k, v in _seen_nonces.items() if v < now]
for k in expired:
del _seen_nonces[k]
if nonce in _seen_nonces:
return False
_seen_nonces[nonce] = now + _NONCE_TTL
return True
def _reset_webhook_state() -> None:
"""重置限流与 nonce 状态(仅供测试使用)。"""
_rate_limits.clear()
_seen_nonces.clear()
def _validate_channel_id(channel_id: str) -> str: def _validate_channel_id(channel_id: str) -> str:
"""校验渠道 ID非法时抛 400。""" """校验渠道 ID非法时抛 400。"""
@ -251,3 +305,157 @@ async def delete_channel(
await store.delete_secret(f"{channel_id}:{name}") await store.delete_secret(f"{channel_id}:{name}")
return {"deleted": channel_id} return {"deleted": channel_id}
# ---------------------------------------------------------------------------
# Webhook 入站端点U11 — 飞书 IM 适配器端到端)
# ---------------------------------------------------------------------------
async def _build_adapter(channel_id: str) -> MessageAdapter:
"""根据渠道配置与 secrets 构造适配器实例。
Raises:
HTTPException: 渠道不存在404渠道类型非飞书400缺少必要凭证500
"""
cfg = _channels.get(channel_id)
if cfg is None:
raise HTTPException(status_code=404, detail=f"渠道 '{channel_id}' 不存在")
if cfg["channel_type"] != ChannelType.FEISHU.value:
raise HTTPException(status_code=400, detail=f"渠道 '{channel_id}' 不是飞书渠道")
store = _get_secrets_store()
app_id = await store.get_secret(f"{channel_id}:app_id")
app_secret = await store.get_secret(f"{channel_id}:app_secret")
encrypt_key = await store.get_secret(f"{channel_id}:encrypt_key")
verification_token = await store.get_secret(f"{channel_id}:verification_token")
if not app_id or not app_secret:
raise HTTPException(
status_code=500, detail=f"渠道 '{channel_id}' 缺少 app_id 或 app_secret"
)
return FeishuMessageAdapter(
app_id=app_id,
app_secret=app_secret,
encrypt_key=encrypt_key,
verification_token=verification_token,
)
async def _process_inbound_message(
app_state: Any, adapter: FeishuMessageAdapter, message: Any
) -> None:
"""后台处理入站消息 — 调用 chat 链路并通过适配器回复。
整个流程 try/except 包裹任何异常仅记录日志不向上抛出
webhook 必须保持响应能力``adapter.close()`` finally 中调用
"""
try:
request_preprocessor = getattr(app_state, "request_preprocessor", None)
llm_gateway = getattr(app_state, "llm_gateway", None)
if request_preprocessor is None or llm_gateway is None:
logger.warning("app.state 缺少 request_preprocessor 或 llm_gateway — 跳过消息处理")
return
# 路由预处理 — IM 场景使用默认 agent无需技能注册表
routing = await request_preprocessor.preprocess(
content=message.content, default_agent_name="default"
)
final_content = ""
execution_mode = getattr(routing, "execution_mode", None)
# DIRECT_CHAT 模式 — 直接调用 LLM
if execution_mode is not None and execution_mode.value == "direct_chat":
response = await llm_gateway.chat(
messages=[{"role": "user", "content": message.content}],
model=routing.model or "default",
)
final_content = response.content
else:
# REACT 或其他模式 — 优先使用 ReActEngine失败回退到 DIRECT_CHAT
try:
from agentkit.core.react import ReActEngine
engine = ReActEngine(llm_gateway=llm_gateway)
result = await engine.execute(
messages=[{"role": "user", "content": message.content}],
tools=getattr(routing, "tools", None) or None,
model=routing.model or "default",
)
final_content = getattr(result, "content", "") or ""
except Exception as exc: # noqa: BLE001 — 回退路径需捕获全部异常
logger.warning("ReActEngine 执行失败,回退到 DIRECT_CHAT: %s", exc)
response = await llm_gateway.chat(
messages=[{"role": "user", "content": message.content}],
model=routing.model or "default",
)
final_content = response.content
if not final_content:
logger.warning("消息处理未产生内容 — 不发送回复")
return
outgoing = OutgoingMessage(
channel=ChannelType.FEISHU,
chat_id=message.chat_id,
content=final_content,
)
await adapter.send_message(outgoing)
except Exception as exc: # noqa: BLE001 — webhook 必须保持响应能力
logger.exception("处理飞书入站消息失败: %s", exc)
finally:
try:
await adapter.close()
except Exception: # noqa: BLE001
logger.debug("adapter.close() 异常已忽略")
def _get_client_ip(request: Request) -> str:
"""获取客户端 IP — 优先取 X-Forwarded-For 首个 IP否则取 request.client.host。"""
forwarded = request.headers.get("x-forwarded-for")
if forwarded:
# 取首个 IP链路中第一个为原始客户端
return forwarded.split(",")[0].strip()
if request.client:
return request.client.host
return "unknown"
@router.post("/channels/{channel_id}/webhook")
async def channel_webhook(channel_id: str, request: Request) -> dict[str, Any]:
"""飞书 webhook 端点 — 接收平台事件。
安全流程按顺序
1. Per-IP 限流100 req/min 超限 429
2. 读取原始 body未解析
3. 签名验证 失败 401
4. Nonce dedup 重复返回 200飞书要求 3s 内响应
5. URL verification challenge 返回 ``{"challenge": ...}``
6. 解析消息 后台异步处理 立即返回 200
"""
client_ip = _get_client_ip(request)
if not _check_rate_limit(client_ip):
raise HTTPException(status_code=429, detail="请求过于频繁,请稍后重试")
body = await request.body()
adapter = await _build_adapter(channel_id)
headers_dict = dict(request.headers)
if not await adapter.verify_signature(headers_dict, body):
raise HTTPException(status_code=401, detail="签名校验失败")
# Nonce dedup可选 — 若头不存在则跳过去重)
nonce = request.headers.get("x-lark-request-nonce")
if nonce and not _check_nonce_dedup(nonce):
return {"code": 0, "msg": "duplicate"}
try:
message = await adapter.receive_message(headers_dict, body)
except URLVerificationChallenge as e:
# URL 验证流程 — 飞书配置 webhook 时发送
return {"challenge": e.challenge}
# 异步处理 — 不阻塞 webhook 响应(飞书要求 3s 内返回 200
asyncio.create_task(_process_inbound_message(request.app.state, adapter, message))
return {"code": 0}

View File

@ -0,0 +1,625 @@
"""飞书 IM 适配器端到端测试 (U11)。
覆盖场景
- URL 验证 challenge 事件
- 签名校验有效/无效/过期/缺失 encrypt_key
- 加密事件 AES-256-CBC 解密
- 文本消息解析 @ 提及剥离
- 不支持消息类型
- verification_token 校验
- send_message 成功/失败
- tenant_access_token 缓存
- Webhook 端点限流签名失败URL 验证nonce dedup404/400立即 200
"""
from __future__ import annotations
import base64
import hashlib
import json
import time
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
from fastapi import FastAPI
from fastapi.testclient import TestClient
from agentkit.channels.base import ChannelType, IncomingMessage, OutgoingMessage
from agentkit.channels.feishu import (
FeishuMessageAdapter,
SignatureVerificationError,
URLVerificationChallenge,
)
from agentkit.channels.secrets import KEY_SIZE, SecretsStore
from agentkit.server.routes import channels as channels_routes
# ---------------------------------------------------------------------------
# 辅助函数
# ---------------------------------------------------------------------------
def _make_event_body(
*,
text: str = "hello",
message_type: str = "text",
event_id: str = "evt_001",
chat_id: str = "oc_test_chat",
open_id: str = "ou_test_user",
create_time: str = "1700000000000",
verification_token: str | None = "right_token",
) -> dict[str, Any]:
"""构造飞书事件 JSON 体(非加密)。"""
content: Any
if message_type == "text":
content = json.dumps({"text": text})
else:
content = json.dumps({"image_key": "img_xxx"})
event: dict[str, Any] = {
"event_id": event_id,
"event": {
"message": {
"message_id": "om_test_msg",
"chat_id": chat_id,
"message_type": message_type,
"content": content,
"create_time": create_time,
},
"sender": {"sender_id": {"open_id": open_id}},
},
}
if verification_token is not None:
event["verification_token"] = verification_token
return event
def _sign(
*, encrypt_key: str, body: bytes, timestamp: int | None = None, nonce: str = "n1"
) -> dict[str, str]:
"""构造包含正确飞书签名的 headers。"""
if timestamp is None:
timestamp = int(datetime.now(timezone.utc).timestamp())
body_str = body.decode("utf-8")
sig = hashlib.sha256(f"{timestamp}{nonce}{encrypt_key}{body_str}".encode("utf-8")).hexdigest()
return {
"X-Lark-Signature": sig,
"X-Lark-Request-Timestamp": str(timestamp),
"X-Lark-Request-Nonce": nonce,
}
def _encrypt_event(encrypt_key: str, plaintext: dict[str, Any]) -> bytes:
"""用飞书 AES-256-CBC 协议加密事件,返回包含 ``encrypt`` 字段的 JSON body。"""
key = hashlib.sha256(encrypt_key.encode("utf-8")).digest()
iv = b"\x00" * 16 # 测试用确定性 IV生产环境飞书用随机 IV
plaintext_bytes = json.dumps(plaintext).encode("utf-8")
padder = PKCS7(algorithms.AES.block_size).padder()
padded = padder.update(plaintext_bytes) + padder.finalize()
cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded) + encryptor.finalize()
# 飞书协议ciphertext = IV(16B) + 密文
combined = iv + ciphertext
encrypted_b64 = base64.b64encode(combined).decode("utf-8")
return json.dumps({"encrypt": encrypted_b64}).encode("utf-8")
# ---------------------------------------------------------------------------
# FeishuMessageAdapter 单元测试
# ---------------------------------------------------------------------------
class TestUrlVerification:
"""URL 验证 challenge 流程。"""
async def test_url_verification_raises_challenge(self):
"""事件含 url_verification 字段时抛 URLVerificationChallenge。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps({"url_verification": "v1", "challenge": "abc123", "token": "xxx"}).encode(
"utf-8"
)
with pytest.raises(URLVerificationChallenge) as exc_info:
await adapter.receive_message({}, body)
assert exc_info.value.challenge == "abc123"
async def test_challenge_only_field_also_raises(self):
"""仅有 challenge 字段时也抛出 URLVerificationChallenge。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps({"challenge": "xyz"}).encode("utf-8")
with pytest.raises(URLVerificationChallenge) as exc_info:
await adapter.receive_message({}, body)
assert exc_info.value.challenge == "xyz"
class TestSignatureVerification:
"""签名校验。"""
async def test_valid_signature(self):
"""正确签名返回 True。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
assert await adapter.verify_signature(headers, body) is True
async def test_invalid_signature(self):
"""篡改签名返回 False。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
headers["X-Lark-Signature"] = "tampered_signature"
assert await adapter.verify_signature(headers, body) is False
async def test_expired_timestamp(self):
"""时间戳超过 5 分钟返回 False。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
old_ts = int(datetime.now(timezone.utc).timestamp()) - 600
headers = _sign(encrypt_key=encrypt_key, body=body, timestamp=old_ts)
assert await adapter.verify_signature(headers, body) is False
async def test_missing_encrypt_key(self):
"""适配器未配置 encrypt_key 时返回 Falsefail-closed"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key="any", body=body)
assert await adapter.verify_signature(headers, body) is False
async def test_missing_signature_header(self):
"""缺 X-Lark-Signature 头返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key="k")
body = b'{"x":1}'
assert await adapter.verify_signature({}, body) is False
async def test_lowercase_header_lookup(self):
"""header 名大小写不敏感查找。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
body = json.dumps(_make_event_body()).encode("utf-8")
headers = _sign(encrypt_key=encrypt_key, body=body)
# 改为小写键
headers = {k.lower(): v for k, v in headers.items()}
assert await adapter.verify_signature(headers, body) is True
class TestEncryptedEventDecryption:
"""AES-256-CBC 加密事件解密。"""
async def test_decrypt_encrypted_event(self):
"""加密事件可正确解密并返回 IncomingMessage。"""
encrypt_key = "test_key"
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key)
plain_event = _make_event_body(text="secret hello", verification_token=None)
body = _encrypt_event(encrypt_key, plain_event)
msg = await adapter.receive_message({}, body)
assert isinstance(msg, IncomingMessage)
assert msg.channel == ChannelType.FEISHU
assert msg.content == "secret hello"
assert msg.platform_message_id == "evt_001"
assert msg.chat_id == "oc_test_chat"
assert msg.user_id == "ou_test_user"
class TestTextMessageParsing:
"""文本消息解析。"""
async def test_plain_text_message(self):
"""非加密文本事件解析为 IncomingMessage。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="hello world")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello world"
assert msg.channel == ChannelType.FEISHU
async def test_mention_stripping(self):
"""文本中的 @ 提及标记被剥离。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="@_user_1 hello world")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello world"
async def test_mention_multiple_users(self):
"""多个 @ 提及标记全部被剥离。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(text="@_user_1 @_user_2 hi there")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hi there"
async def test_unsupported_message_type(self):
"""非 text 类型返回 unsupported 占位内容。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(message_type="image")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content.startswith("[unsupported message type: image]")
async def test_timestamp_extracted(self):
"""create_time 转换为 timestamp 字符串。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None)
body = json.dumps(_make_event_body(create_time="1700000000123")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.timestamp == "1700000000123"
class TestVerificationTokenCheck:
"""verification_token 校验。"""
async def test_token_mismatch_raises(self):
"""verification_token 不匹配抛 SignatureVerificationError。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token="wrong")).encode("utf-8")
with pytest.raises(SignatureVerificationError):
await adapter.receive_message({}, body)
async def test_token_match_passes(self):
"""verification_token 匹配则正常处理。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token="right")).encode("utf-8")
msg = await adapter.receive_message({}, body)
assert msg.content == "hello"
async def test_missing_token_raises_when_configured(self):
"""配置了 token 但事件缺失 token 字段抛 SignatureVerificationError。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right")
body = json.dumps(_make_event_body(verification_token=None)).encode("utf-8")
with pytest.raises(SignatureVerificationError):
await adapter.receive_message({}, body)
class TestSendMessage:
"""send_message 行为。"""
async def test_send_message_success(self):
"""HTTP 200 + code=0 返回 True。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
# mock token 与 send_message 两次 HTTP 调用
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_123"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0, "data": {"message_id": "om_x"}}
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send])
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
result = await adapter.send_message(out)
assert result is True
# 验证 send_message 调用 URL 与 payload
second_call = mock_client.post.call_args_list[1]
assert "messages" in second_call.args[0]
assert second_call.kwargs["params"] == {"receive_id_type": "chat_id"}
assert second_call.kwargs["headers"]["Authorization"] == "Bearer tok_123"
async def test_send_message_business_failure(self):
"""HTTP 200 但 code != 0 返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_x"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 9999, "msg": "error"}
mock_client = AsyncMock()
mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send])
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
assert await adapter.send_message(out) is False
async def test_send_message_token_fetch_failure(self):
"""获取 token 失败返回 False。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 401
mock_response_token.json.return_value = {"code": 99991661, "msg": "invalid app"}
mock_response_token.text = "invalid"
mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response_token)
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
assert await adapter.send_message(out) is False
async def test_tenant_token_caching(self):
"""同 TTL 内的两次 send_message 只拉取一次 token。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "cached_tok"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0}
mock_client = AsyncMock()
# 第一次token + send第二次仅 sendtoken 走缓存)
mock_client.post = AsyncMock(
side_effect=[mock_response_token, mock_response_send, mock_response_send]
)
adapter._client = mock_client
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
await adapter.send_message(out)
await adapter.send_message(out)
# 仅 1 次 token 调用 + 2 次 send 调用 = 3 次(但若未缓存会是 4 次)
assert mock_client.post.call_count == 3
# 验证第一次 URL 是 tenant_token 端点
first_call_url = mock_client.post.call_args_list[0].args[0]
assert "tenant_access_token" in first_call_url
async def test_tenant_token_refresh_after_expiry(self):
"""TTL 过期后重新拉取 token。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
mock_response_token = MagicMock()
mock_response_token.status_code = 200
mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok"}
mock_response_send = MagicMock()
mock_response_send.status_code = 200
mock_response_send.json.return_value = {"code": 0}
mock_client = AsyncMock()
mock_client.post = AsyncMock(
side_effect=[
mock_response_token,
mock_response_send,
mock_response_token,
mock_response_send,
]
)
adapter._client = mock_client
# 模拟 token 已过期
adapter._token_cache = ("old_tok", time.monotonic() - 1)
out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi")
await adapter.send_message(out)
# 应该重新拉取 token
assert mock_client.post.call_count == 2
first_call_url = mock_client.post.call_args_list[0].args[0]
assert "tenant_access_token" in first_call_url
class TestClose:
"""资源释放。"""
async def test_close_no_client_is_noop(self):
"""未创建 httpx 客户端时 close 不抛异常。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
await adapter.close() # 不应抛异常
async def test_close_resets_client(self):
"""close 后客户端引用清空。"""
adapter = FeishuMessageAdapter(app_id="a", app_secret="b")
adapter._get_client() # 触发懒加载
assert adapter._client is not None
await adapter.close()
assert adapter._client is None
# ---------------------------------------------------------------------------
# Webhook 端点测试
# ---------------------------------------------------------------------------
@pytest.fixture
def webhook_app(monkeypatch):
"""挂载 channels 路由的最小 FastAPI 应用 — 用于 webhook 测试。
清理模块级状态_channels限流noncesecrets store保证测试隔离
"""
monkeypatch.delenv("AGENTKIT_ENV", raising=False)
monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False)
channels_routes._secrets_store = SecretsStore(master_key=b"\x01" * KEY_SIZE)
channels_routes._channels.clear()
channels_routes._reset_webhook_state()
application = FastAPI()
application.include_router(channels_routes.router, prefix="/api/v1")
# webhook 端点不需要认证(外部平台调用)
@application.middleware("http")
async def _no_auth(request, call_next):
request.state.current_user = {"user_id": "anon", "role": "admin"}
return await call_next(request)
return application
@pytest.fixture
def webhook_client(webhook_app):
return TestClient(webhook_app)
def _register_feishu_channel(
client: TestClient,
*,
channel_id: str = "feishu-test",
encrypt_key: str = "test_key",
verification_token: str = "right_token",
) -> None:
"""注册一个飞书渠道(带完整凭证)。"""
client.post(
"/api/v1/channels",
json={
"channel_id": channel_id,
"channel_type": "feishu",
"name": "飞书测试",
"secrets": {
"app_id": "cli_test",
"app_secret": "secret_test",
"encrypt_key": encrypt_key,
"verification_token": verification_token,
},
},
)
class TestWebhookRateLimit:
"""Webhook 限流。"""
def test_rate_limit_blocks_after_100_requests(self, webhook_client):
"""同 IP 100 次请求放行,第 101 次 429。"""
_register_feishu_channel(webhook_client)
# 100 次快速请求 — 都会因签名失败 401但限流先于签名校验
for _ in range(100):
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code in (401, 200) # 401 = 签名失败(限流通过)
# 第 101 次 — 限流触发
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 429
class TestWebhookSignatureFailure:
"""Webhook 签名校验失败。"""
def test_missing_signature_returns_401(self, webhook_client):
"""无 X-Lark-Signature 头返回 401。"""
_register_feishu_channel(webhook_client)
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=json.dumps(_make_event_body()).encode("utf-8"),
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 401
class TestWebhookUrlVerification:
"""Webhook URL 验证流程。"""
def test_url_verification_returns_challenge(self, webhook_client):
"""url_verification 事件返回 {"challenge": ...}。"""
_register_feishu_channel(webhook_client)
body = json.dumps(
{"url_verification": "v1", "challenge": "verify_abc", "token": "right_token"}
).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body)
# 添加 Content-Type 以匹配 webhook 端点
headers["Content-Type"] = "application/json"
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp.status_code == 200
data = resp.json()
assert data.get("challenge") == "verify_abc"
class TestWebhookNonceDedup:
"""Webhook nonce 去重。"""
def test_duplicate_nonce_returns_duplicate_marker(self, webhook_client):
"""相同 nonce 第二次返回 duplicate 标记。"""
_register_feishu_channel(webhook_client)
body = json.dumps(_make_event_body(text="hi")).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body, nonce="dup_nonce")
headers["Content-Type"] = "application/json"
# 第一次 — 正常处理
resp1 = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp1.status_code == 200
# 第二次 — nonce 重复
resp2 = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp2.status_code == 200
assert resp2.json().get("msg") == "duplicate"
class TestWebhookErrors:
"""Webhook 错误响应。"""
def test_unknown_channel_returns_404(self, webhook_client):
"""POST 到不存在的渠道 webhook 返回 404。"""
resp = webhook_client.post(
"/api/v1/channels/nonexistent/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 404
def test_non_feishu_channel_returns_400(self, webhook_client):
"""POST 到非飞书渠道 webhook 返回 400。"""
# 注册一个钉钉渠道
webhook_client.post(
"/api/v1/channels",
json={
"channel_id": "dingtalk-1",
"channel_type": "dingtalk",
"name": "钉钉",
},
)
resp = webhook_client.post(
"/api/v1/channels/dingtalk-1/webhook",
content=b"{}",
headers={"Content-Type": "application/json"},
)
assert resp.status_code == 400
class TestWebhookImmediateResponse:
"""Webhook 立即响应 + 后台任务调度。"""
def test_returns_200_immediately_with_background_task(self, webhook_client):
"""webhook 立即返回 200后台任务被调度。"""
_register_feishu_channel(webhook_client)
body = json.dumps(_make_event_body(text="hello")).encode("utf-8")
headers = _sign(encrypt_key="test_key", body=body, nonce="task_nonce_001")
headers["Content-Type"] = "application/json"
# mock _process_inbound_message 防止实际执行 chat 链路
with patch.object(
channels_routes, "_process_inbound_message", new_callable=AsyncMock
) as mock_proc:
resp = webhook_client.post(
"/api/v1/channels/feishu-test/webhook",
content=body,
headers=headers,
)
assert resp.status_code == 200
assert resp.json() == {"code": 0}
# 后台任务被调度TestClient 同步运行事件循环)
assert mock_proc.call_count == 1
# 验证传入的 message 内容
call_args = mock_proc.call_args
# call_args.args: (app_state, adapter, message)
passed_message = call_args.args[2]
assert passed_message.content == "hello"
assert passed_message.chat_id == "oc_test_chat"