"""FastAPI Application Factory"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
|
||
from fastapi import FastAPI
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
|
||
from agentkit.core.agent_pool import AgentPool
|
||
from agentkit.core.event_queue import EventQueue, SubmissionQueue
|
||
from agentkit.llm.gateway import LLMGateway
|
||
from agentkit.llm.providers.anthropic import AnthropicProvider
|
||
from agentkit.llm.providers.gemini import GeminiProvider
|
||
from agentkit.llm.providers.openai import OpenAICompatibleProvider
|
||
from agentkit.mcp.manager import MCPManager
|
||
from agentkit.quality.gate import QualityGate
|
||
from agentkit.quality.output import OutputStandardizer
|
||
from agentkit.skills.base import Skill
|
||
from agentkit.skills.registry import SkillRegistry
|
||
from agentkit.tools.registry import ToolRegistry
|
||
from agentkit.tools.skill_install import SkillInstallTool
|
||
from agentkit.tools.skill_search import SkillSearchTool
|
||
from agentkit.server.config import ServerConfig, load_dotenv
|
||
from agentkit.server.routes import (
|
||
agents,
|
||
tasks,
|
||
skills,
|
||
llm,
|
||
llm_gateway as llm_gateway_routes,
|
||
health,
|
||
metrics,
|
||
ws,
|
||
evolution,
|
||
memory,
|
||
portal,
|
||
evolution_dashboard,
|
||
kb_management,
|
||
skill_management,
|
||
workflows,
|
||
chat,
|
||
config_sync,
|
||
terminal,
|
||
terminal_server,
|
||
terminal_whitelist,
|
||
settings,
|
||
experts,
|
||
system,
|
||
auth as auth_routes,
|
||
documents,
|
||
admin as admin_routes_module,
|
||
calendar as calendar_routes,
|
||
)
|
||
from agentkit.server.auth.jwt_utils import get_jwt_secret
|
||
from agentkit.server.auth.middleware import AuthMiddleware
|
||
from agentkit.server.middleware import RateLimitMiddleware
|
||
from agentkit.server.task_store import create_task_store
|
||
from agentkit.server.runner import BackgroundRunner
|
||
from agentkit.core.logging import setup_structured_logging
|
||
from agentkit.telemetry.setup import setup_telemetry
|
||
|
||
# Module-level logger named after this module; used by the helpers below.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _build_llm_gateway(config: ServerConfig) -> LLMGateway:
    """Build an ``LLMGateway`` from ``config`` and register its providers.

    An optional usage store is created first when ``config.usage_store`` is
    truthy; a failure there is logged and the gateway is built with
    ``usage_store=None``. Every entry of ``config.llm_config.providers`` that
    has an API key is then instantiated through ``_create_provider`` and
    registered. A provider that fails to build or register is logged and
    skipped so that one bad entry does not prevent the others from loading.

    Args:
        config: Server configuration supplying ``usage_store`` and
            ``llm_config``.

    Returns:
        The gateway with every successfully created provider registered.
    """
    # Initialize UsageStore if configured
    usage_store = None
    if config.usage_store:
        try:
            from agentkit.llm.providers.usage_store import create_usage_store

            usage_store = create_usage_store(
                backend=config.usage_store.get("backend", "memory"),
                redis_url=config.usage_store.get("redis_url", "redis://localhost:6379"),
            )
        except Exception as e:
            # Best effort: the gateway is still built, with usage_store=None.
            logger.warning(f"Failed to initialize usage store: {e}, using in-memory")

    gateway = LLMGateway(config=config.llm_config, usage_store=usage_store)

    for name, pconf in config.llm_config.providers.items():
        if not pconf.api_key:
            continue  # Skip providers without API keys
        try:
            provider = _create_provider(name, pconf)
            gateway.register_provider(name, provider)
        except Exception as e:
            logger.warning(f"Failed to register LLM provider '{name}': {e}")

    return gateway
|
||
|
||
|
||
def _create_provider(name: str, pconf) -> object:
|
||
"""Create an LLM provider instance from ProviderConfig.
|
||
|
||
Shared by server app and CLI chat to avoid duplicated initialization logic.
|
||
"""
|
||
if pconf.type == "anthropic":
|
||
return AnthropicProvider(
|
||
api_key=pconf.api_key,
|
||
model=list(pconf.models.keys())[0] if pconf.models else "claude-sonnet-4-20250514",
|
||
max_tokens=pconf.max_tokens,
|
||
base_url=pconf.base_url or "https://api.anthropic.com",
|
||
timeout=pconf.timeout,
|
||
max_connections=pconf.max_connections,
|
||
max_keepalive_connections=pconf.max_keepalive_connections,
|
||
keepalive_expiry=pconf.keepalive_expiry,
|
||
)
|
||
elif pconf.type == "gemini":
|
||
return GeminiProvider(
|
||
api_key=pconf.api_key,
|
||
model=list(pconf.models.keys())[0] if pconf.models else "gemini-2.0-flash",
|
||
max_output_tokens=pconf.max_tokens,
|
||
base_url=pconf.base_url or "https://generativelanguage.googleapis.com",
|
||
timeout=pconf.timeout,
|
||
max_connections=pconf.max_connections,
|
||
max_keepalive_connections=pconf.max_keepalive_connections,
|
||
keepalive_expiry=pconf.keepalive_expiry,
|
||
)
|
||
else:
|
||
if not pconf.base_url:
|
||
raise ValueError(
|
||
f"Provider '{name}' is missing base_url. "
|
||
f"OpenAI-compatible providers require an explicit base_url in config."
|
||
)
|
||
return OpenAICompatibleProvider(
|
||
api_key=pconf.api_key,
|
||
base_url=pconf.base_url,
|
||
max_connections=pconf.max_connections,
|
||
max_keepalive_connections=pconf.max_keepalive_connections,
|
||
keepalive_expiry=pconf.keepalive_expiry,
|
||
timeout=pconf.timeout,
|
||
)
|
||
|
||
|
||
def _build_skill_registry(config: ServerConfig) -> SkillRegistry:
    """Build a ``SkillRegistry`` from the skill configs in ``config``.

    Each item returned by ``config.load_skill_configs()`` is wrapped in a
    ``Skill`` and registered in a fresh registry.

    Args:
        config: Server configuration providing ``load_skill_configs()``.

    Returns:
        A new registry holding one ``Skill`` per loaded skill config.
    """
    registry = SkillRegistry()
    for skill_config in config.load_skill_configs():
        registry.register(Skill(config=skill_config))
    return registry
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup before ``yield`` and shutdown after it.

    Startup, in order:
      1. Start the task store cleanup.
      2. Enable config hot-reload when the server config has a config path.
      3. Start MCP servers when an MCP manager is present on ``app.state``.
      4. Restore the portal conversation history.
      5. Ensure a default chat agent exists (memory store, tools, skills), or
         in GUI mode, ensure a memory store exists for pre-existing agents.
      6. Load ExpertTemplates into ``app.state.expert_template_registry``.
      7. Initialise the calendar subsystem (best effort).

    Shutdown stops/closes, in order: MCP servers, the working-memory Redis
    client, the config watcher, the event and submission queues, the task
    store cleanup, and the calendar reminder scheduler.

    Args:
        app: The FastAPI application whose ``app.state`` was populated by
            ``create_app``.
    """
    from pathlib import Path as _P

    # Startup
    task_store = app.state.task_store
    await task_store.start_cleanup()

    # Start config watcher if server_config is available
    server_config = getattr(app.state, "server_config", None)
    if server_config is not None and server_config._config_path:
        # Store the event loop reference *before* the watcher starts so a
        # change notification fired right away can already schedule its
        # reload onto this loop (see _on_config_change).
        app.state._event_loop = asyncio.get_running_loop()
        server_config.on_change = lambda cfg: _on_config_change(app, cfg)
        server_config.watch_config()
        logger.info("Config hot-reload enabled")

    # Start MCP servers if configured
    mcp_manager = getattr(app.state, "mcp_manager", None)
    if mcp_manager is not None:
        await mcp_manager.start_all()

    # Restore conversation history from persistent store (async, in lifespan)
    from agentkit.server.routes.portal import _conversation_store

    await _conversation_store.restore_from_store()

    def _on_memory_change(new_prompt: str) -> None:
        """Push a refreshed system prompt to every agent in the pool."""
        pool = app.state.agent_pool
        updated = 0
        for agent_name in pool.list_agents():
            try:
                pooled_agent = pool.get_agent(agent_name)
                if pooled_agent is not None:
                    pooled_agent._system_prompt = new_prompt
                    updated += 1
            except Exception:
                logger.warning(
                    f"Failed to update system prompt for agent '{agent_name}'", exc_info=True
                )
        logger.info(
            f"Memory changed: refreshed system prompt for {updated}/{len(pool.list_agents())} agents"
        )

    # The branch below used to reference an undefined name `gui_mode`, which
    # raised NameError whenever agents already existed. Derive it from the
    # same environment variable create_app() consults for GUI mode.
    gui_mode = bool(os.environ.get("AGENTKIT_GUI_MODE"))

    # Ensure a default chat agent exists with memory + tools (both GUI and serve modes)
    # Previously this only ran in GUI mode, which caused 'Agent default not found' errors
    # when the frontend tried to create a chat session via REST/WebSocket on a serve-mode server.
    if not app.state.agent_pool.list_agents():
        from agentkit.core.config_driven import AgentConfig
        from agentkit.memory.profile import MemoryStore
        from agentkit.tools.memory_tool import MemoryTool
        from agentkit.tools.shell import ShellTool
        from agentkit.tools.web_search import WebSearchTool
        from agentkit.tools.web_crawl import WebCrawlTool
        from agentkit.tools.baidu_search import BaiduSearchTool
        from agentkit.tools.document_tool import DocumentTool
        from agentkit.documents.service import DocumentService
        from agentkit.documents.db import init_documents_db
        from agentkit.documents.renderers.word_renderer import WordRenderer
        from agentkit.documents.renderers.excel_renderer import ExcelRenderer
        from agentkit.documents.renderers.pdf_renderer import PDFRenderer

        # Initialize memory store and build system prompt
        memory_store = MemoryStore()
        memory_store.ensure_defaults()
        memory_snapshot = memory_store.load_all()
        base_prompt = (
            "你是一个有帮助的AI助手。请记住我们对话的上下文,并在后续对话中引用之前的内容。回答要清晰简洁,请使用中文回复。\n\n"
            "重要提示:当你不确定事实信息、时事新闻或任何你不确信的话题时,"
            "你必须先使用搜索工具查找准确和最新的信息,然后再回答。"
            "中文内容优先使用 baidu_search 工具,英文/国际内容使用 web_search。"
            "在能够搜索到真相的情况下,绝不猜测或编造答案。"
            "始终优先搜索而不是给出可能不正确的信息。\n\n"
            "技能安装:当需要查找或安装技能时,先用 skill_search 搜索确认技能名称和来源,"
            "再用 skill_install 安装。不要用 shell 执行 npm install 或 npx skills install。"
        )
        effective_system_prompt = memory_store.build_system_prompt(memory_snapshot, base_prompt)

        # Register on_change callback to refresh all agents' system prompts
        # when MemoryTool writes to memory files
        memory_store._on_change = _on_memory_change

        # Store memory_store on app.state for chat routes to use
        app.state.memory_store = memory_store

        default_config = AgentConfig(
            name="default",
            agent_type="chat",
            task_mode="llm_generate",
            description="Default chat agent for GUI",
            prompt={"system": effective_system_prompt},
        )

        # Stays None when creation fails so the skill-loading step below can
        # still run instead of tripping over an unbound local.
        agent = None
        try:
            agent = await app.state.agent_pool.create_agent(default_config)

            # Register tools into the agent's tool registry
            search_api_keys = {
                "tavily_api_key": os.environ.get("TAVILY_API_KEY"),
                "serper_api_key": os.environ.get("SERPER_API_KEY"),
            }
            agent._tool_registry.register(MemoryTool(memory_store=memory_store))
            agent._tool_registry.register(ShellTool())
            agent._tool_registry.register(
                SkillInstallTool(
                    skill_registry=app.state.skill_registry,
                    tool_registry=app.state.tool_registry,
                )
            )
            agent._tool_registry.register(SkillSearchTool())
            agent._tool_registry.register(BaiduSearchTool())
            agent._tool_registry.register(WebSearchTool(**search_api_keys))
            agent._tool_registry.register(WebCrawlTool())

            # Document processing tool (U6): DocumentService with all renderers.
            # On failure the tool is simply unavailable — app.state.document_service
            # remains unset. Callers must check hasattr(app.state, 'document_service').
            try:
                await init_documents_db()
                doc_service = DocumentService()
                doc_service.register_renderer("word", WordRenderer())
                doc_service.register_renderer("excel", ExcelRenderer())
                doc_service.register_renderer("pdf", PDFRenderer())
                agent._tool_registry.register(DocumentTool(service=doc_service))
                app.state.document_service = doc_service
                logger.info("DocumentTool registered with word/excel/pdf renderers")
            except Exception:
                logger.exception("Failed to register DocumentTool")

            # Override system prompt with memory-injected version
            agent._system_prompt = effective_system_prompt

            logger.info("GUI mode: created default chat agent with memory + tools")
        except Exception as e:
            logger.warning(f"GUI mode: failed to create default agent: {e}")

        # Load skills from config and register into SkillRegistry
        try:
            from agentkit.skills.loader import SkillLoader

            skill_registry = app.state.skill_registry
            tool_registry = app.state.tool_registry

            # Register GUI tools into the shared tool registry so skills can bind them
            if agent is not None:
                for tool in agent._tool_registry.list_tools():
                    try:
                        tool_registry.register(tool)
                    except Exception:
                        # Expected when the tool is already registered; keep
                        # this best-effort but leave a trace for debugging.
                        logger.debug(
                            "Shared tool registry rejected %r", tool, exc_info=True
                        )

            # Load skills from configured paths
            server_config = getattr(app.state, "server_config", None)
            if server_config and server_config.skill_paths:
                loader = SkillLoader(
                    skill_registry=skill_registry,
                    tool_registry=tool_registry,
                )
                for skill_path in server_config.skill_paths:
                    p = _P(skill_path)
                    if p.is_dir():
                        loaded = loader.load_from_directory(str(p))
                        logger.info(f"GUI mode: loaded {len(loaded)} skills from {p}")
                    elif p.is_file() and p.suffix in (".yaml", ".yml"):
                        try:
                            loader.load_from_file(str(p))
                            logger.info(f"GUI mode: loaded skill from {p}")
                        except Exception as se:
                            logger.warning(f"GUI mode: failed to load skill from {p}: {se}")

            logger.info(f"GUI mode: {len(skill_registry.list_skills())} skills registered")
        except Exception as e:
            logger.warning(f"GUI mode: failed to load skills: {e}")
    elif gui_mode:
        # Agent already exists (e.g. from config), still ensure memory store is available
        if not hasattr(app.state, "memory_store") or app.state.memory_store is None:
            from agentkit.memory.profile import MemoryStore

            memory_store = MemoryStore()
            memory_store.ensure_defaults()
            # Initialize _base_prompt so refresh_system_prompt works correctly
            snapshot = memory_store.load_all()
            base_prompt = (
                "你是一个有帮助的AI助手。请记住我们对话的上下文,并在后续对话中引用之前的内容。回答要清晰简洁,请使用中文回复。\n\n"
                "重要提示:当你不确定事实信息、时事新闻或任何你不确信的话题时,"
                "你必须先使用搜索工具查找准确和最新的信息,然后再回答。"
                "中文内容优先使用 baidu_search 工具,英文/国际内容使用 web_search。"
                "在能够搜索到真相的情况下,绝不猜测或编造答案。"
                "始终优先搜索而不是给出可能不正确的信息。\n\n"
                "技能安装:当需要安装技能时,使用 skill_install 工具,不要用 shell 执行 npm install。"
                "skill_install 的 source 参数格式为 owner/repo@skill,例如 vercel-labs/skills@find-skills。"
                "如果不知道完整 source,先用 shell 执行 `npx skills search <name>` 搜索。"
            )
            # The return value is intentionally unused here; the call is made
            # for its effect on the memory store (see comment above).
            memory_store.build_system_prompt(snapshot, base_prompt)

            # Register on_change callback for existing agents
            memory_store._on_change = _on_memory_change
            app.state.memory_store = memory_store

    # Load ExpertTemplates from configured paths (supports @board meeting mode)
    # This runs regardless of GUI mode so @board works in API-only mode too.
    try:
        from agentkit.experts.registry import ExpertTemplateRegistry

        expert_registry = ExpertTemplateRegistry()

        # Always try to load from the default configs/experts/ directory
        default_experts_dir = os.path.join(
            os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))),
            "configs",
            "experts",
        )
        expert_dirs: list[str] = [default_experts_dir]

        # Append user-configured paths from agentkit.yaml
        if server_config and getattr(server_config, "expert_paths", None):
            expert_dirs.extend(server_config.expert_paths)

        total_loaded = 0
        for experts_dir in expert_dirs:
            if not experts_dir:
                continue
            p = _P(experts_dir)
            if p.is_dir():
                loaded = expert_registry.load_from_directory(str(p))
                if loaded:
                    logger.info(f"Loaded {len(loaded)} ExpertTemplates from {p}")
                    total_loaded += len(loaded)

        app.state.expert_template_registry = expert_registry
        if total_loaded:
            logger.info(f"Total {total_loaded} ExpertTemplates registered for @board mode")
    except Exception as e:
        logger.warning(f"Failed to load ExpertTemplates: {e}")
        # Ensure app.state.expert_template_registry always exists (empty registry)
        from agentkit.experts.registry import ExpertTemplateRegistry

        app.state.expert_template_registry = ExpertTemplateRegistry()

    # Calendar subsystem (U1-U12): init DB, service, reminder scheduler, agent tool.
    calendar_scheduler = None
    try:
        from agentkit.calendar.db import init_calendar_db
        from agentkit.calendar.scheduler import ReminderScheduler
        from agentkit.calendar.service import CalendarService
        from agentkit.tools.calendar_tool import CalendarTool

        await init_calendar_db()
        cal_service = CalendarService()
        app.state.calendar_service = cal_service
        calendar_scheduler = ReminderScheduler()
        await calendar_scheduler.start()
        app.state.calendar_scheduler = calendar_scheduler
        # Register CalendarTool so ReAct agents can create/query events.
        try:
            app.state.tool_registry.register(CalendarTool(service=cal_service))
            logger.info("CalendarTool registered for ReAct integration")
        except Exception:
            # Expected when the tool is already registered; best effort.
            logger.debug("CalendarTool registration skipped", exc_info=True)
        logger.info("Calendar subsystem initialized (service + reminder scheduler)")
    except Exception:
        logger.exception("Failed to initialize calendar subsystem — calendar API unavailable")

    yield

    # Shutdown
    # Stop MCP servers
    if mcp_manager is not None:
        await mcp_manager.stop_all()

    # Close Redis client for working memory
    working_redis = getattr(app.state, "working_redis_client", None)
    if working_redis is not None:
        await working_redis.aclose()

    if server_config is not None:
        server_config.stop_watching()

    # Close SQ/EQ dual-queue system (Phase 4 integration)
    # SubmissionQueue.close() marks the queue as closed (no new submissions).
    # EventQueue.close() sends sentinel to all subscribers so they can exit gracefully.
    event_queue = getattr(app.state, "event_queue", None)
    if event_queue is not None:
        event_queue.close()
    submission_queue = getattr(app.state, "submission_queue", None)
    if submission_queue is not None:
        submission_queue.close()

    await task_store.stop_cleanup()

    # Stop calendar reminder scheduler
    cal_scheduler = getattr(app.state, "calendar_scheduler", None)
    if cal_scheduler is not None:
        await cal_scheduler.stop()
|
||
|
||
|
||
def _on_config_change(app: FastAPI, config: ServerConfig) -> None:
    """Handle config change by reloading affected components.

    Implements graceful rolling update:
    - New tasks use the new configuration
    - In-progress tasks continue with their original configuration
    - Config version is incremented for audit tracking

    Uses a lock to prevent concurrent config reloads from racing.
    Thread-safe: schedules reload onto the asyncio event loop via
    asyncio.run_coroutine_threadsafe() since watchfiles calls this
    from a non-asyncio thread.

    The newest ``config`` is published on ``app.state`` before the pending
    flag is raised, and the reload reads it only after clearing the flag.
    That way a change arriving while (or just before) another reload runs is
    applied with its own configuration instead of being dropped or replaced
    by the stale one captured by the earlier call.

    Args:
        app: Application whose ``app.state`` components are rebuilt.
        config: The freshly loaded server configuration.
    """
    import threading
    from pathlib import Path as _P

    lock = app.state._config_reload_lock

    # Thread-safe: set pending flag via threading.Event
    if not hasattr(app.state, "_config_reload_event"):
        app.state._config_reload_event = threading.Event()
    pending = app.state._config_reload_event

    # Order matters: publish the config first, then raise the flag. The
    # reloader does the mirror image (clear flag, then read config).
    app.state._config_reload_pending = config
    pending.set()

    async def _reload():
        if lock.locked():
            return  # Another reload running; it will check pending flag
        async with lock:
            while pending.is_set():
                pending.clear()
                latest = getattr(app.state, "_config_reload_pending", config)
                pool = getattr(app.state, "agent_pool", None)

                # Increment config version for audit
                current_version = getattr(app.state, "config_version", 0) + 1
                app.state.config_version = current_version
                logger.info(f"Config change detected (v{current_version}), reloading...")

                # Rebuild LLMGateway if llm config changed
                try:
                    new_gateway = _build_llm_gateway(latest)
                    app.state.llm_gateway = new_gateway
                    # Also update the agent pool's gateway reference
                    if pool is not None:
                        pool._llm_gateway = new_gateway
                    logger.info(f"LLM Gateway reloaded (config v{current_version})")
                except Exception as e:
                    logger.error(f"Failed to reload LLM Gateway: {e}")

                # Reload skills if skill paths changed
                try:
                    new_skill_registry = _build_skill_registry(latest)
                    # Re-bind tools from the shared tool_registry so skills don't lose their bindings
                    tool_registry = getattr(app.state, "tool_registry", None)
                    if tool_registry:
                        from agentkit.skills.loader import SkillLoader

                        loader = SkillLoader(
                            skill_registry=new_skill_registry,
                            tool_registry=tool_registry,
                        )
                        for skill_path in latest.skill_paths or []:
                            p = _P(skill_path)
                            if p.is_dir():
                                loader.load_from_directory(str(p))
                            elif p.is_file() and p.suffix in (".yaml", ".yml"):
                                try:
                                    loader.load_from_file(str(p))
                                except Exception as se:
                                    # One broken skill file must not abort the
                                    # reload, but it should not vanish silently.
                                    logger.warning(
                                        f"Failed to reload skill from {p}: {se}"
                                    )
                    app.state.skill_registry = new_skill_registry
                    if pool is not None:
                        pool._skill_registry = new_skill_registry
                    logger.info(f"Skills reloaded (config v{current_version})")
                except Exception as e:
                    logger.error(f"Failed to reload skills: {e}")

                # Update config version on all agents
                if pool is not None:
                    for agent in pool._agents.values():
                        if hasattr(agent, "_config_version"):
                            agent._config_version = current_version

                logger.info(f"Config reload complete (v{current_version})")

    # Schedule the reload onto the event loop via run_coroutine_threadsafe
    # (watchfiles calls _on_config_change from a non-asyncio thread)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop — try getting the stored event loop reference
        loop = getattr(app.state, "_event_loop", None)
        if loop is None or loop.is_closed():
            logger.warning("No running event loop, config reload deferred")
            return
    asyncio.run_coroutine_threadsafe(_reload(), loop)
|
||
|
||
|
||
def create_app(
|
||
llm_gateway: LLMGateway | None = None,
|
||
skill_registry: SkillRegistry | None = None,
|
||
tool_registry: ToolRegistry | None = None,
|
||
api_key: str | None = None,
|
||
rate_limit: int | None = None,
|
||
server_config: ServerConfig | None = None,
|
||
) -> FastAPI:
|
||
"""Create and configure the FastAPI application
|
||
|
||
When called by uvicorn (factory=True), automatically loads ServerConfig
|
||
from AGENTKIT_CONFIG_PATH env var if server_config is not provided.
|
||
"""
|
||
# Auto-load config from env var if not provided (uvicorn factory mode)
|
||
if server_config is None:
|
||
from pathlib import Path as _P
|
||
|
||
config_path = os.environ.get("AGENTKIT_CONFIG_PATH")
|
||
if not config_path or not os.path.exists(config_path):
|
||
# Auto-discover agentkit.yaml in CWD
|
||
_cwd_yaml = _P.cwd() / "agentkit.yaml"
|
||
if _cwd_yaml.exists():
|
||
config_path = str(_cwd_yaml)
|
||
|
||
if config_path and os.path.exists(config_path):
|
||
# Load .env before parsing config (so ${ENV_VAR} substitutions work)
|
||
from pathlib import Path as _P
|
||
|
||
_dotenv = _P(config_path).parent / ".env"
|
||
load_dotenv(_dotenv)
|
||
server_config = ServerConfig.from_yaml(config_path)
|
||
app = FastAPI(title="AgentKit Server", version="2.0.0", lifespan=lifespan)
|
||
|
||
# Initialize structured logging
|
||
setup_structured_logging()
|
||
|
||
# Initialize OpenTelemetry (no-op if not installed or not configured)
|
||
if server_config:
|
||
setup_telemetry(app, server_config.telemetry)
|
||
|
||
# Resolve effective API key and rate limit
|
||
effective_api_key = api_key
|
||
effective_rate_limit = rate_limit
|
||
if server_config:
|
||
if effective_api_key is None:
|
||
effective_api_key = server_config.api_key
|
||
if effective_rate_limit is None:
|
||
effective_rate_limit = server_config.rate_limit
|
||
|
||
# CORS 配置
|
||
cors_origins = ["*"]
|
||
if server_config:
|
||
cors_origins = server_config.cors_origins
|
||
if cors_origins == ["*"]:
|
||
import logging
|
||
|
||
logging.getLogger(__name__).warning(
|
||
"CORS allows all origins (allow_origins=['*']). "
|
||
"Set server.cors_origins in agentkit.yaml for production."
|
||
)
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=cors_origins,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# Auth middleware
|
||
# AuthMiddleware (JWT + API Key dual-track) runs *before* the legacy
|
||
# APIKeyAuthMiddleware so browser sessions (JWT) and programmatic clients
|
||
# (API key) can coexist during the U2 → U5 migration. Both layers are
|
||
# kept; AuthMiddleware short-circuits on success, so APIKeyAuthMiddleware
|
||
# only sees requests that AuthMiddleware could not authenticate.
|
||
#
|
||
# JWT auth is only active when AGENTKIT_JWT_SECRET is explicitly set.
|
||
# When unset, the middleware runs in dev mode (no JWT enforcement) but
|
||
# the auth routes still issue tokens signed with an ephemeral secret.
|
||
jwt_secret = get_jwt_secret()
|
||
client_keys: dict[str, str] = {}
|
||
try:
|
||
from agentkit.server.middleware import _load_client_keys
|
||
|
||
client_keys = _load_client_keys()
|
||
except Exception as e:
|
||
logger.warning(f"Failed to load client keys for AuthMiddleware: {e}")
|
||
|
||
app.add_middleware(
|
||
AuthMiddleware,
|
||
jwt_secret=jwt_secret or "",
|
||
api_key=effective_api_key,
|
||
client_keys=client_keys,
|
||
)
|
||
# NOTE: APIKeyAuthMiddleware is intentionally NOT added here.
|
||
# AuthMiddleware above already handles X-API-Key headers (with
|
||
# constant-time comparison and proper current_user assignment).
|
||
# Adding APIKeyAuthMiddleware would make it the outermost middleware
|
||
# (Starlette add_middleware inserts at position 0), causing it to run
|
||
# BEFORE AuthMiddleware and reject JWT-only requests in production.
|
||
|
||
# Expose JWT secret on app.state for routes (None = dev mode, routes will
|
||
# generate an ephemeral secret via get_or_create_jwt_secret()).
|
||
app.state.jwt_secret = jwt_secret
|
||
|
||
# Rate limiting middleware
|
||
if effective_rate_limit is not None:
|
||
os.environ["AGENTKIT_RATE_LIMIT_PER_MINUTE"] = str(effective_rate_limit)
|
||
app.add_middleware(RateLimitMiddleware)
|
||
|
||
# Build LLM Gateway from config if not provided
|
||
if llm_gateway is None and server_config:
|
||
llm_gateway = _build_llm_gateway(server_config)
|
||
|
||
# Build Skill Registry from config if not provided
|
||
if skill_registry is None and server_config:
|
||
skill_registry = _build_skill_registry(server_config)
|
||
|
||
# Initialize shared state
|
||
app.state.llm_gateway = llm_gateway or LLMGateway()
|
||
app.state.skill_registry = skill_registry or SkillRegistry()
|
||
app.state.tool_registry = tool_registry or ToolRegistry()
|
||
# Initialize MCPManager if MCP servers are configured
|
||
if server_config and server_config.mcp_servers:
|
||
mcp_manager = MCPManager(
|
||
configs=server_config.mcp_servers,
|
||
tool_registry=app.state.tool_registry,
|
||
)
|
||
app.state.mcp_manager = mcp_manager
|
||
else:
|
||
app.state.mcp_manager = None
|
||
# Initialize compressor if compression is configured
|
||
from agentkit.core.compressor import create_compressor
|
||
|
||
compressor = create_compressor(server_config.compression) if server_config else None
|
||
app.state.compressor = compressor
|
||
# Register headroom_retrieve tool if HeadroomCompressor is active
|
||
if compressor is not None:
|
||
try:
|
||
from agentkit.core.headroom_compressor import HeadroomCompressor
|
||
|
||
if isinstance(compressor, HeadroomCompressor) and compressor.is_available():
|
||
from agentkit.tools.headroom_retrieve import HeadroomRetrieveTool
|
||
|
||
retrieve_tool = HeadroomRetrieveTool(compressor=compressor)
|
||
app.state.tool_registry.register(retrieve_tool)
|
||
logger.info("HeadroomRetrieveTool registered (CCR retrieval enabled)")
|
||
except ImportError:
|
||
pass
|
||
# Initialize MessageBus for inter-agent communication
|
||
from agentkit.bus.redis_bus import create_message_bus
|
||
|
||
bus_config = {}
|
||
if server_config and hasattr(server_config, "bus") and server_config.bus:
|
||
bus_config = server_config.bus
|
||
message_bus = create_message_bus(
|
||
backend=bus_config.get("backend", "memory"),
|
||
redis_url=bus_config.get("redis_url", "redis://localhost:6379/0"),
|
||
)
|
||
app.state.message_bus = message_bus
|
||
|
||
app.state.agent_pool = AgentPool(
|
||
llm_gateway=app.state.llm_gateway,
|
||
skill_registry=app.state.skill_registry,
|
||
tool_registry=app.state.tool_registry,
|
||
compressor=compressor,
|
||
message_bus=message_bus,
|
||
)
|
||
app.state.quality_gate = QualityGate()
|
||
app.state.output_standardizer = OutputStandardizer()
|
||
|
||
# Initialize RequestPreprocessor (minimal preprocessing: @skill prefix + greeting regex + REACT)
|
||
from agentkit.chat.request_preprocessor import RequestPreprocessor
|
||
|
||
request_preprocessor = RequestPreprocessor(
|
||
skill_registry=app.state.skill_registry,
|
||
)
|
||
app.state.request_preprocessor = request_preprocessor
|
||
|
||
# Initialize ExpertTemplateRegistry (populated in lifespan with YAML configs)
|
||
from agentkit.experts.registry import ExpertTemplateRegistry
|
||
|
||
app.state.expert_template_registry = ExpertTemplateRegistry()
|
||
|
||
# Initialize OrganizationContext from AgentPool + SkillRegistry
|
||
from agentkit.org.context import OrganizationContext
|
||
|
||
org_context = OrganizationContext.from_agent_pool(
|
||
agent_pool=app.state.agent_pool,
|
||
skill_registry=app.state.skill_registry,
|
||
)
|
||
app.state.org_context = org_context
|
||
|
||
# Initialize AlignmentGuard from config
|
||
from agentkit.quality.alignment import AlignmentGuard, AlignmentConfig
|
||
|
||
alignment_config_data = {}
|
||
if server_config and hasattr(server_config, "alignment") and server_config.alignment:
|
||
alignment_config_data = server_config.alignment
|
||
alignment_config = AlignmentConfig.from_dict(alignment_config_data)
|
||
alignment_guard = AlignmentGuard(config=alignment_config, llm_gateway=app.state.llm_gateway)
|
||
app.state.alignment_guard = alignment_guard
|
||
|
||
# Initialize task store from config
|
||
ts_config = server_config.task_store if server_config else {}
|
||
# Merge CLI overrides from AGENTKIT_TASK_STORE env var
|
||
ts_env = os.environ.get("AGENTKIT_TASK_STORE")
|
||
if ts_env:
|
||
import json as _json
|
||
|
||
try:
|
||
ts_config = {**ts_config, **_json.loads(ts_env)}
|
||
except Exception:
|
||
pass
|
||
task_store = create_task_store(
|
||
backend=ts_config.get("backend", "memory"),
|
||
redis_url=ts_config.get("redis_url", "redis://localhost:6379/0"),
|
||
ttl_seconds=ts_config.get("ttl_seconds", 3600),
|
||
max_records=ts_config.get("max_records", 10000),
|
||
)
|
||
app.state.task_store = task_store
|
||
app.state.runner = BackgroundRunner(task_store=app.state.task_store)
|
||
app.state.server_config = server_config
|
||
app.state.api_key = effective_api_key
|
||
app.state._config_reload_lock = asyncio.Lock()
|
||
|
||
# Initialize SQ/EQ dual-queue system (Phase 4 integration)
|
||
# - SubmissionQueue (SQ): receives user input, returns task_id
|
||
# - EventQueue (EQ): pushes Agent events, supports multi-subscriber broadcast
|
||
# Both are closed on app shutdown (see lifespan()).
|
||
app.state.submission_queue = SubmissionQueue()
|
||
app.state.event_queue = EventQueue()
|
||
|
||
# Initialize session manager for Chat mode
|
||
from agentkit.session.manager import SessionManager
|
||
from agentkit.session.store import create_session_store
|
||
|
||
session_config = {}
|
||
if server_config and hasattr(server_config, "session") and server_config.session:
|
||
session_config = server_config.session
|
||
# GUI mode defaults to file-backed sessions for persistence
|
||
session_backend = session_config.get(
|
||
"backend", "file" if os.environ.get("AGENTKIT_GUI_MODE") else "memory"
|
||
)
|
||
session_store = create_session_store(
|
||
backend=session_backend,
|
||
redis_url=session_config.get("redis_url", "redis://localhost:6379/0"),
|
||
ttl_seconds=session_config.get("ttl_seconds", 86400),
|
||
)
|
||
app.state.session_manager = SessionManager(store=session_store)
|
||
|
||
# Inject SessionManager into Portal's ConversationStore for persistence (legacy only)
|
||
from agentkit.server.routes.portal import _conversation_store
|
||
|
||
if hasattr(_conversation_store, "set_session_manager"):
|
||
_conversation_store.set_session_manager(app.state.session_manager)
|
||
|
||
# Initialize evolution store if configured
|
||
if server_config and hasattr(server_config, "evolution") and server_config.evolution:
|
||
try:
|
||
from agentkit.evolution.evolution_store import create_evolution_store
|
||
|
||
evo_conf = server_config.evolution
|
||
app.state.evolution_store = create_evolution_store(
|
||
backend=evo_conf.get("backend", "memory"),
|
||
db_path=evo_conf.get("db_path", "~/.agentkit/evolution.db"),
|
||
database_url=evo_conf.get("database_url"),
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"Failed to initialize evolution store: {e}")
|
||
app.state.evolution_store = None
|
||
else:
|
||
app.state.evolution_store = None
|
||
|
||
# Initialize cascade state store if configured
|
||
if server_config and hasattr(server_config, "cascade_store") and server_config.cascade_store:
|
||
try:
|
||
from agentkit.quality.cascade_state_store import create_cascade_state_store
|
||
|
||
cs_conf = server_config.cascade_store
|
||
app.state.cascade_state_store = create_cascade_state_store(
|
||
backend=cs_conf.get("backend", "memory"),
|
||
redis_url=cs_conf.get("redis_url", "redis://localhost:6379"),
|
||
session_ttl=cs_conf.get("session_ttl", 86400),
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"Failed to initialize cascade state store: {e}")
|
||
app.state.cascade_state_store = None
|
||
else:
|
||
app.state.cascade_state_store = None
|
||
|
||
# Initialize memory components if configured
|
||
if server_config and hasattr(server_config, "memory") and server_config.memory:
|
||
try:
|
||
from agentkit.memory.retriever import MemoryRetriever
|
||
from agentkit.memory.working import WorkingMemory
|
||
from agentkit.memory.semantic import SemanticMemory
|
||
from agentkit.memory.http_rag import HttpRAGService
|
||
|
||
working = None
|
||
episodic = None
|
||
semantic = None
|
||
|
||
if server_config.memory.get("working", {}).get("enabled"):
|
||
import redis.asyncio as aioredis
|
||
|
||
redis_url = server_config.memory["working"].get(
|
||
"redis_url", "redis://localhost:6379"
|
||
)
|
||
redis_client = aioredis.from_url(redis_url, decode_responses=True)
|
||
working = WorkingMemory(redis=redis_client)
|
||
app.state.working_redis_client = redis_client
|
||
|
||
if server_config.memory.get("semantic", {}).get("enabled"):
|
||
sem_conf = server_config.memory["semantic"]
|
||
rag_service = HttpRAGService(
|
||
base_url=sem_conf["base_url"],
|
||
api_key=sem_conf.get("api_key"),
|
||
knowledge_base_ids=sem_conf.get("knowledge_base_ids", []),
|
||
timeout=sem_conf.get("timeout", 30),
|
||
)
|
||
semantic = SemanticMemory(
|
||
rag_service=rag_service,
|
||
knowledge_base_ids=sem_conf.get("knowledge_base_ids", []),
|
||
search_mode=sem_conf.get("search_mode", "standard"),
|
||
use_rerank=sem_conf.get("use_rerank", True),
|
||
use_compression=sem_conf.get("use_compression", False),
|
||
kb_weights=sem_conf.get("kb_weights"),
|
||
)
|
||
|
||
if server_config.memory.get("episodic", {}).get("enabled"):
|
||
try:
|
||
from agentkit.memory.episodic import EpisodicMemory
|
||
from agentkit.memory.embedder import OpenAIEmbedder, EmbeddingCache
|
||
from agentkit.memory.models import EpisodeModel, create_episodic_session_factory
|
||
|
||
epi_conf = server_config.memory["episodic"]
|
||
embedder = None
|
||
if epi_conf.get("embedder_api_key") or os.environ.get("OPENAI_API_KEY"):
|
||
cache = EmbeddingCache(
|
||
max_size=epi_conf.get("cache_max_size", 1000),
|
||
ttl=epi_conf.get("cache_ttl", 3600),
|
||
)
|
||
embedder = OpenAIEmbedder(
|
||
api_key=epi_conf.get("embedder_api_key"),
|
||
model=epi_conf.get("embedder_model", "text-embedding-3-small"),
|
||
base_url=epi_conf.get("embedder_base_url"),
|
||
cache=cache,
|
||
)
|
||
# Resolve session_factory and model from database_url if configured
|
||
epi_session_factory = None
|
||
epi_model = None
|
||
database_url = epi_conf.get("database_url") or os.environ.get("DATABASE_URL")
|
||
if database_url:
|
||
try:
|
||
epi_session_factory = create_episodic_session_factory(database_url)
|
||
epi_model = EpisodeModel
|
||
except Exception as db_err:
|
||
import logging as _log
|
||
|
||
_log.getLogger(__name__).warning(
|
||
f"Failed to create episodic DB session: {db_err}"
|
||
)
|
||
|
||
episodic = EpisodicMemory(
|
||
session_factory=epi_session_factory,
|
||
episodic_model=epi_model,
|
||
embedder=embedder,
|
||
decay_rate=epi_conf.get("decay_rate", 0.01),
|
||
alpha=epi_conf.get("alpha", 0.7),
|
||
retrieve_limit=epi_conf.get("retrieve_limit", 200),
|
||
pgvector_enabled=epi_conf.get("pgvector_enabled", True),
|
||
table_name=epi_conf.get("table_name", "episodic_memories"),
|
||
)
|
||
except Exception as e:
|
||
import logging
|
||
|
||
logging.getLogger(__name__).warning(
|
||
f"Failed to initialize episodic memory: {e}"
|
||
)
|
||
|
||
memory_retriever = MemoryRetriever(
|
||
working_memory=working,
|
||
episodic_memory=episodic,
|
||
semantic_memory=semantic,
|
||
)
|
||
app.state.memory_retriever = memory_retriever
|
||
|
||
# Auto-register retrieve_knowledge tool if semantic memory is configured
|
||
if memory_retriever:
|
||
retrieve_tool = memory_retriever.create_retrieve_tool()
|
||
if retrieve_tool:
|
||
app.state.retrieve_knowledge_tool = retrieve_tool
|
||
except Exception as e:
|
||
import logging
|
||
|
||
logging.getLogger(__name__).warning(f"Failed to initialize memory components: {e}")
|
||
app.state.memory_retriever = None
|
||
|
||
# Include routes
|
||
app.include_router(agents.router, prefix="/api/v1")
|
||
app.include_router(tasks.router, prefix="/api/v1")
|
||
app.include_router(skills.router, prefix="/api/v1")
|
||
app.include_router(llm.router, prefix="/api/v1")
|
||
app.include_router(llm_gateway_routes.router, prefix="/api/v1")
|
||
app.include_router(health.router, prefix="/api/v1")
|
||
app.include_router(metrics.router, prefix="/api/v1")
|
||
app.include_router(system.router, prefix="/api/v1")
|
||
app.include_router(ws.router, prefix="/api/v1")
|
||
app.include_router(evolution.router, prefix="/api/v1")
|
||
app.include_router(memory.router, prefix="/api/v1")
|
||
app.include_router(portal.router, prefix="/api/v1")
|
||
app.include_router(evolution_dashboard.router, prefix="/api/v1")
|
||
app.include_router(kb_management.router, prefix="/api/v1")
|
||
app.include_router(skill_management.router, prefix="/api/v1")
|
||
app.include_router(workflows.router, prefix="/api/v1")
|
||
app.include_router(chat.router, prefix="/api/v1")
|
||
app.include_router(config_sync.router, prefix="/api/v1")
|
||
app.include_router(settings.router, prefix="/api/v1")
|
||
app.include_router(terminal.router, prefix="/api/v1")
|
||
app.include_router(terminal_server.router, prefix="/api/v1")
|
||
app.include_router(terminal_whitelist.router, prefix="/api/v1")
|
||
app.include_router(experts.router, prefix="/api/v1")
|
||
app.include_router(auth_routes.router, prefix="/api/v1")
|
||
app.include_router(auth_routes.admin_router, prefix="/api/v1")
|
||
app.include_router(admin_routes_module.admin_router, prefix="/api/v1")
|
||
app.include_router(documents.router, prefix="/api/v1")
|
||
app.include_router(calendar_routes.router, prefix="/api/v1")
|
||
|
||
# Serve GUI when in GUI mode
|
||
gui_mode = os.environ.get("AGENTKIT_GUI_MODE")
|
||
if gui_mode:
|
||
from pathlib import Path as _Path
|
||
from fastapi.responses import HTMLResponse, FileResponse
|
||
from fastapi.staticfiles import StaticFiles
|
||
|
||
_static_dir = _Path(__file__).parent / "static"
|
||
|
||
@app.get("/", response_class=HTMLResponse, include_in_schema=False)
async def gui_index():
    """Return the GUI's ``index.html``, or a 404 page when the file is absent."""
    landing_page = _static_dir / "index.html"
    # Guard clause: without the bundled page there is nothing to serve.
    if not landing_page.exists():
        return HTMLResponse("<h1>AgentKit GUI not found</h1>", status_code=404)
    return FileResponse(str(landing_page))
|
||
|
||
# SPA fallback: serve index.html for all non-API, non-static routes
|
||
@app.get("/{path:path}", response_class=HTMLResponse, include_in_schema=False)
async def spa_fallback(path: str):
    """Serve a static file, or fall back to index.html for SPA client-side routing.

    Args:
        path: The portion of the URL captured by the ``{path:path}`` route
            parameter.

    Returns:
        A 404 page for API/docs paths, a 403 page when the resolved path
        lies outside the static directory, the matching static file when
        one exists, otherwise ``index.html`` (or a 404 page if that file
        is missing too).
    """
    # Don't intercept API routes or docs
    if path.startswith(("api/", "docs")) or path == "openapi.json":
        return HTMLResponse("<h1>Not Found</h1>", status_code=404)

    # Try to serve a real static file first (with path traversal protection).
    # Both sides are resolved so ".." segments and symlinks are collapsed
    # before the containment check.
    static_root = _static_dir.resolve()
    file_path = (_static_dir / path).resolve()
    try:
        # Component-wise containment check. A plain string-prefix comparison
        # would also accept sibling directories that merely share the prefix
        # (e.g. "<static>_backup/secret"), so relative_to() is used instead.
        file_path.relative_to(static_root)
    except ValueError:
        return HTMLResponse("<h1>Forbidden</h1>", status_code=403)

    if file_path.is_file():
        return FileResponse(str(file_path))

    # Fallback to index.html for SPA routing
    index_path = _static_dir / "index.html"
    if index_path.exists():
        return FileResponse(str(index_path))
    return HTMLResponse("<h1>Not Found</h1>", status_code=404)
|
||
|
||
# Mount static assets last (js, css, images, etc.)
|
||
# mount() is checked after route matching, so API routes take priority
|
||
assets_dir = _static_dir / "assets"
|
||
if assets_dir.exists():
|
||
app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="static-assets")
|
||
|
||
return app
|