#!/usr/bin/env python3
"""Lightweight form-intake service.

Accepts consult requests on ``POST /api/consult``, appends them to a
per-month JSON file under ``DATA_DIR`` and, when a webhook URL is
configured, pushes a notification card to a Feishu bot.
"""
import json
import os
import re
import urllib.request
from datetime import datetime
from http.server import BaseHTTPRequestHandler, HTTPServer

# Feishu bot webhook URL, read from the environment; empty disables the push.
FEISHU_WEBHOOK = os.getenv("FEISHU_WEBHOOK_URL", "")

# Directory holding the per-month JSON record files.  It is created at
# startup (see main) and again before each save, not at import time.
DATA_DIR = "/opt/ai-landing/data"

# Request bodies larger than this many bytes are rejected.
MAX_BODY_BYTES = 50000

# "1" followed by ten ASCII digits.  [0-9] is used instead of \d because
# \d on a str pattern also matches non-ASCII decimal digits.
_PHONE_RE = re.compile(r"1[0-9]{10}")

# Fields copied from the request into the stored record, in storage order.
_FIELDS = ("name", "phone", "email", "message")


def _clean_text(value):
    """Normalize one submitted field.

    Returns the stripped string for ``str`` input, ``""`` for ``None``
    (missing or JSON ``null``), and ``None`` for any other type so the
    caller can reject the request.
    """
    if value is None:
        return ""
    if isinstance(value, str):
        return value.strip()
    return None


def _load_records(path):
    """Return the list of records stored at *path*.

    A missing file yields an empty list.  A file that cannot be decoded as
    UTF-8 JSON, or whose top-level value is not a list, is moved aside
    (``<path>.corrupt-<timestamp>``) instead of being overwritten, and an
    empty list is returned.
    """
    try:
        with open(path, encoding="utf-8") as f:
            records = json.load(f)
    except FileNotFoundError:
        return []
    except ValueError:  # JSONDecodeError and UnicodeDecodeError
        records = None
    if isinstance(records, list):
        return records
    backup = f"{path}.corrupt-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    os.replace(path, backup)
    print(f"corrupt data file moved to {backup}")
    return []


class Handler(BaseHTTPRequestHandler):
    """HTTP handler for the consult form API.

    Routes:
        POST    /api/consult         validate, store and push one request
        GET     /api/consult/health  liveness probe
        OPTIONS <any path>           CORS preflight
    """

    def do_POST(self):
        """Validate a consult submission, persist it and notify Feishu."""
        if self.path != "/api/consult":
            self._send(404, {"error": "not found"})
            return

        length = self._content_length()
        if length is None:
            self._send(400, {"error": "invalid content length"})
            return
        if length > MAX_BODY_BYTES:
            self._send(400, {"error": "payload too large"})
            return

        body = self.rfile.read(length)
        try:
            data = json.loads(body)
        except (ValueError, RecursionError):
            # ValueError covers malformed JSON and undecodable bytes;
            # RecursionError covers pathologically nested input.
            self._send(400, {"error": "invalid json"})
            return
        if not isinstance(data, dict):
            # Valid JSON, but not an object: there are no fields to read.
            self._send(400, {"error": "invalid json"})
            return

        fields = {key: _clean_text(data.get(key)) for key in _FIELDS}
        if any(value is None for value in fields.values()):
            self._send(400, {"error": "invalid field type"})
            return

        # Required fields.
        if not fields["name"] or not fields["phone"]:
            self._send(400, {"error": "姓名和电话为必填项"})
            return

        # Basic mobile number check.
        if not _PHONE_RE.fullmatch(fields["phone"]):
            self._send(400, {"error": "手机号格式不正确"})
            return

        record = {
            "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            **fields,
        }

        # Persist locally first; answer 500 instead of dropping the
        # connection when the record cannot be written.
        try:
            self._save(record)
        except OSError as e:
            print(f"failed to save consult record: {e}")
            self._send(500, {"error": "internal error"})
            return

        # Push to Feishu (best effort; failures never affect the response).
        if FEISHU_WEBHOOK:
            self._push_feishu(record)

        self._send(200, {"success": True})

    def do_GET(self):
        """Serve the health probe; every other GET path gets a 405."""
        if self.path == "/api/consult/health":
            self._send(200, {"ok": True})
            return
        self._send(405, {"error": "method not allowed"})

    def do_OPTIONS(self):
        """Answer CORS preflight requests with 204 and the CORS headers."""
        self.send_response(204)
        self._send_cors_headers()
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _content_length(self):
        """Return the declared body length, or None when it is unusable.

        A missing header counts as 0.  Non-numeric and negative values
        return None; a negative value passed to ``rfile.read`` would block
        until the client closes the connection.
        """
        raw = self.headers.get("Content-Length", 0)
        try:
            length = int(raw)
        except (TypeError, ValueError):
            return None
        return length if length >= 0 else None

    def _save(self, record):
        """Append *record* to the current month's JSON file.

        The file is rewritten through a temporary file and ``os.replace``
        so an interrupted write cannot truncate existing records.

        Raises:
            OSError: if the directory or file cannot be written.
        """
        os.makedirs(DATA_DIR, exist_ok=True)
        path = os.path.join(DATA_DIR, f"{datetime.now().strftime('%Y-%m')}.json")
        records = _load_records(path)
        records.append(record)

        tmp_path = f"{path}.tmp"
        # Explicit UTF-8: ensure_ascii=False writes raw non-ASCII text, which
        # must not depend on the process locale.
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(records, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, path)

    def _push_feishu(self, record):
        """Send an interactive card describing *record* to the Feishu webhook.

        Best effort: any failure is printed and swallowed so that a webhook
        outage never fails the form submission.
        """
        try:
            lines = [
                "**新咨询收到**",
                f"姓名:{record['name']}",
                f"电话:{record['phone']}",
            ]
            if record['email']:
                lines.append(f"邮箱:{record['email']}")
            if record['message']:
                lines.append(f"留言:{record['message']}")
            lines.append(f"时间:{record['time']}")
            msg = "\n".join(lines)

            payload = json.dumps({"msg_type": "interactive", "card": {
                "header": {"title": {"tag": "plain_text", "content": "🔔 官网新咨询"}, "template": "turquoise"},
                "elements": [{"tag": "markdown", "content": msg}]
            }}).encode()
            req = urllib.request.Request(
                FEISHU_WEBHOOK,
                data=payload,
                headers={"Content-Type": "application/json"},
            )
            # The context manager closes the HTTP response (and its socket).
            with urllib.request.urlopen(req, timeout=5):
                pass
        except Exception as e:
            print(f"飞书推送失败: {e}")

    def _send_cors_headers(self):
        """Emit the CORS headers shared by every response."""
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Methods", "POST, OPTIONS")
        self.send_header("Access-Control-Allow-Headers", "Content-Type")

    def _send(self, code, data):
        """Write a JSON response with status *code* and body *data*."""
        body = json.dumps(data).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self._send_cors_headers()
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, fmt, *args):
        """Suppress the default per-request access log."""
        pass


def main():
    """Create the data directory and serve on 127.0.0.1:4001 until stopped."""
    os.makedirs(DATA_DIR, exist_ok=True)
    with HTTPServer(("127.0.0.1", 4001), Handler) as server:
        print("咨询接收服务启动在 :4001")
        server.serve_forever()


if __name__ == "__main__":
    main()