#!/usr/bin/env python3
"""
菲西尔官网咨询表单接收服务
支持飞书多维表格(Bitable)集成
"""

import json
import os
import re
import time
import urllib.error
import urllib.request
from datetime import datetime
from http.server import HTTPServer, BaseHTTPRequestHandler
from typing import Dict, Any, Optional


class FeishuConfig:
    """Feishu integration settings taken from environment variables.

    Each attribute is read once, when the class body executes, and falls
    back to an empty string if the corresponding variable is not set.
    """

    # App credentials exchanged for a tenant access token.
    APP_ID = os.environ.get("FEISHU_APP_ID", "")
    APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "")

    # Bitable app and table that receive the submitted records.
    BITABLE_APP_TOKEN = os.environ.get("FEISHU_BITABLE_APP_TOKEN", "")
    BITABLE_TABLE_ID = os.environ.get("FEISHU_BITABLE_TABLE_ID", "")

    # Webhook URL used for bot notifications; empty disables them.
    WEBHOOK_URL = os.environ.get("FEISHU_WEBHOOK_URL", "")


class FeishuBitableClient:
    """Small client for the Feishu Bitable (multi-dimensional table) open API.

    The client obtains a tenant access token on demand, caches it, and
    fetches a new one once the cached token is close to its expiry time.
    """

    # Renew the token this many seconds before its reported expiry so that a
    # request is never sent with a token that lapses while in flight.
    _TOKEN_REFRESH_MARGIN = 60

    def __init__(self, config: "FeishuConfig"):
        """Store the configuration; no network request is made here.

        Args:
            config: Object exposing APP_ID, APP_SECRET, BITABLE_APP_TOKEN
                and BITABLE_TABLE_ID attributes.
        """
        self.config = config
        self.base_url = "https://open.feishu.cn/open-apis"
        self._access_token: Optional[str] = None
        # time.monotonic() value after which the cached token is stale.
        self._token_expires_at: float = 0.0

    def _token_is_valid(self) -> bool:
        """Return True while a cached token exists and has not yet expired."""
        return bool(self._access_token) and time.monotonic() < self._token_expires_at

    def get_access_token(self) -> str:
        """Fetch a fresh tenant_access_token and cache it with its expiry.

        Returns:
            The newly issued token.

        Raises:
            Exception: If the API answers with a non-zero ``code``.
            urllib.error.URLError: On network or HTTP failures.
        """
        url = f"{self.base_url}/auth/v3/tenant_access_token/internal"
        payload = json.dumps({
            "app_id": self.config.APP_ID,
            "app_secret": self.config.APP_SECRET,
        }).encode("utf-8")

        req = urllib.request.Request(url, data=payload, method="POST")
        req.add_header("Content-Type", "application/json")

        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())

        if data.get("code") != 0:
            raise Exception(f"获取飞书 access_token 失败: {data.get('msg')}")

        self._access_token = data["tenant_access_token"]

        # The response reports the token lifetime in seconds under "expire".
        # A missing or malformed value is treated as 0, which simply forces a
        # new token on the next request instead of reusing a stale one.
        try:
            lifetime = float(data.get("expire", 0))
        except (TypeError, ValueError):
            lifetime = 0.0
        self._token_expires_at = time.monotonic() + max(
            lifetime - self._TOKEN_REFRESH_MARGIN, 0.0
        )

        print("✓ 成功获取飞书 access_token")
        return self._access_token

    def _request(self, method: str, path: str, data: Optional[Dict] = None) -> Dict[str, Any]:
        """Send an authenticated API request and return its ``data`` member.

        Args:
            method: HTTP method, e.g. "POST".
            path: API path appended to ``base_url``.
            data: JSON-serialisable request body, or None for no body.

        Returns:
            The ``data`` object of the response (an empty dict if absent).

        Raises:
            Exception: If the API reports a non-zero ``code`` or the server
                answers with an HTTP error status.
        """
        if not self._token_is_valid():
            self.get_access_token()

        url = f"{self.base_url}{path}"
        # "is not None" so that an explicitly empty body is still sent as {}.
        payload = json.dumps(data).encode("utf-8") if data is not None else None

        req = urllib.request.Request(url, data=payload, method=method)
        req.add_header("Authorization", f"Bearer {self._access_token}")
        req.add_header("Content-Type", "application/json")

        try:
            with urllib.request.urlopen(req, timeout=15) as resp:
                result = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            # errors="replace" keeps a non-UTF-8 error page from masking the
            # real failure with a UnicodeDecodeError.
            error_body = e.read().decode("utf-8", errors="replace")
            raise Exception(f"HTTP {e.code}: {error_body}") from e

        if result.get("code") != 0:
            raise Exception(f"飞书 API 调用失败: {result.get('msg')}")

        return result.get("data") or {}

    def _records_path(self) -> str:
        """Return the records endpoint path of the configured table."""
        return (
            f"/bitable/v1/apps/{self.config.BITABLE_APP_TOKEN}"
            f"/tables/{self.config.BITABLE_TABLE_ID}/records"
        )

    def create_record(self, fields: Dict[str, Any]) -> Dict[str, Any]:
        """Create a single record.

        Args:
            fields: Mapping of column name to value.

        Returns:
            The ``data`` object returned by the API for the created record.
        """
        return self._request("POST", self._records_path(), {"fields": fields})

    def batch_create_records(self, records: list) -> Dict[str, Any]:
        """Create several records in one call.

        Args:
            records: List of records, each a dict containing ``fields``.

        Returns:
            The ``data`` object returned by the API.
        """
        path = f"{self._records_path()}/batch_create"
        return self._request("POST", path, {"records": records})


class ConsultService:
    """Validate consultation-form submissions, store them and forward them.

    A valid submission is appended to a per-month JSON file under ``data/``,
    synced to a Feishu Bitable table when the client is configured, and
    announced through a Feishu webhook when a webhook URL is configured.
    """

    def __init__(self):
        """Load the configuration, create the data directory and the client."""
        self.feishu_config = FeishuConfig()
        self.data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
        os.makedirs(self.data_dir, exist_ok=True)

        # The Bitable client is only created when all four settings are set.
        self.bitable_client = None
        cfg = self.feishu_config
        if cfg.APP_ID and cfg.APP_SECRET and cfg.BITABLE_APP_TOKEN and cfg.BITABLE_TABLE_ID:
            try:
                self.bitable_client = FeishuBitableClient(cfg)
                print("✓ 飞书多维表格客户端初始化成功")
            except Exception as e:
                print(f"⚠ 飞书多维表格初始化失败: {e}")
        else:
            print("⚠ 飞书配置未完成,将仅保存本地数据")

    @staticmethod
    def _text(value: Any) -> str:
        """Coerce an untrusted JSON value to a stripped string.

        Strings are stripped, integers are stringified (a phone number may
        arrive as a JSON number); anything else becomes an empty string.
        """
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, int) and not isinstance(value, bool):
            return str(value)
        return ""

    @classmethod
    def _text_list(cls, value: Any) -> list:
        """Coerce an untrusted JSON value to a list of non-empty strings.

        A single scalar is treated as a one-element list, so a plain string
        is never split into its characters.
        """
        items = value if isinstance(value, (list, tuple)) else [value]
        cleaned = (cls._text(item) for item in items)
        return [item for item in cleaned if item]

    def validate_phone(self, phone: str) -> bool:
        """Return True if *phone* is 11 ASCII digits starting with ``1``."""
        if not isinstance(phone, str):
            return False
        # [0-9] rather than \d: \d also matches non-ASCII digit characters.
        return re.fullmatch(r"1[0-9]{10}", phone.strip()) is not None

    def validate_email(self, email: str) -> bool:
        """Return True if *email* is empty (the field is optional) or well-formed."""
        if not email:
            return True
        if not isinstance(email, str):
            return False
        return re.fullmatch(r"[^\s@]+@[^\s@]+\.[^\s@]+", email.strip()) is not None

    def validate_name(self, name: str) -> bool:
        """Return True if the stripped *name* is 2 to 50 characters long."""
        if not isinstance(name, str):
            return False
        return 2 <= len(name.strip()) <= 50

    def process_consult(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate one submission and hand it to every configured channel.

        Args:
            data: Decoded JSON body of the form submission.

        Returns:
            ``{"success": True, "message": ...}`` when the submission was
            stored or delivered through at least one channel, otherwise
            ``{"success": False, "error": ...}``.
        """
        if not isinstance(data, dict):
            return {"success": False, "error": "请求数据格式不正确"}

        # Extract fields; every value is untrusted and may have any JSON type.
        name = self._text(data.get("name", ""))
        title = self._text(data.get("title", "先生"))
        phone = self._text(data.get("phone", ""))
        email = self._text(data.get("email", ""))
        company = self._text(data.get("company", ""))
        concern = self._text_list(data.get("concern", []))
        # Fields kept for compatibility with the older form.
        current_system = self._text_list(data.get("current_system", []))
        system_name = self._text(data.get("system_name", ""))
        message = self._text(data.get("message", ""))

        # Validate required fields.
        errors = []
        if not name or not self.validate_name(name):
            errors.append("姓名格式不正确(2-50个字符)")
        if not phone or not self.validate_phone(phone):
            errors.append("手机号格式不正确")
        if email and not self.validate_email(email):
            errors.append("邮箱格式不正确")

        if errors:
            return {"success": False, "error": "; ".join(errors)}

        record = {
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "name": name,
            "title": title,
            "phone": phone,
            "email": email,
            "company": company,
            "concern": ",".join(concern),
            "current_system": ",".join(current_system),
            "system_name": system_name,
            "message": message,
            "source": "官网表单",
        }

        saved = self._save_local(record)
        synced = self._sync_feishu(record)
        notified = False
        if self.feishu_config.WEBHOOK_URL:
            notified = self._send_feishu_notification(record)

        # Never report success for a submission that reached nobody.
        if not (saved or synced or notified):
            return {"success": False, "error": "保存失败,请稍后重试"}

        return {"success": True, "message": "提交成功"}

    def _load_month_records(self, month_file: str) -> list:
        """Return the records already stored in *month_file*.

        A file that cannot be parsed as a JSON list is renamed to a
        ``.corrupt-<timestamp>`` sibling instead of being overwritten, so
        that its contents can still be recovered by hand.
        """
        if not os.path.exists(month_file):
            return []

        records = None
        try:
            with open(month_file, "r", encoding="utf-8") as f:
                records = json.load(f)
        except ValueError:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            records = None

        if isinstance(records, list):
            return records

        backup = f"{month_file}.corrupt-{datetime.now().strftime('%Y%m%d%H%M%S')}"
        os.replace(month_file, backup)
        print(f"⚠ 本地数据文件损坏,已备份为: {backup}")
        return []

    def _save_local(self, record: Dict[str, Any]) -> bool:
        """Append *record* to the current month's JSON file.

        Returns:
            True on success, False if the record could not be written.
        """
        try:
            month_file = os.path.join(
                self.data_dir,
                f"{datetime.now().strftime('%Y-%m')}.json",
            )

            records = self._load_month_records(month_file)
            records.append(record)

            # Write to a sibling file and swap it in, so an interrupted write
            # cannot truncate the existing data.
            tmp_file = f"{month_file}.tmp"
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(records, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, month_file)

            print(f"✓ 本地保存成功: {month_file}")
            return True
        except Exception as e:
            # Best effort: a storage problem must not abort the other channels.
            print(f"⚠ 本地保存失败: {e}")
            return False

    def _sync_feishu(self, record: Dict[str, Any]) -> bool:
        """Create a Bitable row for *record*.

        Returns:
            True if the row was created, False if the client is not
            configured or the call failed.
        """
        if not self.bitable_client:
            return False

        try:
            fields = {
                "姓名": record["name"] + record["title"],
                "手机": record["phone"],
                "处理状态": "待联系",  # default status for a new row
            }
            if record["email"]:
                fields["邮箱"] = record["email"]
            if record["company"]:
                fields["公司"] = record["company"]
            if record["concern"]:
                fields["关注目标"] = record["concern"].split(",")

            result = self.bitable_client.create_record(fields)
            record_id = result.get("record", {}).get("record_id", "")
            print(f"✓ 飞书多维表格记录创建成功: {record_id}")
            return True
        except Exception as e:
            print(f"⚠ 飞书多维表格同步失败: {e}")
            return False

    def _send_feishu_notification(self, record: Dict[str, Any]) -> bool:
        """Post an interactive card about *record* to the Feishu webhook.

        Returns:
            True if the webhook accepted the message, False otherwise.
        """
        try:
            concern_text = record["concern"] or "未填写"
            company_text = record["company"] or "未填写"

            msg = {
                "msg_type": "interactive",
                "card": {
                    "header": {
                        "title": {
                            "tag": "plain_text",
                            "content": "🔔 官网新咨询",
                        },
                        "template": "turquoise",
                    },
                    "elements": [
                        {
                            "tag": "markdown",
                            "content": (
                                f"**客户信息**\n"
                                f"- 姓名:{record['name']}{record['title']}\n"
                                f"- 电话:{record['phone']}\n"
                                f"- 公司:{company_text}\n\n"
                                f"**关注目标**\n{concern_text}\n\n"
                                f"⏰ 时间:{record['time']}"
                            ),
                        }
                    ],
                },
            }

            payload = json.dumps(msg).encode()
            req = urllib.request.Request(
                self.feishu_config.WEBHOOK_URL,
                data=payload,
                headers={"Content-Type": "application/json"},
            )

            with urllib.request.urlopen(req, timeout=5) as resp:
                status = resp.status
                raw = resp.read()

            if status != 200:
                print(f"⚠ 飞书通知响应异常: {status}")
                return False

            # Per the Feishu custom-bot docs a rejected message is still
            # answered with HTTP 200 and a non-zero "code" in the JSON body.
            try:
                reply = json.loads(raw)
            except ValueError:
                reply = None
            if isinstance(reply, dict) and reply.get("code", 0) != 0:
                print(f"⚠ 飞书通知响应异常: {reply.get('code')} {reply.get('msg')}")
                return False

            print("✓ 飞书通知发送成功")
            return True
        except Exception as e:
            print(f"⚠ 飞书通知发送失败: {e}")
            return False


class RequestHandler(BaseHTTPRequestHandler):
    """HTTP handler exposing the consultation API.

    Routes:
        POST    /api/consult         submit a consultation form (JSON body)
        GET     /api/consult         service description
        GET     /api/consult/health  health check
        OPTIONS (any path)           CORS preflight
    """

    # Largest request body accepted by POST /api/consult, in bytes.
    MAX_BODY_BYTES = 50000

    def __init__(self, *args, **kwargs):
        # The base class handles the request inside its __init__, so the
        # service has to exist before super().__init__ runs.
        # NOTE(review): a new ConsultService is built for every request;
        # sharing one instance is only safe if the Feishu client refreshes
        # expired access tokens.
        self.service = ConsultService()
        super().__init__(*args, **kwargs)

    def do_POST(self):
        """Validate the request envelope and pass the JSON body to the service."""
        if self.path != "/api/consult":
            self._send(404, {"error": "not found"})
            return

        # A missing header counts as an empty body; a malformed or negative
        # value is rejected (a negative size would make rfile.read block).
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            self._send(400, {"error": "invalid content-length"})
            return
        if length < 0:
            self._send(400, {"error": "invalid content-length"})
            return
        if length > self.MAX_BODY_BYTES:
            self._send(400, {"error": "payload too large"})
            return
        if length == 0:
            self._send(400, {"error": "empty request body"})
            return

        body = self.rfile.read(length)

        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # raised for bodies that are not valid UTF-8/16/32.
        try:
            data = json.loads(body)
        except ValueError:
            self._send(400, {"error": "invalid json"})
            return
        if not isinstance(data, dict):
            self._send(400, {"error": "json body must be an object"})
            return

        # Always answer the client, even if processing fails unexpectedly.
        try:
            result = self.service.process_consult(data)
        except Exception as exc:
            print(f"⚠ 处理咨询请求失败: {exc}")
            self._send(500, {"error": "internal server error"})
            return

        self._send(200 if result.get("success") else 400, result)

    def do_GET(self):
        """Serve the health check and the service description."""
        if self.path == "/api/consult/health":
            self._send(200, {"ok": True, "service": "consult-api"})
        elif self.path == "/api/consult":
            self._send(200, {
                "service": "consult-api",
                "version": "1.0.0",
                "endpoints": {
                    "POST /api/consult": "提交咨询表单",
                },
            })
        else:
            self._send(404, {"error": "not found"})

    def do_OPTIONS(self):
        """Answer a CORS preflight request."""
        self.send_response(200)
        self._send_cors_headers()
        self.send_header("Access-Control-Max-Age", "86400")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _send_cors_headers(self):
        """Emit the CORS headers shared by every response."""
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def _send(self, code: int, data: Dict[str, Any]):
        """Send *data* as a UTF-8 JSON response with status *code*."""
        body = json.dumps(data, ensure_ascii=False).encode("utf-8")
        self.send_response(code)
        # The body is written with ensure_ascii=False, so state the charset.
        self.send_header("Content-Type", "application/json; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self._send_cors_headers()
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Print access-log lines prefixed with a local timestamp."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] {fmt % args}")


def main():
    """Start the HTTP server and serve until interrupted.

    The port is read from the ``PORT`` environment variable (default 4001)
    and the server listens on all interfaces.
    """
    port = int(os.getenv("PORT", "4001"))

    banner = "=" * 60
    print(banner)
    print("菲西尔官网咨询表单接收服务")
    print(banner)
    print(f"服务地址:http://0.0.0.0:{port}")
    print("API 端点:POST /api/consult")
    print("健康检查:GET /api/consult/health")
    print(banner)

    server = HTTPServer(("0.0.0.0", port), RequestHandler)

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n服务已停止")
    finally:
        # serve_forever() has already returned here, so there is no loop left
        # for shutdown() to stop; server_close() is what releases the
        # listening socket.
        server.server_close()


# Start the server only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()