From 4b58e8f661d545f31a54324ffb260554d21d9924 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 25 Jun 2026 20:24:21 +0800 Subject: [PATCH] =?UTF-8?q?feat(channels):=20U11=20=E2=80=94=20Feishu=20IM?= =?UTF-8?q?=20adapter=20end-to-end=20(webhook=20+=20signature=20+=20AES-CB?= =?UTF-8?q?C=20decrypt=20+=20chat=20integration)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agentkit/channels/feishu.py | 364 ++++++++++++++ src/agentkit/server/routes/channels.py | 230 ++++++++- tests/unit/channels/test_feishu.py | 625 +++++++++++++++++++++++++ 3 files changed, 1208 insertions(+), 11 deletions(-) create mode 100644 src/agentkit/channels/feishu.py create mode 100644 tests/unit/channels/test_feishu.py diff --git a/src/agentkit/channels/feishu.py b/src/agentkit/channels/feishu.py new file mode 100644 index 0000000..42d94f1 --- /dev/null +++ b/src/agentkit/channels/feishu.py @@ -0,0 +1,364 @@ +"""飞书 IM 适配器 (U11)。 + +实现 :class:`MessageAdapter` 协议,对接飞书开放平台事件订阅 webhook。 + +关键设计决策: +- 事件加密使用 AES-256-CBC(飞书官方协议),与 secrets store 的 AES-256-GCM 不同。 +- 签名校验 fail-closed:``encrypt_key`` 缺失或签名头缺失一律返回 False。 +- ``tenant_access_token`` 简单 TTL 缓存(5 分钟);过期后重新拉取。 +- httpx 客户端懒构造,避免未使用的适配器持有连接池。 +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import logging +import re +import time +from datetime import datetime, timezone +from typing import Any + +import httpx + +from agentkit.channels.base import ( + ChannelType, + IncomingMessage, + MessageAdapter, + OutgoingMessage, +) + +logger = logging.getLogger(__name__) + +# 签名时间戳允许的最大偏移(秒)— 与飞书官方文档保持一致 +_SIGNATURE_MAX_AGE_SECONDS = 300 +# tenant_access_token 缓存 TTL(秒)— 飞书 token 实际有效期为 2h,留 5min 余量 +_TOKEN_CACHE_TTL = 300.0 + +# 飞书开放平台 API 端点 +_TENANT_TOKEN_URL = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal" +_SEND_MESSAGE_URL = "https://open.feishu.cn/open-apis/im/v1/messages" + +# @用户 提及标记正则 — 飞书文本消息中提及用户格式为 "@_user_N" +_MENTION_RE = re.compile(r"@_user_\d+\s*") + + +class URLVerificationChallenge(Exception): + """飞书 URL 验证事件 — webhook 端点需返回 ``{"challenge": ...}`` 响应。 + + 飞书在配置 webhook 时会发送一次 ``url_verification`` 事件,要求服务端 + 原样返回 ``challenge`` 字段以验证 URL 可达。 + """ + + def __init__(self, challenge: str) -> None: + super().__init__(f"URL verification challenge: {challenge}") + self.challenge = challenge + + +class SignatureVerificationError(Exception): + """事件 ``verification_token`` 校验失败 — 拒绝处理。""" + + +class FeishuMessageAdapter(MessageAdapter): + """飞书 IM 适配器。 + + 生命周期: + ``__init__`` → :meth:`verify_signature` → :meth:`receive_message` + → :meth:`send_message` → :meth:`close` + + Args: + app_id: 飞书应用 App ID。 + app_secret: 飞书应用 App Secret。 + encrypt_key: 事件订阅加密密钥(可选 — 启用加密订阅时必填)。 + verification_token: 事件订阅 Verification Token(可选 — 用于校验事件来源)。 + """ + + def __init__( + self, + app_id: str, + app_secret: str, + encrypt_key: str | None = None, + verification_token: str | None = None, + ) -> None: + self.app_id = app_id + self.app_secret = app_secret + self.encrypt_key = encrypt_key + self.verification_token = verification_token + # 懒加载 httpx 客户端 — 避免未使用的适配器持有连接池 + self._client: httpx.AsyncClient | None = None + # ponytail: 简单 TTL 缓存 (token, expiry)。天花板:单实例内存; + # 升级路径:Redis 缓存共享给多实例。 + self._token_cache: tuple[str, float] | None = None + + # ------------------------------------------------------------------ + # httpx 客户端懒加载 + # ------------------------------------------------------------------ + + def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=10.0) + return self._client + + # ------------------------------------------------------------------ + # 签名验证 + # ------------------------------------------------------------------ + + async def verify_signature(self, headers: dict[str, str], body: bytes) -> bool: + """验证飞书 webhook 签名。 + + fail-closed:``encrypt_key`` 缺失或签名头缺失一律返回 False。 + 时间戳超过 5 分钟视为重放攻击,拒绝。 + + Args: + headers: HTTP 请求头(键大小写不敏感查找)。 + body: 原始请求体字节。 + + Returns: + True 表示签名校验通过。 + """ + if not self.encrypt_key: + logger.warning("飞书适配器未配置 encrypt_key — 拒绝所有 webhook 请求") + return False + + signature = _header_get(headers, "X-Lark-Signature") + if not signature: + return False + + timestamp_str = _header_get(headers, "X-Lark-Request-Timestamp") + nonce = _header_get(headers, "X-Lark-Request-Nonce") + if not timestamp_str or not nonce: + return False + + # 时间戳重放保护 + try: + ts = int(timestamp_str) + except ValueError: + return False + now = datetime.now(timezone.utc).timestamp() + if abs(now - ts) > _SIGNATURE_MAX_AGE_SECONDS: + logger.warning("飞书 webhook 时间戳超出 %ds 窗口 — 拒绝", _SIGNATURE_MAX_AGE_SECONDS) + return False + + # 计算签名:sha256(timestamp + nonce + encrypt_key + body) + body_str = body.decode("utf-8") + expected = hashlib.sha256( + f"{timestamp_str}{nonce}{self.encrypt_key}{body_str}".encode("utf-8") + ).hexdigest() + + return hmac.compare_digest(signature, expected) + + # ------------------------------------------------------------------ + # 消息接收 / 解析 + # ------------------------------------------------------------------ + + async def receive_message(self, headers: dict[str, str], body: bytes) -> IncomingMessage: + """解析飞书 webhook 事件为标准化 :class:`IncomingMessage`。 + + Raises: + URLVerificationChallenge: 事件为 URL 验证请求。 + SignatureVerificationError: ``verification_token`` 不匹配。 + ValueError: 事件结构无法解析。 + """ + try: + data: dict[str, Any] = json.loads(body) + except json.JSONDecodeError as exc: + raise ValueError(f"飞书事件 body 不是合法 JSON: {exc}") from exc + + # URL 验证流程 — 飞书配置 webhook 时发送 + if "url_verification" in data or "challenge" in data: + raise URLVerificationChallenge(data.get("challenge", "")) + + # 加密事件 — AES-256-CBC 解密 + if "encrypt" in data: + data = self._decrypt_event(data["encrypt"]) + + # 校验 verification_token + if self.verification_token is not None: + token = data.get("verification_token") or data.get("header", {}).get("token") + if not token: + raise SignatureVerificationError("事件缺少 verification_token 字段") + if not hmac.compare_digest(token, self.verification_token): + raise SignatureVerificationError("verification_token 不匹配") + + event_id = data.get("event_id") or data.get("header", {}).get("event_id", "") + event = data.get("event", {}) + message = event.get("message", {}) + sender = event.get("sender", {}).get("sender_id", {}) + + chat_id = message.get("chat_id", "") + open_id = sender.get("open_id", "") + create_time = message.get("create_time", "") + timestamp = str(create_time) if create_time else "" + + content = self._extract_content(message) + + return IncomingMessage( + channel=ChannelType.FEISHU, + platform_message_id=event_id, + user_id=open_id, + chat_id=chat_id, + content=content, + raw_event=data, + timestamp=timestamp, + ) + + def _decrypt_event(self, encrypt_b64: str) -> dict[str, Any]: + """AES-256-CBC 解密飞书加密事件。 + + 飞书协议: + key = sha256(encrypt_key).digest()[:32] + ciphertext = IV(16B) + 密文 + plaintext = AES-256-CBC 解密后去除 PKCS7 padding + """ + if not self.encrypt_key: + raise SignatureVerificationError("加密事件但未配置 encrypt_key") + + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.primitives.padding import PKCS7 + + key = hashlib.sha256(self.encrypt_key.encode("utf-8")).digest() + ciphertext = base64.b64decode(encrypt_b64) + + if len(ciphertext) < 17: # IV(16) + 至少 1 字节密文 + raise ValueError("加密事件密文长度不足") + + iv = ciphertext[:16] + encrypted = ciphertext[16:] + + cipher = Cipher(algorithms.AES(key), modes.CBC(iv)) + decryptor = cipher.decryptor() + padded = decryptor.update(encrypted) + decryptor.finalize() + + unpadder = PKCS7(algorithms.AES.block_size).unpadder() + plaintext = unpadder.update(padded) + unpadder.finalize() + + return json.loads(plaintext.decode("utf-8")) + + def _extract_content(self, message: dict[str, Any]) -> str: + """从飞书 message 字段提取文本内容。 + + - text 类型:解析 ``content`` JSON 中的 ``text`` 字段,剥离 @ 提及标记。 + - 其他类型:返回 ``[unsupported message type: {type}]``。 + """ + message_type = message.get("message_type", "") + content_raw = message.get("content", "{}") + + if message_type == "text": + try: + content_obj = ( + json.loads(content_raw) if isinstance(content_raw, str) else content_raw + ) + text = content_obj.get("text", "") if isinstance(content_obj, dict) else "" + except (json.JSONDecodeError, TypeError): + return "" + # 剥离 @ 提及标记(如 "@_user_1 hello" → "hello") + return _MENTION_RE.sub("", text).strip() + + return f"[unsupported message type: {message_type}]" + + # ------------------------------------------------------------------ + # 消息发送 + # ------------------------------------------------------------------ + + async def send_message(self, message: OutgoingMessage) -> bool: + """向飞书发送文本消息。 + + Returns: + True 表示 HTTP 200 且响应 ``code == 0``。 + """ + try: + token = await self._get_tenant_access_token() + if not token: + return False + + client = self._get_client() + payload = { + "receive_id": message.chat_id, + "msg_type": "text", + "content": json.dumps({"text": message.content}), + } + resp = await client.post( + _SEND_MESSAGE_URL, + params={"receive_id_type": "chat_id"}, + json=payload, + headers={"Authorization": f"Bearer {token}"}, + ) + if resp.status_code != 200: + logger.error("飞书 send_message HTTP %d: %s", resp.status_code, resp.text[:200]) + return False + + data = resp.json() + if data.get("code") != 0: + logger.error( + "飞书 send_message 业务失败 code=%s msg=%s", + data.get("code"), + data.get("msg", "")[:200], + ) + return False + return True + except httpx.HTTPError as exc: + logger.error("飞书 send_message 网络错误: %s", exc) + return False + + async def _get_tenant_access_token(self) -> str | None: + """获取并缓存 ``tenant_access_token``。""" + # 命中缓存 + if self._token_cache is not None: + token, expiry = self._token_cache + if time.monotonic() < expiry: + return token + + try: + client = self._get_client() + resp = await client.post( + _TENANT_TOKEN_URL, + json={"app_id": self.app_id, "app_secret": self.app_secret}, + ) + if resp.status_code != 200: + logger.error("飞书 tenant_token HTTP %d: %s", resp.status_code, resp.text[:200]) + return None + data = resp.json() + if data.get("code") != 0: + logger.error( + "飞书 tenant_token 业务失败 code=%s msg=%s", + data.get("code"), + data.get("msg", "")[:200], + ) + return None + token = data.get("tenant_access_token", "") + if not token: + return None + self._token_cache = (token, time.monotonic() + _TOKEN_CACHE_TTL) + return token + except httpx.HTTPError as exc: + logger.error("飞书 tenant_token 网络错误: %s", exc) + return None + + # ------------------------------------------------------------------ + # 资源释放 + # ------------------------------------------------------------------ + + async def close(self) -> None: + """关闭 httpx 客户端(如已创建)。""" + if self._client is not None: + await self._client.aclose() + self._client = None + + +# --------------------------------------------------------------------------- +# 辅助函数 +# --------------------------------------------------------------------------- + + +def _header_get(headers: dict[str, str], name: str) -> str | None: + """大小写不敏感的 header 查找。""" + # 直接命中 + if name in headers: + return headers[name] + lower = name.lower() + for k, v in headers.items(): + if k.lower() == lower: + return v + return None diff --git a/src/agentkit/server/routes/channels.py b/src/agentkit/server/routes/channels.py index 472c663..1d549ba 100644 --- a/src/agentkit/server/routes/channels.py +++ b/src/agentkit/server/routes/channels.py @@ -1,30 +1,33 @@ -"""渠道管理端点 — 消息渠道配置的 CRUD。 +"""渠道管理端点 — 消息渠道配置的 CRUD + Webhook 入站处理。 端点: -- ``GET /channels`` — 列出已配置渠道 -- ``POST /channels`` — 注册新渠道(凭证加密存储) -- ``GET /channels/{id}`` — 获取渠道配置(不返回凭证明文) -- ``PUT /channels/{id}`` — 更新渠道配置 -- ``DELETE /channels/{id}`` — 删除渠道 +- ``GET /channels`` — 列出已配置渠道 +- ``POST /channels`` — 注册新渠道(凭证加密存储) +- ``GET /channels/{id}`` — 获取渠道配置(不返回凭证明文) +- ``PUT /channels/{id}`` — 更新渠道配置 +- ``DELETE /channels/{id}`` — 删除渠道 +- ``POST /channels/{id}/webhook`` — 平台 webhook 入站端点 凭证通过 :class:`SecretsStore` 加密存储;响应中绝不返回凭证明文, 仅返回 secret 字段名列表以便前端管理。 -Webhook 入站端点(接收平台消息)的 fail-closed 行为:Redis 不可用或 -签名校验失败时返回 503,不可跳过 nonce dedup 直接处理消息。具体 -webhook 端点由各平台适配器子模块实现,本模块仅提供配置管理。 +Webhook 入站端点的 fail-closed 行为:签名校验失败返回 401,限流超限 +返回 429。nonce dedup 用于防重放,重复事件返回 200(飞书要求 3s 内响应)。 """ from __future__ import annotations +import asyncio import logging import re +import time from typing import Any -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel, Field -from agentkit.channels.base import ChannelType +from agentkit.channels.base import ChannelType, MessageAdapter, OutgoingMessage +from agentkit.channels.feishu import FeishuMessageAdapter, URLVerificationChallenge from agentkit.channels.secrets import SecretsStore from agentkit.server.auth.dependencies import require_permission from agentkit.server.auth.permissions import Permission @@ -64,6 +67,57 @@ def _reset_secrets_store() -> None: _channels: dict[str, dict[str, Any]] = {} +# --------------------------------------------------------------------------- +# Webhook 限流 + Nonce dedup(模块级单例) +# +# ponytail: 单进程内存实现。天花板:多 worker / 多实例部署下状态不共享, +# 限流与 nonce dedup 仅在单进程内有效。升级路径: +# - 限流:Redis 滑动窗口(INCR + EXPIRE 或 ZSET) +# - Nonce dedup:Redis SETNX with TTL +# --------------------------------------------------------------------------- + +# IP -> 请求时间戳列表(滑动窗口) +_rate_limits: dict[str, list[float]] = {} +_RATE_LIMIT_WINDOW = 60.0 # 窗口大小(秒) +_RATE_LIMIT_MAX = 100 # 窗口内最大请求数 + +# nonce -> 过期时间戳(与飞书签名时间戳窗口一致) +_seen_nonces: dict[str, float] = {} +_NONCE_TTL = 300.0 + + +def _check_rate_limit(client_ip: str) -> bool: + """滑动窗口限流。返回 True 表示放行,False 表示超限。""" + now = time.monotonic() + timestamps = _rate_limits.get(client_ip, []) + cutoff = now - _RATE_LIMIT_WINDOW + timestamps = [t for t in timestamps if t > cutoff] + if len(timestamps) >= _RATE_LIMIT_MAX: + _rate_limits[client_ip] = timestamps + return False + timestamps.append(now) + _rate_limits[client_ip] = timestamps + return True + + +def _check_nonce_dedup(nonce: str) -> bool: + """Nonce 去重。返回 True 表示新 nonce(应处理),False 表示重复(跳过)。""" + now = time.monotonic() + # 惰性清理过期项 + expired = [k for k, v in _seen_nonces.items() if v < now] + for k in expired: + del _seen_nonces[k] + if nonce in _seen_nonces: + return False + _seen_nonces[nonce] = now + _NONCE_TTL + return True + + +def _reset_webhook_state() -> None: + """重置限流与 nonce 状态(仅供测试使用)。""" + _rate_limits.clear() + _seen_nonces.clear() + def _validate_channel_id(channel_id: str) -> str: """校验渠道 ID,非法时抛 400。""" @@ -251,3 +305,157 @@ async def delete_channel( await store.delete_secret(f"{channel_id}:{name}") return {"deleted": channel_id} + + +# --------------------------------------------------------------------------- +# Webhook 入站端点(U11 — 飞书 IM 适配器端到端) +# --------------------------------------------------------------------------- + + +async def _build_adapter(channel_id: str) -> MessageAdapter: + """根据渠道配置与 secrets 构造适配器实例。 + + Raises: + HTTPException: 渠道不存在(404)、渠道类型非飞书(400)、缺少必要凭证(500)。 + """ + cfg = _channels.get(channel_id) + if cfg is None: + raise HTTPException(status_code=404, detail=f"渠道 '{channel_id}' 不存在") + if cfg["channel_type"] != ChannelType.FEISHU.value: + raise HTTPException(status_code=400, detail=f"渠道 '{channel_id}' 不是飞书渠道") + + store = _get_secrets_store() + app_id = await store.get_secret(f"{channel_id}:app_id") + app_secret = await store.get_secret(f"{channel_id}:app_secret") + encrypt_key = await store.get_secret(f"{channel_id}:encrypt_key") + verification_token = await store.get_secret(f"{channel_id}:verification_token") + if not app_id or not app_secret: + raise HTTPException( + status_code=500, detail=f"渠道 '{channel_id}' 缺少 app_id 或 app_secret" + ) + return FeishuMessageAdapter( + app_id=app_id, + app_secret=app_secret, + encrypt_key=encrypt_key, + verification_token=verification_token, + ) + + +async def _process_inbound_message( + app_state: Any, adapter: FeishuMessageAdapter, message: Any +) -> None: + """后台处理入站消息 — 调用 chat 链路并通过适配器回复。 + + 整个流程 try/except 包裹,任何异常仅记录日志,不向上抛出 + (webhook 必须保持响应能力)。``adapter.close()`` 在 finally 中调用。 + """ + try: + request_preprocessor = getattr(app_state, "request_preprocessor", None) + llm_gateway = getattr(app_state, "llm_gateway", None) + if request_preprocessor is None or llm_gateway is None: + logger.warning("app.state 缺少 request_preprocessor 或 llm_gateway — 跳过消息处理") + return + + # 路由预处理 — IM 场景使用默认 agent,无需技能注册表 + routing = await request_preprocessor.preprocess( + content=message.content, default_agent_name="default" + ) + + final_content = "" + execution_mode = getattr(routing, "execution_mode", None) + # DIRECT_CHAT 模式 — 直接调用 LLM + if execution_mode is not None and execution_mode.value == "direct_chat": + response = await llm_gateway.chat( + messages=[{"role": "user", "content": message.content}], + model=routing.model or "default", + ) + final_content = response.content + else: + # REACT 或其他模式 — 优先使用 ReActEngine,失败回退到 DIRECT_CHAT + try: + from agentkit.core.react import ReActEngine + + engine = ReActEngine(llm_gateway=llm_gateway) + result = await engine.execute( + messages=[{"role": "user", "content": message.content}], + tools=getattr(routing, "tools", None) or None, + model=routing.model or "default", + ) + final_content = getattr(result, "content", "") or "" + except Exception as exc: # noqa: BLE001 — 回退路径需捕获全部异常 + logger.warning("ReActEngine 执行失败,回退到 DIRECT_CHAT: %s", exc) + response = await llm_gateway.chat( + messages=[{"role": "user", "content": message.content}], + model=routing.model or "default", + ) + final_content = response.content + + if not final_content: + logger.warning("消息处理未产生内容 — 不发送回复") + return + + outgoing = OutgoingMessage( + channel=ChannelType.FEISHU, + chat_id=message.chat_id, + content=final_content, + ) + await adapter.send_message(outgoing) + except Exception as exc: # noqa: BLE001 — webhook 必须保持响应能力 + logger.exception("处理飞书入站消息失败: %s", exc) + finally: + try: + await adapter.close() + except Exception: # noqa: BLE001 + logger.debug("adapter.close() 异常已忽略") + + +def _get_client_ip(request: Request) -> str: + """获取客户端 IP — 优先取 X-Forwarded-For 首个 IP,否则取 request.client.host。""" + forwarded = request.headers.get("x-forwarded-for") + if forwarded: + # 取首个 IP(链路中第一个为原始客户端) + return forwarded.split(",")[0].strip() + if request.client: + return request.client.host + return "unknown" + + +@router.post("/channels/{channel_id}/webhook") +async def channel_webhook(channel_id: str, request: Request) -> dict[str, Any]: + """飞书 webhook 端点 — 接收平台事件。 + + 安全流程(按顺序): + 1. Per-IP 限流(100 req/min)— 超限 429 + 2. 读取原始 body(未解析) + 3. 签名验证 — 失败 401 + 4. Nonce dedup — 重复返回 200(飞书要求 3s 内响应) + 5. URL verification challenge — 返回 ``{"challenge": ...}`` + 6. 解析消息 → 后台异步处理 → 立即返回 200 + """ + client_ip = _get_client_ip(request) + if not _check_rate_limit(client_ip): + raise HTTPException(status_code=429, detail="请求过于频繁,请稍后重试") + + body = await request.body() + + adapter = await _build_adapter(channel_id) + + headers_dict = dict(request.headers) + if not await adapter.verify_signature(headers_dict, body): + raise HTTPException(status_code=401, detail="签名校验失败") + + # Nonce dedup(可选 — 若头不存在则跳过去重) + nonce = request.headers.get("x-lark-request-nonce") + if nonce and not _check_nonce_dedup(nonce): + return {"code": 0, "msg": "duplicate"} + + try: + message = await adapter.receive_message(headers_dict, body) + except URLVerificationChallenge as e: + # URL 验证流程 — 飞书配置 webhook 时发送 + return {"challenge": e.challenge} + + # 异步处理 — 不阻塞 webhook 响应(飞书要求 3s 内返回 200) + asyncio.create_task(_process_inbound_message(request.app.state, adapter, message)) + + return {"code": 0} diff --git a/tests/unit/channels/test_feishu.py b/tests/unit/channels/test_feishu.py new file mode 100644 index 0000000..1bdc193 --- /dev/null +++ b/tests/unit/channels/test_feishu.py @@ -0,0 +1,625 @@ +"""飞书 IM 适配器端到端测试 (U11)。 + +覆盖场景: +- URL 验证 challenge 事件 +- 签名校验(有效/无效/过期/缺失 encrypt_key) +- 加密事件 AES-256-CBC 解密 +- 文本消息解析(含 @ 提及剥离) +- 不支持消息类型 +- verification_token 校验 +- send_message 成功/失败 +- tenant_access_token 缓存 +- Webhook 端点:限流、签名失败、URL 验证、nonce dedup、404/400、立即 200 +""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import time +from datetime import datetime, timezone +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.padding import PKCS7 +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from agentkit.channels.base import ChannelType, IncomingMessage, OutgoingMessage +from agentkit.channels.feishu import ( + FeishuMessageAdapter, + SignatureVerificationError, + URLVerificationChallenge, +) +from agentkit.channels.secrets import KEY_SIZE, SecretsStore +from agentkit.server.routes import channels as channels_routes + + +# --------------------------------------------------------------------------- +# 辅助函数 +# --------------------------------------------------------------------------- + + +def _make_event_body( + *, + text: str = "hello", + message_type: str = "text", + event_id: str = "evt_001", + chat_id: str = "oc_test_chat", + open_id: str = "ou_test_user", + create_time: str = "1700000000000", + verification_token: str | None = "right_token", +) -> dict[str, Any]: + """构造飞书事件 JSON 体(非加密)。""" + content: Any + if message_type == "text": + content = json.dumps({"text": text}) + else: + content = json.dumps({"image_key": "img_xxx"}) + + event: dict[str, Any] = { + "event_id": event_id, + "event": { + "message": { + "message_id": "om_test_msg", + "chat_id": chat_id, + "message_type": message_type, + "content": content, + "create_time": create_time, + }, + "sender": {"sender_id": {"open_id": open_id}}, + }, + } + if verification_token is not None: + event["verification_token"] = verification_token + return event + + +def _sign( + *, encrypt_key: str, body: bytes, timestamp: int | None = None, nonce: str = "n1" +) -> dict[str, str]: + """构造包含正确飞书签名的 headers。""" + if timestamp is None: + timestamp = int(datetime.now(timezone.utc).timestamp()) + body_str = body.decode("utf-8") + sig = hashlib.sha256(f"{timestamp}{nonce}{encrypt_key}{body_str}".encode("utf-8")).hexdigest() + return { + "X-Lark-Signature": sig, + "X-Lark-Request-Timestamp": str(timestamp), + "X-Lark-Request-Nonce": nonce, + } + + +def _encrypt_event(encrypt_key: str, plaintext: dict[str, Any]) -> bytes: + """用飞书 AES-256-CBC 协议加密事件,返回包含 ``encrypt`` 字段的 JSON body。""" + key = hashlib.sha256(encrypt_key.encode("utf-8")).digest() + iv = b"\x00" * 16 # 测试用确定性 IV;生产环境飞书用随机 IV + plaintext_bytes = json.dumps(plaintext).encode("utf-8") + + padder = PKCS7(algorithms.AES.block_size).padder() + padded = padder.update(plaintext_bytes) + padder.finalize() + + cipher = Cipher(algorithms.AES(key), modes.CBC(iv)) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(padded) + encryptor.finalize() + + # 飞书协议:ciphertext = IV(16B) + 密文 + combined = iv + ciphertext + encrypted_b64 = base64.b64encode(combined).decode("utf-8") + return json.dumps({"encrypt": encrypted_b64}).encode("utf-8") + + +# --------------------------------------------------------------------------- +# FeishuMessageAdapter 单元测试 +# --------------------------------------------------------------------------- + + +class TestUrlVerification: + """URL 验证 challenge 流程。""" + + async def test_url_verification_raises_challenge(self): + """事件含 url_verification 字段时抛 URLVerificationChallenge。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + body = json.dumps({"url_verification": "v1", "challenge": "abc123", "token": "xxx"}).encode( + "utf-8" + ) + with pytest.raises(URLVerificationChallenge) as exc_info: + await adapter.receive_message({}, body) + assert exc_info.value.challenge == "abc123" + + async def test_challenge_only_field_also_raises(self): + """仅有 challenge 字段时也抛出 URLVerificationChallenge。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + body = json.dumps({"challenge": "xyz"}).encode("utf-8") + with pytest.raises(URLVerificationChallenge) as exc_info: + await adapter.receive_message({}, body) + assert exc_info.value.challenge == "xyz" + + +class TestSignatureVerification: + """签名校验。""" + + async def test_valid_signature(self): + """正确签名返回 True。""" + encrypt_key = "test_key" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key) + body = json.dumps(_make_event_body()).encode("utf-8") + headers = _sign(encrypt_key=encrypt_key, body=body) + assert await adapter.verify_signature(headers, body) is True + + async def test_invalid_signature(self): + """篡改签名返回 False。""" + encrypt_key = "test_key" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key) + body = json.dumps(_make_event_body()).encode("utf-8") + headers = _sign(encrypt_key=encrypt_key, body=body) + headers["X-Lark-Signature"] = "tampered_signature" + assert await adapter.verify_signature(headers, body) is False + + async def test_expired_timestamp(self): + """时间戳超过 5 分钟返回 False。""" + encrypt_key = "test_key" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key) + body = json.dumps(_make_event_body()).encode("utf-8") + old_ts = int(datetime.now(timezone.utc).timestamp()) - 600 + headers = _sign(encrypt_key=encrypt_key, body=body, timestamp=old_ts) + assert await adapter.verify_signature(headers, body) is False + + async def test_missing_encrypt_key(self): + """适配器未配置 encrypt_key 时返回 False(fail-closed)。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + body = json.dumps(_make_event_body()).encode("utf-8") + headers = _sign(encrypt_key="any", body=body) + assert await adapter.verify_signature(headers, body) is False + + async def test_missing_signature_header(self): + """缺 X-Lark-Signature 头返回 False。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key="k") + body = b'{"x":1}' + assert await adapter.verify_signature({}, body) is False + + async def test_lowercase_header_lookup(self): + """header 名大小写不敏感查找。""" + encrypt_key = "test_key" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key) + body = json.dumps(_make_event_body()).encode("utf-8") + headers = _sign(encrypt_key=encrypt_key, body=body) + # 改为小写键 + headers = {k.lower(): v for k, v in headers.items()} + assert await adapter.verify_signature(headers, body) is True + + +class TestEncryptedEventDecryption: + """AES-256-CBC 加密事件解密。""" + + async def test_decrypt_encrypted_event(self): + """加密事件可正确解密并返回 IncomingMessage。""" + encrypt_key = "test_key" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", encrypt_key=encrypt_key) + plain_event = _make_event_body(text="secret hello", verification_token=None) + body = _encrypt_event(encrypt_key, plain_event) + + msg = await adapter.receive_message({}, body) + assert isinstance(msg, IncomingMessage) + assert msg.channel == ChannelType.FEISHU + assert msg.content == "secret hello" + assert msg.platform_message_id == "evt_001" + assert msg.chat_id == "oc_test_chat" + assert msg.user_id == "ou_test_user" + + +class TestTextMessageParsing: + """文本消息解析。""" + + async def test_plain_text_message(self): + """非加密文本事件解析为 IncomingMessage。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None) + body = json.dumps(_make_event_body(text="hello world")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.content == "hello world" + assert msg.channel == ChannelType.FEISHU + + async def test_mention_stripping(self): + """文本中的 @ 提及标记被剥离。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None) + body = json.dumps(_make_event_body(text="@_user_1 hello world")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.content == "hello world" + + async def test_mention_multiple_users(self): + """多个 @ 提及标记全部被剥离。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None) + body = json.dumps(_make_event_body(text="@_user_1 @_user_2 hi there")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.content == "hi there" + + async def test_unsupported_message_type(self): + """非 text 类型返回 unsupported 占位内容。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None) + body = json.dumps(_make_event_body(message_type="image")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.content.startswith("[unsupported message type: image]") + + async def test_timestamp_extracted(self): + """create_time 转换为 timestamp 字符串。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token=None) + body = json.dumps(_make_event_body(create_time="1700000000123")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.timestamp == "1700000000123" + + +class TestVerificationTokenCheck: + """verification_token 校验。""" + + async def test_token_mismatch_raises(self): + """verification_token 不匹配抛 SignatureVerificationError。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right") + body = json.dumps(_make_event_body(verification_token="wrong")).encode("utf-8") + with pytest.raises(SignatureVerificationError): + await adapter.receive_message({}, body) + + async def test_token_match_passes(self): + """verification_token 匹配则正常处理。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right") + body = json.dumps(_make_event_body(verification_token="right")).encode("utf-8") + msg = await adapter.receive_message({}, body) + assert msg.content == "hello" + + async def test_missing_token_raises_when_configured(self): + """配置了 token 但事件缺失 token 字段抛 SignatureVerificationError。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b", verification_token="right") + body = json.dumps(_make_event_body(verification_token=None)).encode("utf-8") + with pytest.raises(SignatureVerificationError): + await adapter.receive_message({}, body) + + +class TestSendMessage: + """send_message 行为。""" + + async def test_send_message_success(self): + """HTTP 200 + code=0 返回 True。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + # mock token 与 send_message 两次 HTTP 调用 + mock_response_token = MagicMock() + mock_response_token.status_code = 200 + mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_123"} + + mock_response_send = MagicMock() + mock_response_send.status_code = 200 + mock_response_send.json.return_value = {"code": 0, "data": {"message_id": "om_x"}} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send]) + adapter._client = mock_client + + out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi") + result = await adapter.send_message(out) + assert result is True + # 验证 send_message 调用 URL 与 payload + second_call = mock_client.post.call_args_list[1] + assert "messages" in second_call.args[0] + assert second_call.kwargs["params"] == {"receive_id_type": "chat_id"} + assert second_call.kwargs["headers"]["Authorization"] == "Bearer tok_123" + + async def test_send_message_business_failure(self): + """HTTP 200 但 code != 0 返回 False。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + mock_response_token = MagicMock() + mock_response_token.status_code = 200 + mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok_x"} + + mock_response_send = MagicMock() + mock_response_send.status_code = 200 + mock_response_send.json.return_value = {"code": 9999, "msg": "error"} + + mock_client = AsyncMock() + mock_client.post = AsyncMock(side_effect=[mock_response_token, mock_response_send]) + adapter._client = mock_client + + out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi") + assert await adapter.send_message(out) is False + + async def test_send_message_token_fetch_failure(self): + """获取 token 失败返回 False。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + mock_response_token = MagicMock() + mock_response_token.status_code = 401 + mock_response_token.json.return_value = {"code": 99991661, "msg": "invalid app"} + mock_response_token.text = "invalid" + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response_token) + adapter._client = mock_client + + out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi") + assert await adapter.send_message(out) is False + + async def test_tenant_token_caching(self): + """同 TTL 内的两次 send_message 只拉取一次 token。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + mock_response_token = MagicMock() + mock_response_token.status_code = 200 + mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "cached_tok"} + + mock_response_send = MagicMock() + mock_response_send.status_code = 200 + mock_response_send.json.return_value = {"code": 0} + + mock_client = AsyncMock() + # 第一次:token + send;第二次:仅 send(token 走缓存) + mock_client.post = AsyncMock( + side_effect=[mock_response_token, mock_response_send, mock_response_send] + ) + adapter._client = mock_client + + out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi") + await adapter.send_message(out) + await adapter.send_message(out) + + # 仅 1 次 token 调用 + 2 次 send 调用 = 3 次(但若未缓存会是 4 次) + assert mock_client.post.call_count == 3 + # 验证第一次 URL 是 tenant_token 端点 + first_call_url = mock_client.post.call_args_list[0].args[0] + assert "tenant_access_token" in first_call_url + + async def test_tenant_token_refresh_after_expiry(self): + """TTL 过期后重新拉取 token。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + mock_response_token = MagicMock() + mock_response_token.status_code = 200 + mock_response_token.json.return_value = {"code": 0, "tenant_access_token": "tok"} + + mock_response_send = MagicMock() + mock_response_send.status_code = 200 + mock_response_send.json.return_value = {"code": 0} + + mock_client = AsyncMock() + mock_client.post = AsyncMock( + side_effect=[ + mock_response_token, + mock_response_send, + mock_response_token, + mock_response_send, + ] + ) + adapter._client = mock_client + + # 模拟 token 已过期 + adapter._token_cache = ("old_tok", time.monotonic() - 1) + + out = OutgoingMessage(channel=ChannelType.FEISHU, chat_id="c1", content="hi") + await adapter.send_message(out) + # 应该重新拉取 token + assert mock_client.post.call_count == 2 + first_call_url = mock_client.post.call_args_list[0].args[0] + assert "tenant_access_token" in first_call_url + + +class TestClose: + """资源释放。""" + + async def test_close_no_client_is_noop(self): + """未创建 httpx 客户端时 close 不抛异常。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + await adapter.close() # 不应抛异常 + + async def test_close_resets_client(self): + """close 后客户端引用清空。""" + adapter = FeishuMessageAdapter(app_id="a", app_secret="b") + adapter._get_client() # 触发懒加载 + assert adapter._client is not None + await adapter.close() + assert adapter._client is None + + +# --------------------------------------------------------------------------- +# Webhook 端点测试 +# --------------------------------------------------------------------------- + + +@pytest.fixture +def webhook_app(monkeypatch): + """挂载 channels 路由的最小 FastAPI 应用 — 用于 webhook 测试。 + + 清理模块级状态(_channels、限流、nonce、secrets store)保证测试隔离。 + """ + monkeypatch.delenv("AGENTKIT_ENV", raising=False) + monkeypatch.delenv("AGENTKIT_MASTER_KEY", raising=False) + + channels_routes._secrets_store = SecretsStore(master_key=b"\x01" * KEY_SIZE) + channels_routes._channels.clear() + channels_routes._reset_webhook_state() + + application = FastAPI() + application.include_router(channels_routes.router, prefix="/api/v1") + + # webhook 端点不需要认证(外部平台调用) + @application.middleware("http") + async def _no_auth(request, call_next): + request.state.current_user = {"user_id": "anon", "role": "admin"} + return await call_next(request) + + return application + + +@pytest.fixture +def webhook_client(webhook_app): + return TestClient(webhook_app) + + +def _register_feishu_channel( + client: TestClient, + *, + channel_id: str = "feishu-test", + encrypt_key: str = "test_key", + verification_token: str = "right_token", +) -> None: + """注册一个飞书渠道(带完整凭证)。""" + client.post( + "/api/v1/channels", + json={ + "channel_id": channel_id, + "channel_type": "feishu", + "name": "飞书测试", + "secrets": { + "app_id": "cli_test", + "app_secret": "secret_test", + "encrypt_key": encrypt_key, + "verification_token": verification_token, + }, + }, + ) + + +class TestWebhookRateLimit: + """Webhook 限流。""" + + def test_rate_limit_blocks_after_100_requests(self, webhook_client): + """同 IP 100 次请求放行,第 101 次 429。""" + _register_feishu_channel(webhook_client) + # 100 次快速请求 — 都会因签名失败 401,但限流先于签名校验 + for _ in range(100): + resp = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code in (401, 200) # 401 = 签名失败(限流通过) + # 第 101 次 — 限流触发 + resp = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 429 + + +class TestWebhookSignatureFailure: + """Webhook 签名校验失败。""" + + def test_missing_signature_returns_401(self, webhook_client): + """无 X-Lark-Signature 头返回 401。""" + _register_feishu_channel(webhook_client) + resp = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=json.dumps(_make_event_body()).encode("utf-8"), + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 401 + + +class TestWebhookUrlVerification: + """Webhook URL 验证流程。""" + + def test_url_verification_returns_challenge(self, webhook_client): + """url_verification 事件返回 {"challenge": ...}。""" + _register_feishu_channel(webhook_client) + body = json.dumps( + {"url_verification": "v1", "challenge": "verify_abc", "token": "right_token"} + ).encode("utf-8") + headers = _sign(encrypt_key="test_key", body=body) + # 添加 Content-Type 以匹配 webhook 端点 + headers["Content-Type"] = "application/json" + + resp = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=body, + headers=headers, + ) + assert resp.status_code == 200 + data = resp.json() + assert data.get("challenge") == "verify_abc" + + +class TestWebhookNonceDedup: + """Webhook nonce 去重。""" + + def test_duplicate_nonce_returns_duplicate_marker(self, webhook_client): + """相同 nonce 第二次返回 duplicate 标记。""" + _register_feishu_channel(webhook_client) + body = json.dumps(_make_event_body(text="hi")).encode("utf-8") + headers = _sign(encrypt_key="test_key", body=body, nonce="dup_nonce") + headers["Content-Type"] = "application/json" + + # 第一次 — 正常处理 + resp1 = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=body, + headers=headers, + ) + assert resp1.status_code == 200 + + # 第二次 — nonce 重复 + resp2 = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=body, + headers=headers, + ) + assert resp2.status_code == 200 + assert resp2.json().get("msg") == "duplicate" + + +class TestWebhookErrors: + """Webhook 错误响应。""" + + def test_unknown_channel_returns_404(self, webhook_client): + """POST 到不存在的渠道 webhook 返回 404。""" + resp = webhook_client.post( + "/api/v1/channels/nonexistent/webhook", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 404 + + def test_non_feishu_channel_returns_400(self, webhook_client): + """POST 到非飞书渠道 webhook 返回 400。""" + # 注册一个钉钉渠道 + webhook_client.post( + "/api/v1/channels", + json={ + "channel_id": "dingtalk-1", + "channel_type": "dingtalk", + "name": "钉钉", + }, + ) + resp = webhook_client.post( + "/api/v1/channels/dingtalk-1/webhook", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 400 + + +class TestWebhookImmediateResponse: + """Webhook 立即响应 + 后台任务调度。""" + + def test_returns_200_immediately_with_background_task(self, webhook_client): + """webhook 立即返回 200,后台任务被调度。""" + _register_feishu_channel(webhook_client) + body = json.dumps(_make_event_body(text="hello")).encode("utf-8") + headers = _sign(encrypt_key="test_key", body=body, nonce="task_nonce_001") + headers["Content-Type"] = "application/json" + + # mock _process_inbound_message 防止实际执行 chat 链路 + with patch.object( + channels_routes, "_process_inbound_message", new_callable=AsyncMock + ) as mock_proc: + resp = webhook_client.post( + "/api/v1/channels/feishu-test/webhook", + content=body, + headers=headers, + ) + assert resp.status_code == 200 + assert resp.json() == {"code": 0} + + # 后台任务被调度(TestClient 同步运行事件循环) + assert mock_proc.call_count == 1 + # 验证传入的 message 内容 + call_args = mock_proc.call_args + # call_args.args: (app_state, adapter, message) + passed_message = call_args.args[2] + assert passed_message.content == "hello" + assert passed_message.chat_id == "oc_test_chat"