fischer-site/consult-service.py

407 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
菲西尔官网咨询表单接收服务
支持飞书多维表格Bitable集成
"""
import json
import os
import re
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Dict, Any, Optional
import urllib.request
import urllib.error
class FeishuConfig:
"""飞书配置"""
APP_ID = os.getenv("FEISHU_APP_ID", "")
APP_SECRET = os.getenv("FEISHU_APP_SECRET", "")
BITABLE_APP_TOKEN = os.getenv("FEISHU_BITABLE_APP_TOKEN", "")
BITABLE_TABLE_ID = os.getenv("FEISHU_BITABLE_TABLE_ID", "")
WEBHOOK_URL = os.getenv("FEISHU_WEBHOOK_URL", "")
class FeishuBitableClient:
"""飞书多维表格客户端"""
def __init__(self, config: FeishuConfig):
self.config = config
self.base_url = "https://open.feishu.cn/open-apis"
self._access_token: Optional[str] = None
def get_access_token(self) -> str:
"""获取 tenant_access_token"""
url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
payload = json.dumps({
"app_id": self.config.APP_ID,
"app_secret": self.config.APP_SECRET
}).encode()
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
if data.get("code") != 0:
raise Exception(f"获取飞书 access_token 失败: {data.get('msg')}")
self._access_token = data["tenant_access_token"]
print(f"✓ 成功获取飞书 access_token")
return self._access_token
def _request(self, method: str, path: str, data: Dict = None) -> Dict[str, Any]:
"""发送API请求"""
if not self._access_token:
self.get_access_token()
url = f"{self.base_url}{path}"
payload = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=payload, method=method)
req.add_header("Authorization", f"Bearer {self._access_token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=15) as resp:
result = json.loads(resp.read())
if result.get("code") != 0:
raise Exception(f"飞书 API 调用失败: {result.get('msg')}")
return result.get("data", {})
except urllib.error.HTTPError as e:
error_body = e.read().decode()
raise Exception(f"HTTP {e.code}: {error_body}")
def create_record(self, fields: Dict[str, Any]) -> Dict[str, Any]:
"""创建单条记录
Args:
fields: 字段数据
Returns:
创建的记录信息
"""
path = f"/bitable/v1/apps/{self.config.BITABLE_APP_TOKEN}/tables/{self.config.BITABLE_TABLE_ID}/records"
return self._request("POST", path, {"fields": fields})
def batch_create_records(self, records: list) -> Dict[str, Any]:
"""批量创建记录
Args:
records: 记录列表,每条记录包含 fields
Returns:
创建结果
"""
path = f"/bitable/v1/apps/{self.config.BITABLE_APP_TOKEN}/tables/{self.config.BITABLE_TABLE_ID}/records/batch_create"
return self._request("POST", path, {"records": records})
class ConsultService:
"""咨询表单服务"""
def __init__(self):
self.feishu_config = FeishuConfig()
self.data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
os.makedirs(self.data_dir, exist_ok=True)
# 初始化飞书客户端
self.bitable_client = None
if self.feishu_config.APP_ID and self.feishu_config.APP_SECRET and self.feishu_config.BITABLE_APP_TOKEN and self.feishu_config.BITABLE_TABLE_ID:
try:
self.bitable_client = FeishuBitableClient(self.feishu_config)
print("✓ 飞书多维表格客户端初始化成功")
except Exception as e:
print(f"⚠ 飞书多维表格初始化失败: {e}")
else:
print("⚠ 飞书配置未完成,将仅保存本地数据")
def validate_phone(self, phone: str) -> bool:
"""验证手机号"""
return bool(re.match(r"^1\d{10}$", phone.strip()))
def validate_email(self, email: str) -> bool:
"""验证邮箱"""
if not email:
return True # 邮箱为选填
return bool(re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email.strip()))
def validate_name(self, name: str) -> bool:
"""验证姓名"""
name = name.strip()
return 2 <= len(name) <= 50
def process_consult(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""处理咨询数据
Args:
data: 表单提交的数据
Returns:
处理结果
"""
# 提取字段
name = data.get("name", "").strip()
title = data.get("title", "先生").strip()
phone = data.get("phone", "").strip()
email = data.get("email", "").strip()
company = data.get("company", "").strip()
concern = data.get("concern", []) # 关注目标
# 兼容旧字段
current_system = data.get("current_system", [])
system_name = data.get("system_name", "").strip()
message = data.get("message", "").strip()
# 验证必填字段
errors = []
if not name or not self.validate_name(name):
errors.append("姓名格式不正确2-50个字符")
if not phone or not self.validate_phone(phone):
errors.append("手机号格式不正确")
if email and not self.validate_email(email):
errors.append("邮箱格式不正确")
if errors:
return {
"success": False,
"error": "; ".join(errors)
}
# 构建记录
record = {
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"name": name,
"title": title,
"phone": phone,
"email": email,
"company": company,
"concern": ",".join(concern) if concern else "",
"current_system": ",".join(current_system) if current_system else "",
"system_name": system_name,
"message": message,
"source": "官网表单"
}
# 保存本地
self._save_local(record)
# 同步飞书
self._sync_feishu(record)
# 发送飞书通知
if self.feishu_config.WEBHOOK_URL:
self._send_feishu_notification(record)
return {"success": True, "message": "提交成功"}
def _save_local(self, record: Dict[str, Any]):
"""保存到本地文件"""
try:
month_file = os.path.join(
self.data_dir,
f"{datetime.now().strftime('%Y-%m')}.json"
)
records = []
if os.path.exists(month_file):
with open(month_file, 'r', encoding='utf-8') as f:
try:
records = json.load(f)
except json.JSONDecodeError:
records = []
records.append(record)
with open(month_file, 'w', encoding='utf-8') as f:
json.dump(records, f, ensure_ascii=False, indent=2)
print(f"✓ 本地保存成功: {month_file}")
except Exception as e:
print(f"⚠ 本地保存失败: {e}")
def _sync_feishu(self, record: Dict[str, Any]):
"""同步到飞书多维表格"""
if not self.bitable_client:
return
try:
# 构建飞书字段格式(适配新表单字段)
fields = {
"姓名": record["name"] + record["title"],
"手机": record["phone"],
"处理状态": "待联系" # 默认状态
}
if record["email"]:
fields["邮箱"] = record["email"]
if record["company"]:
fields["公司"] = record["company"]
if record["concern"]:
fields["关注目标"] = record["concern"].split(",")
# 创建记录
result = self.bitable_client.create_record(fields)
record_id = result.get("record", {}).get("record_id", "")
print(f"✓ 飞书多维表格记录创建成功: {record_id}")
except Exception as e:
print(f"⚠ 飞书多维表格同步失败: {e}")
def _send_feishu_notification(self, record: Dict[str, Any]):
"""发送飞书机器人通知"""
try:
concern_text = record["concern"] or "未填写"
company_text = record["company"] or "未填写"
msg = {
"msg_type": "interactive",
"card": {
"header": {
"title": {
"tag": "plain_text",
"content": "🔔 官网新咨询"
},
"template": "turquoise"
},
"elements": [
{
"tag": "markdown",
"content": (
f"**客户信息**\n"
f"- 姓名:{record['name']}{record['title']}\n"
f"- 电话:{record['phone']}\n"
f"- 公司:{company_text}\n\n"
f"**关注目标**\n{concern_text}\n\n"
f"⏰ 时间:{record['time']}"
)
}
]
}
}
payload = json.dumps(msg).encode()
req = urllib.request.Request(
self.feishu_config.WEBHOOK_URL,
data=payload,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=5) as resp:
if resp.status == 200:
print("✓ 飞书通知发送成功")
else:
print(f"⚠ 飞书通知响应异常: {resp.status}")
except Exception as e:
print(f"⚠ 飞书通知发送失败: {e}")
class RequestHandler(BaseHTTPRequestHandler):
"""HTTP 请求处理器"""
def __init__(self, *args, **kwargs):
self.service = ConsultService()
super().__init__(*args, **kwargs)
def do_POST(self):
"""处理 POST 请求"""
if self.path != "/api/consult":
self._send(404, {"error": "not found"})
return
# 检查 Content-Length
length = int(self.headers.get("Content-Length", 0))
if length > 50000: # 限制 50KB
self._send(400, {"error": "payload too large"})
return
if length == 0:
self._send(400, {"error": "empty request body"})
return
# 读取请求体
body = self.rfile.read(length)
# 解析 JSON
try:
data = json.loads(body)
except json.JSONDecodeError:
self._send(400, {"error": "invalid json"})
return
# 处理咨询
result = self.service.process_consult(data)
if result.get("success"):
self._send(200, result)
else:
self._send(400, result)
def do_GET(self):
"""处理 GET 请求"""
if self.path == "/api/consult/health":
self._send(200, {"ok": True, "service": "consult-api"})
elif self.path == "/api/consult":
self._send(200, {
"service": "consult-api",
"version": "1.0.0",
"endpoints": {
"POST /api/consult": "提交咨询表单"
}
})
else:
self._send(404, {"error": "not found"})
def do_OPTIONS(self):
"""处理 CORS 预检请求"""
self.send_response(200)
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.send_header("Access-Control-Max-Age", "86400")
self.end_headers()
def _send(self, code: int, data: Dict[str, Any]):
"""发送响应"""
self.send_response(code)
self.send_header("Content-Type", "application/json")
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "Content-Type")
self.end_headers()
self.wfile.write(json.dumps(data, ensure_ascii=False).encode())
def log_message(self, fmt, *args):
"""自定义日志格式"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{timestamp}] {fmt % args}")
def main():
"""启动服务"""
port = int(os.getenv("PORT", 4001))
print("=" * 60)
print("菲西尔官网咨询表单接收服务")
print("=" * 60)
print(f"服务地址http://0.0.0.0:{port}")
print(f"API 端点POST /api/consult")
print(f"健康检查GET /api/consult/health")
print("=" * 60)
server = HTTPServer(("0.0.0.0", port), RequestHandler)
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n服务已停止")
server.shutdown()
if __name__ == "__main__":
main()