feat(mcp): U13 — refactor MCPServer to route factory + mount at /api/v1/mcp with auth

This commit is contained in:
chiguyong 2026-06-25 20:58:41 +08:00
parent 8998f94c42
commit 16c33be295
4 changed files with 579 additions and 6 deletions

View File

@ -1,18 +1,176 @@
"""MCP Server - 将 Agent 能力暴露为 MCP 工具
基于 FastAPI 实现支持 Streamable HTTP 传输
U13: 重构为路由工厂 ``create_mcp_router()``可挂载到主 FastAPI app
``/api/v1/mcp/`` 前缀下复用主 app 的认证中间件JWT + API Key
旧的独立 ``MCPServer`` 类保留作为向后兼容独立 port 8080 部署
但在 ``_create_app()`` 时发出 DeprecationWarning
"""
from __future__ import annotations
import logging
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Request
from agentkit.server.auth.dependencies import require_permission
from agentkit.server.auth.permissions import Permission
logger = logging.getLogger(__name__)
# MCP 端点要求至少 member 权限CHAT。API Key 与 JWT 均可。
# 认证由主 app 的 AuthMiddleware 处理,这里只做权限校验。
_mcp_member_auth = require_permission(Permission.CHAT)
def create_mcp_router(tool_registry: Any = None) -> APIRouter:
"""构造 MCP 路由,挂载到主 app 的 ``/api/v1/mcp/`` 前缀下。
所有端点要求至少 member 权限Permission.CHAT认证由主 app
AuthMiddleware 处理JWT + API Key 双轨
Args:
tool_registry: ToolRegistry 实例若为 None所有工具相关
端点返回空列表或错误
Returns:
APIRouter包含以下路由
- GET /tools/list 列出 MCP 工具
- POST /tools/call 调用 MCP 工具
- GET /health 健康检查
- POST / JSON-RPC 2.0 端点MCP 协议兼容
"""
router = APIRouter(tags=["mcp"])
@router.get("/tools/list")
async def list_tools(_user: dict = Depends(_mcp_member_auth)) -> dict[str, Any]:
"""列出所有可用的 MCP 工具。"""
if tool_registry is None:
return {"tools": []}
tools = tool_registry.list_tools()
return {
"tools": [
{
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema or {},
}
for t in tools
]
}
@router.post("/tools/call")
async def call_tool(
request: dict,
_user: dict = Depends(_mcp_member_auth),
) -> dict[str, Any]:
"""调用指定的 MCP 工具。"""
tool_name = request.get("name")
arguments = request.get("arguments", {})
if not tool_name or tool_registry is None:
raise HTTPException(
status_code=400,
detail="Tool not specified or registry not configured",
)
try:
tool = tool_registry.get(tool_name)
except Exception:
raise HTTPException(status_code=404, detail=f"Tool '{tool_name}' not found")
try:
result = await tool.safe_execute(**arguments)
return {"content": [{"type": "text", "text": str(result)}]}
except Exception as e:
logger.exception(f"MCP tool '{tool_name}' execution failed")
return {"isError": True, "content": [{"type": "text", "text": str(e)}]}
@router.get("/health")
async def health(_user: dict = Depends(_mcp_member_auth)) -> dict[str, str]:
"""MCP 健康检查。"""
return {"status": "ok"}
@router.post("/")
async def jsonrpc_endpoint(
request: Request,
_user: dict = Depends(_mcp_member_auth),
) -> dict[str, Any]:
"""JSON-RPC 2.0 端点 — MCP 协议兼容。
支持 methods: initialize, tools/list, tools/call
"""
try:
body = await request.json()
except Exception:
return {
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error"},
"id": None,
}
method = body.get("method", "")
params = body.get("params", {})
req_id = body.get("id")
if method == "initialize":
result = {
"protocolVersion": "2024-11-05",
"capabilities": {"tools": {}},
"serverInfo": {"name": "agentkit-mcp-server", "version": "2.0.0"},
}
elif method == "tools/list":
if tool_registry is None:
result = {"tools": []}
else:
tools = tool_registry.list_tools()
result = {
"tools": [
{
"name": t.name,
"description": t.description,
"inputSchema": t.input_schema or {},
}
for t in tools
]
}
elif method == "tools/call":
tool_name = params.get("name", "")
arguments = params.get("arguments", {})
if not tool_name or tool_registry is None:
result = {
"isError": True,
"content": [{"type": "text", "text": "Tool not found"}],
}
else:
try:
tool = tool_registry.get(tool_name)
tool_result = await tool.safe_execute(**arguments)
result = {"content": [{"type": "text", "text": str(tool_result)}]}
except Exception as e:
result = {"isError": True, "content": [{"type": "text", "text": str(e)}]}
else:
return {
"jsonrpc": "2.0",
"error": {"code": -32601, "message": f"Method not found: {method}"},
"id": req_id,
}
return {"jsonrpc": "2.0", "result": result, "id": req_id}
return router
# Backward-compat: legacy MCPServer class
class MCPServer:
"""MCP Server - 暴露 Agent 能力为 MCP 工具
"""[DEPRECATED] 独立 MCP Server — 使用 create_mcp_router() 替代。
自动将 ToolRegistry 中注册的工具暴露为 MCP 工具端点
保留用于向后兼容独立 port 8080 部署零认证 仅适合隔离网络
将在下一版本移除新部署应通过 :func:`create_mcp_router` 挂载到主 app
``/api/v1/mcp/`` 前缀下复用主 app JWT + API Key 认证
"""
def __init__(self, tool_registry: Any = None, host: str = "0.0.0.0", port: int = 8080):
@ -22,10 +180,22 @@ class MCPServer:
self._app = None
def _create_app(self):
"""创建 FastAPI 应用"""
"""创建 FastAPI 应用(无认证 — 仅向后兼容)。
发出 :class:`DeprecationWarning` 提示迁移到 ``create_mcp_router()``
"""
import warnings
warnings.warn(
"MCPServer 独立 app 无认证保护,建议迁移到 create_mcp_router() "
"并挂载到主 app 的 /api/v1/mcp/。详见 U13。",
DeprecationWarning,
stacklevel=2,
)
try:
from fastapi import FastAPI
from fastapi import Request
from fastapi import Request # noqa: F401 — 用于 jsonrpc_endpoint 的类型注解
except ImportError:
raise ImportError("MCP Server requires fastapi: pip install fischer-agentkit[mcp]")
@ -72,8 +242,6 @@ class MCPServer:
Handles requests from HTTPTransport which sends JSON-RPC format.
"""
import json
try:
body = await request.json()
except Exception:

View File

@ -1085,6 +1085,13 @@ def create_app(
app.include_router(bitable_routes.router, prefix="/api/v1")
app.include_router(channels_routes.router, prefix="/api/v1")
# U13: 将 MCP Server 合并至主 app挂载在 /api/v1/mcp/ 前缀下,
# 复用主 app 的 JWT + API Key 认证中间件。
from agentkit.mcp.server import create_mcp_router
mcp_router = create_mcp_router(tool_registry=getattr(app.state, "tool_registry", None))
app.include_router(mcp_router, prefix="/api/v1/mcp")
# Serve GUI when in GUI mode
gui_mode = os.environ.get("AGENTKIT_GUI_MODE")
if gui_mode:

View File

View File

@ -0,0 +1,398 @@
"""U13 — MCP Server 认证 + 合并至主 app 的单元测试。
覆盖场景
- 无认证 / 无效凭据 401
- 有效 API Key / 有效 JWTmember 200 + 工具列表
- 有效 JWT 但无权限guest 403
- tools/call 成功 / 未知工具404 / 执行错误
- JSON-RPC 2.0 端点initialize / tools/list / tools/call / 未知方法 / 解析错误
- MCPServer 类向后兼容DeprecationWarning
- create_mcp_router() 返回 APIRouter / registry
"""
from __future__ import annotations
import warnings
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import httpx
import jwt
from fastapi import FastAPI
from fastapi.routing import APIRouter
from agentkit.mcp.server import MCPServer, create_mcp_router
from agentkit.server.auth.middleware import AuthMiddleware
# 测试用的固定凭据 — 仅限单元测试,不可用于生产。
JWT_SECRET = "u13-test-jwt-secret-xxxxxxxxxxxxx"
API_KEY = "u13-test-api-key-yyy"
# ── 测试辅助函数 ──────────────────────────────────────────
def _make_app(tool_registry: Any = None, *, dev_mode: bool = False) -> FastAPI:
"""构造测试用 FastAPI app挂载 MCP router + AuthMiddleware。
Args:
tool_registry: ToolRegistry 实例 mockNone 表示未配置
dev_mode: True = 不添加 AuthMiddleware开发模式所有请求放行
"""
app = FastAPI()
app.state.tool_registry = tool_registry
if not dev_mode:
app.add_middleware(AuthMiddleware, jwt_secret=JWT_SECRET, api_key=API_KEY)
mcp_router = create_mcp_router(tool_registry=tool_registry)
app.include_router(mcp_router, prefix="/api/v1/mcp")
return app
def _make_mock_tool(
name: str = "echo",
description: str = "Echo tool",
input_schema: dict | None = None,
result: str = "echo: hi",
) -> MagicMock:
"""构造一个 mock 工具,模拟 Tool 接口。"""
tool = MagicMock()
tool.name = name
tool.description = description
tool.input_schema = input_schema or {"type": "object", "properties": {}}
tool.safe_execute = AsyncMock(return_value=result)
return tool
def _make_mock_registry(tools: list) -> MagicMock:
"""构造一个 mock ToolRegistry支持 list_tools() 和 get(name)。"""
registry = MagicMock()
registry.list_tools.return_value = tools
def _get(name: str):
for t in tools:
if t.name == name:
return t
raise KeyError(name)
registry.get = _get
return registry
def _make_jwt(
role: str = "member",
user_id: str = "u1",
username: str = "alice",
) -> str:
"""签发一个测试用 access JWTtype=accessHS256"""
payload = {
"sub": user_id,
"username": username,
"role": role,
"type": "access",
"iat": 1700000000,
"exp": 9999999999, # 远未来,避免过期
}
token = jwt.encode(payload, JWT_SECRET, algorithm="HS256")
return token.decode("utf-8") if isinstance(token, bytes) else token
def _make_registry_with_two_tools() -> MagicMock:
"""构造含两个 mock 工具的 registryecho + reverse"""
echo = _make_mock_tool(name="echo", description="Echo input", result="echo: hi")
reverse = _make_mock_tool(
name="reverse",
description="Reverse text",
result="dcba",
)
return _make_mock_registry([echo, reverse])
# ── 1-2: 无认证 / 无效凭据 ────────────────────────────────
class TestNoAuth:
"""无认证或无效凭据应返回 401。"""
async def test_no_auth_returns_401(self):
"""场景 1: 不带 Authorization 头 → 401。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get("/api/v1/mcp/tools/list")
assert resp.status_code == 401
async def test_invalid_api_key_returns_401(self):
"""场景 2: 无效 X-API-Key → 401。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/tools/list",
headers={"X-API-Key": "invalid-key"},
)
assert resp.status_code == 401
# ── 3-5: 有效凭据 + 权限校验 ──────────────────────────────
class TestValidAuth:
"""有效凭据应通过认证,权限校验决定 200/403。"""
async def test_valid_api_key_returns_tool_list(self):
"""场景 3: 有效 X-API-Key → 200 + 工具列表。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/tools/list",
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert "tools" in body
names = {t["name"] for t in body["tools"]}
assert names == {"echo", "reverse"}
async def test_valid_jwt_member_returns_tool_list(self):
"""场景 4: 有效 JWTmember 角色)→ 200 + 工具列表。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
token = _make_jwt(role="member")
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/tools/list",
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 200
body = resp.json()
assert len(body["tools"]) == 2
async def test_valid_jwt_guest_no_permission_returns_403(self):
"""场景 5: 有效 JWT 但 role=guest无 CHAT 权限)→ 403。
guest 角色不在 ROLE_PERMISSIONS has_permission 返回 False
"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
token = _make_jwt(role="guest")
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/tools/list",
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code == 403
# ── 6-8: tools/call 端点 ─────────────────────────────────
class TestCallTool:
"""POST /api/v1/mcp/tools/call 的成功 / 404 / 执行错误。"""
async def test_call_tool_success(self):
"""场景 6: 有效认证 + 有效工具 → 200 + 结果。"""
registry = _make_registry_with_two_tools()
app = _make_app(tool_registry=registry)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/tools/call",
json={"name": "echo", "arguments": {"text": "hi"}},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body["content"][0]["type"] == "text"
assert "echo: hi" in body["content"][0]["text"]
async def test_call_tool_unknown_returns_404(self):
"""场景 7: 有效认证 + 未知工具 → 404。"""
registry = _make_registry_with_two_tools()
app = _make_app(tool_registry=registry)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/tools/call",
json={"name": "nonexistent", "arguments": {}},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 404
async def test_call_tool_execution_error_returns_isError(self):
"""场景 8: 有效认证 + 工具抛异常 → 200 + isError=True。"""
failing_tool = _make_mock_tool(name="boom", result="should-not-reach")
failing_tool.safe_execute = AsyncMock(side_effect=RuntimeError("boom!"))
registry = _make_mock_registry([failing_tool])
app = _make_app(tool_registry=registry)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/tools/call",
json={"name": "boom", "arguments": {}},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body.get("isError") is True
assert "boom!" in body["content"][0]["text"]
# ── 9: health 端点 ───────────────────────────────────────
class TestHealthEndpoint:
"""GET /api/v1/mcp/health — 有效认证 → 200。"""
async def test_health_with_valid_auth(self):
"""场景 9: 有效认证 → 200 + {"status": "ok"}。"""
app = _make_app(tool_registry=None)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/health",
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
assert resp.json() == {"status": "ok"}
# ── 10-14: JSON-RPC 2.0 端点 ─────────────────────────────
class TestJsonRpcEndpoint:
"""POST /api/v1/mcp/ — MCP 协议兼容的 JSON-RPC 2.0 端点。"""
async def test_jsonrpc_initialize(self):
"""场景 10: initialize → 200 + protocolVersion。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/",
json={"jsonrpc": "2.0", "method": "initialize", "id": 1},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body["jsonrpc"] == "2.0"
assert body["id"] == 1
assert body["result"]["protocolVersion"] == "2024-11-05"
assert body["result"]["serverInfo"]["name"] == "agentkit-mcp-server"
async def test_jsonrpc_tools_list(self):
"""场景 11: tools/list via JSON-RPC → 200 + tools 数组。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/",
json={"jsonrpc": "2.0", "method": "tools/list", "id": 2},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body["id"] == 2
assert len(body["result"]["tools"]) == 2
async def test_jsonrpc_tools_call(self):
"""场景 12: tools/call via JSON-RPC → 200 + 结果。"""
app = _make_app(tool_registry=_make_registry_with_two_tools())
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/",
json={
"jsonrpc": "2.0",
"method": "tools/call",
"params": {"name": "echo", "arguments": {}},
"id": 3,
},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body["id"] == 3
assert "content" in body["result"]
async def test_jsonrpc_unknown_method_returns_error_32601(self):
"""场景 13: 未知 method → error code -32601。"""
app = _make_app(tool_registry=None)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/",
json={"jsonrpc": "2.0", "method": "unknown", "id": 4},
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
body = resp.json()
assert body["error"]["code"] == -32601
assert body["id"] == 4
async def test_jsonrpc_parse_error_returns_error_32700(self):
"""场景 14: 无效 JSON → error code -32700。"""
app = _make_app(tool_registry=None)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.post(
"/api/v1/mcp/",
content=b"this is not json",
headers={
"X-API-Key": API_KEY,
"Content-Type": "application/json",
},
)
assert resp.status_code == 200
body = resp.json()
assert body["error"]["code"] == -32700
assert body["id"] is None
# ── 15: MCPServer 向后兼容 ───────────────────────────────
class TestMCPServerDeprecation:
"""旧 MCPServer 类应发出 DeprecationWarning。"""
def test_create_app_emits_deprecation_warning(self):
"""场景 15: MCPServer()._create_app() 发出 DeprecationWarning。"""
server = MCPServer()
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
server._create_app()
dep_warnings = [w for w in caught if issubclass(w.category, DeprecationWarning)]
assert len(dep_warnings) >= 1
assert "create_mcp_router" in str(dep_warnings[0].message)
# ── 16-17: create_mcp_router 工厂 ────────────────────────
class TestCreateMcpRouter:
"""create_mcp_router() 工厂行为。"""
def test_create_mcp_router_returns_apirouter(self):
"""场景 16: create_mcp_router() 返回 APIRouter 实例。"""
router = create_mcp_router(tool_registry=None)
assert isinstance(router, APIRouter)
# 验证路由数量tools/list, tools/call, health, /JSON-RPC
paths = {route.path for route in router.routes}
assert "/tools/list" in paths
assert "/tools/call" in paths
assert "/health" in paths
assert "/" in paths
async def test_empty_registry_returns_empty_list(self):
"""场景 17: tool_registry=None → /tools/list 返回 {"tools": []}。"""
app = _make_app(tool_registry=None)
transport = httpx.ASGITransport(app=app)
async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
resp = await client.get(
"/api/v1/mcp/tools/list",
headers={"X-API-Key": API_KEY},
)
assert resp.status_code == 200
assert resp.json() == {"tools": []}