#!/usr/bin/env python3 """ 菲西尔官网咨询表单接收服务 支持飞书多维表格(Bitable)集成 """ import json import os import re from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler from typing import Dict, Any, Optional import urllib.request import urllib.error class FeishuConfig: """飞书配置""" APP_ID = os.getenv("FEISHU_APP_ID", "") APP_SECRET = os.getenv("FEISHU_APP_SECRET", "") BITABLE_APP_TOKEN = os.getenv("FEISHU_BITABLE_APP_TOKEN", "") BITABLE_TABLE_ID = os.getenv("FEISHU_BITABLE_TABLE_ID", "") WEBHOOK_URL = os.getenv("FEISHU_WEBHOOK_URL", "") class FeishuBitableClient: """飞书多维表格客户端""" def __init__(self, config: FeishuConfig): self.config = config self.base_url = "https://open.feishu.cn/open-apis" self._access_token: Optional[str] = None def get_access_token(self) -> str: """获取 tenant_access_token""" url = f"{self.base_url}/auth/v3/tenant_access_token/internal" payload = json.dumps({ "app_id": self.config.APP_ID, "app_secret": self.config.APP_SECRET }).encode() req = urllib.request.Request(url, data=payload, method="POST") req.add_header("Content-Type", "application/json") with urllib.request.urlopen(req, timeout=10) as resp: data = json.loads(resp.read()) if data.get("code") != 0: raise Exception(f"获取飞书 access_token 失败: {data.get('msg')}") self._access_token = data["tenant_access_token"] print(f"✓ 成功获取飞书 access_token") return self._access_token def _request(self, method: str, path: str, data: Dict = None) -> Dict[str, Any]: """发送API请求""" if not self._access_token: self.get_access_token() url = f"{self.base_url}{path}" payload = json.dumps(data).encode() if data else None req = urllib.request.Request(url, data=payload, method=method) req.add_header("Authorization", f"Bearer {self._access_token}") req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=15) as resp: result = json.loads(resp.read()) if result.get("code") != 0: raise Exception(f"飞书 API 调用失败: {result.get('msg')}") return result.get("data", {}) except urllib.error.HTTPError as e: error_body = e.read().decode() raise Exception(f"HTTP {e.code}: {error_body}") def create_record(self, fields: Dict[str, Any]) -> Dict[str, Any]: """创建单条记录 Args: fields: 字段数据 Returns: 创建的记录信息 """ path = f"/bitable/v1/apps/{self.config.BITABLE_APP_TOKEN}/tables/{self.config.BITABLE_TABLE_ID}/records" return self._request("POST", path, {"fields": fields}) def batch_create_records(self, records: list) -> Dict[str, Any]: """批量创建记录 Args: records: 记录列表,每条记录包含 fields Returns: 创建结果 """ path = f"/bitable/v1/apps/{self.config.BITABLE_APP_TOKEN}/tables/{self.config.BITABLE_TABLE_ID}/records/batch_create" return self._request("POST", path, {"records": records}) class ConsultService: """咨询表单服务""" def __init__(self): self.feishu_config = FeishuConfig() self.data_dir = "/opt/ai-landing/data" os.makedirs(self.data_dir, exist_ok=True) # 初始化飞书客户端 self.bitable_client = None if self.feishu_config.APP_ID and self.feishu_config.BITABLE_APP_TOKEN: try: self.bitable_client = FeishuBitableClient(self.feishu_config) print("✓ 飞书多维表格客户端初始化成功") except Exception as e: print(f"⚠ 飞书多维表格初始化失败: {e}") else: print("⚠ 飞书配置未完成,将仅保存本地数据") def validate_phone(self, phone: str) -> bool: """验证手机号""" return bool(re.match(r"^1\d{10}$", phone.strip())) def validate_email(self, email: str) -> bool: """验证邮箱""" if not email: return True # 邮箱为选填 return bool(re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", email.strip())) def validate_name(self, name: str) -> bool: """验证姓名""" name = name.strip() return 2 <= len(name) <= 50 def process_consult(self, data: Dict[str, Any]) -> Dict[str, Any]: """处理咨询数据 Args: data: 表单提交的数据 Returns: 处理结果 """ # 提取字段 name = data.get("name", "").strip() phone = data.get("phone", "").strip() email = data.get("email", "").strip() current_system = data.get("current_system", []) system_name = data.get("system_name", "").strip() message = data.get("message", "").strip() # 验证必填字段 errors = [] if not name or not self.validate_name(name): errors.append("姓名格式不正确(2-50个字符)") if not phone or not self.validate_phone(phone): errors.append("手机号格式不正确") if email and not self.validate_email(email): errors.append("邮箱格式不正确") if errors: return { "success": False, "error": "; ".join(errors) } # 构建记录 record = { "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "name": name, "phone": phone, "email": email, "current_system": ",".join(current_system) if current_system else "", "system_name": system_name, "message": message, "source": "官网表单" } # 保存本地 self._save_local(record) # 同步飞书 self._sync_feishu(record) # 发送飞书通知 if self.feishu_config.WEBHOOK_URL: self._send_feishu_notification(record) return {"success": True, "message": "提交成功"} def _save_local(self, record: Dict[str, Any]): """保存到本地文件""" try: month_file = os.path.join( self.data_dir, f"{datetime.now().strftime('%Y-%m')}.json" ) records = [] if os.path.exists(month_file): with open(month_file, 'r', encoding='utf-8') as f: try: records = json.load(f) except json.JSONDecodeError: records = [] records.append(record) with open(month_file, 'w', encoding='utf-8') as f: json.dump(records, f, ensure_ascii=False, indent=2) print(f"✓ 本地保存成功: {month_file}") except Exception as e: print(f"⚠ 本地保存失败: {e}") def _sync_feishu(self, record: Dict[str, Any]): """同步到飞书多维表格""" if not self.bitable_client: return try: # 构建飞书字段格式 fields = { "姓名": record["name"], "手机": record["phone"], "咨询内容": record["message"], "处理状态": "待联系" # 默认状态 } if record["email"]: fields["邮箱"] = record["email"] if record["current_system"]: fields["当前使用系统"] = record["current_system"].split(",") if record["system_name"]: fields["系统名称"] = record["system_name"] # 创建记录 result = self.bitable_client.create_record(fields) record_id = result.get("record", {}).get("record_id", "") print(f"✓ 飞书多维表格记录创建成功: {record_id}") except Exception as e: print(f"⚠ 飞书多维表格同步失败: {e}") def _send_feishu_notification(self, record: Dict[str, Any]): """发送飞书机器人通知""" try: systems_text = record["current_system"] or "未填写" msg = { "msg_type": "interactive", "card": { "header": { "title": { "tag": "plain_text", "content": "🔔 官网新咨询" }, "template": "turquoise" }, "elements": [ { "tag": "markdown", "content": ( f"**客户信息**\n" f"- 姓名:{record['name']}\n" f"- 电话:{record['phone']}\n" f"- 邮箱:{record['email'] or '未填写'}\n\n" f"**业务信息**\n" f"- 当前系统:{systems_text}\n" f"- 系统名称:{record['system_name'] or '无'}\n\n" f"**咨询内容**\n{record['message'] or '无'}\n\n" f"⏰ 时间:{record['time']}" ) }, { "tag": "action", "actions": [ { "tag": "button", "text": {"tag": "plain_text", "content": "📞 立即联系"}, "type": "primary" } ] } ] } } payload = json.dumps(msg).encode() req = urllib.request.Request( self.feishu_config.WEBHOOK_URL, data=payload, headers={"Content-Type": "application/json"} ) with urllib.request.urlopen(req, timeout=5) as resp: if resp.status == 200: print("✓ 飞书通知发送成功") else: print(f"⚠ 飞书通知响应异常: {resp.status}") except Exception as e: print(f"⚠ 飞书通知发送失败: {e}") class RequestHandler(BaseHTTPRequestHandler): """HTTP 请求处理器""" def __init__(self, *args, **kwargs): self.service = ConsultService() super().__init__(*args, **kwargs) def do_POST(self): """处理 POST 请求""" if self.path != "/api/consult": self._send(404, {"error": "not found"}) return # 检查 Content-Length length = int(self.headers.get("Content-Length", 0)) if length > 50000: # 限制 50KB self._send(400, {"error": "payload too large"}) return if length == 0: self._send(400, {"error": "empty request body"}) return # 读取请求体 body = self.rfile.read(length) # 解析 JSON try: data = json.loads(body) except json.JSONDecodeError: self._send(400, {"error": "invalid json"}) return # 处理咨询 result = self.service.process_consult(data) if result.get("success"): self._send(200, result) else: self._send(400, result) def do_GET(self): """处理 GET 请求""" if self.path == "/api/consult/health": self._send(200, {"ok": True, "service": "consult-api"}) elif self.path == "/api/consult": self._send(200, { "service": "consult-api", "version": "1.0.0", "endpoints": { "POST /api/consult": "提交咨询表单" } }) else: self._send(404, {"error": "not found"}) def do_OPTIONS(self): """处理 CORS 预检请求""" self.send_response(200) self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.send_header("Access-Control-Max-Age", "86400") self.end_headers() def _send(self, code: int, data: Dict[str, Any]): """发送响应""" self.send_response(code) self.send_header("Content-Type", "application/json") self.send_header("Access-Control-Allow-Origin", "*") self.send_header("Access-Control-Allow-Methods", "POST, GET, OPTIONS") self.send_header("Access-Control-Allow-Headers", "Content-Type") self.end_headers() self.wfile.write(json.dumps(data, ensure_ascii=False).encode()) def log_message(self, fmt, *args): """自定义日志格式""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {fmt % args}") def main(): """启动服务""" port = int(os.getenv("PORT", 4001)) print("=" * 60) print("菲西尔官网咨询表单接收服务") print("=" * 60) print(f"服务地址:http://0.0.0.0:{port}") print(f"API 端点:POST /api/consult") print(f"健康检查:GET /api/consult/health") print("=" * 60) server = HTTPServer(("0.0.0.0", port), RequestHandler) try: server.serve_forever() except KeyboardInterrupt: print("\n服务已停止") server.shutdown() if __name__ == "__main__": main()