geo/backend/app/services/key_verifier.py

326 lines
12 KiB
Python

import logging
import os
from abc import ABC, abstractmethod
from enum import Enum
import httpx
logger = logging.getLogger(__name__)
class KeyStatus(str, Enum):
ACTIVE = "active"
INVALID = "invalid"
EXPIRED = "expired"
RATE_LIMITED = "rate_limited"
UNKNOWN = "unknown"
_VERIFY_TIMEOUT = 15.0
class KeyVerifier(ABC):
@abstractmethod
async def verify(self, api_key: str) -> KeyStatus:
pass
class DefaultKeyVerifier(KeyVerifier):
async def verify(self, api_key: str) -> KeyStatus:
return KeyStatus.UNKNOWN
class ChatGPTKeyVerifier(KeyVerifier):
_BASE_URL = "https://api.openai.com/v1"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("OPENAI_PROXY") or os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.post(
f"{self._BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 401:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
elif response.status_code == 403:
return KeyStatus.EXPIRED
else:
logger.warning(f"[chatgpt] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[chatgpt] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[chatgpt] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[chatgpt] Verification error: {e}")
return KeyStatus.UNKNOWN
class DeepSeekKeyVerifier(KeyVerifier):
_BASE_URL = "https://api.deepseek.com/v1"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("DEEPSEEK_PROXY") or os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.post(
f"{self._BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "deepseek-chat",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 401:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
elif response.status_code == 403:
return KeyStatus.EXPIRED
else:
logger.warning(f"[deepseek] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[deepseek] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[deepseek] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[deepseek] Verification error: {e}")
return KeyStatus.UNKNOWN
class KimiKeyVerifier(KeyVerifier):
_BASE_URL = "https://api.moonshot.cn/v1"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.post(
f"{self._BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "moonshot-v1-8k",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 401:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
elif response.status_code == 403:
return KeyStatus.EXPIRED
else:
logger.warning(f"[kimi] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[kimi] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[kimi] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[kimi] Verification error: {e}")
return KeyStatus.UNKNOWN
class QwenKeyVerifier(KeyVerifier):
_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.post(
f"{self._BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "qwen-plus",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 401:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
elif response.status_code == 403:
return KeyStatus.EXPIRED
else:
logger.warning(f"[qwen] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[qwen] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[qwen] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[qwen] Verification error: {e}")
return KeyStatus.UNKNOWN
class PerplexityKeyVerifier(KeyVerifier):
_BASE_URL = "https://api.perplexity.ai"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.post(
f"{self._BASE_URL}/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"model": "llama-3.1-sonar-small-128k-online",
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 401:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
elif response.status_code == 403:
return KeyStatus.EXPIRED
else:
logger.warning(f"[perplexity] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[perplexity] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[perplexity] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[perplexity] Verification error: {e}")
return KeyStatus.UNKNOWN
class GeminiKeyVerifier(KeyVerifier):
_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
async def verify(self, api_key: str) -> KeyStatus:
proxy = os.getenv("HTTPS_PROXY")
try:
async with httpx.AsyncClient(
timeout=httpx.Timeout(connect=10.0, read=_VERIFY_TIMEOUT, write=10.0, pool=10.0),
proxy=proxy,
) as client:
response = await client.get(
f"{self._BASE_URL}/models",
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
params={"key": api_key},
)
if response.status_code == 200:
return KeyStatus.ACTIVE
elif response.status_code == 403:
return KeyStatus.INVALID
elif response.status_code == 429:
return KeyStatus.RATE_LIMITED
else:
logger.warning(f"[gemini] Unexpected status {response.status_code}")
return KeyStatus.UNKNOWN
except httpx.TimeoutException:
logger.warning("[gemini] Verification timeout")
return KeyStatus.UNKNOWN
except httpx.ConnectError as e:
logger.warning(f"[gemini] Connection error: {e}")
return KeyStatus.UNKNOWN
except Exception as e:
logger.error(f"[gemini] Verification error: {e}")
return KeyStatus.UNKNOWN
class WenxinKeyVerifier(KeyVerifier):
async def verify(self, api_key: str) -> KeyStatus:
return KeyStatus.UNKNOWN
class DoubaoKeyVerifier(KeyVerifier):
async def verify(self, api_key: str) -> KeyStatus:
return KeyStatus.UNKNOWN
class YuanbaoKeyVerifier(KeyVerifier):
async def verify(self, api_key: str) -> KeyStatus:
return KeyStatus.UNKNOWN
class KeyVerifierFactory:
_VERIFIERS = {
"chatgpt": ChatGPTKeyVerifier,
"perplexity": PerplexityKeyVerifier,
"kimi": KimiKeyVerifier,
"wenxin": WenxinKeyVerifier,
"doubao": DoubaoKeyVerifier,
"deepseek": DeepSeekKeyVerifier,
"qwen": QwenKeyVerifier,
"gemini": GeminiKeyVerifier,
"yuanbao": YuanbaoKeyVerifier,
}
@classmethod
def get_verifier(cls, engine_type: str) -> KeyVerifier:
verifier_class = cls._VERIFIERS.get(engine_type)
if not verifier_class:
return DefaultKeyVerifier()
return verifier_class()
@classmethod
async def verify(cls, engine_type: str, api_key: str) -> KeyStatus:
verifier = cls.get_verifier(engine_type)
return await verifier.verify(api_key)