feat(server): wire rag_platform components to app.state lifespan

Initialize in lifespan() (after bitable, before yield):
- KBStore + ensure_tables() → app.state.kb_store (if database_url available)
- RetrievalEngine + vector_store → app.state.retrieval_engine (if database_url available)
- HitProcessor → app.state.hit_processor (with llm_gateway)
- TaskManager → app.state.task_manager (degraded mode, InMemoryTaskStore)
- KBSettingsStore → app.state.kb_settings_store (singleton)

Each component wrapped in try/except — failures logged but don't block startup.
Follows same pattern as episodic memory initialization.
This commit is contained in:
chiguyong 2026-06-25 20:02:01 +08:00
parent 1f691ca178
commit 864bb95a30
1 changed files with 78 additions and 0 deletions

View File

@ -440,6 +440,84 @@ async def lifespan(app: FastAPI):
except Exception:
logger.exception("Failed to initialize bitable subsystem")
# RAG platform subsystem (P1): wire KBStore, RetrievalEngine, HitProcessor,
# TaskManager, KBSettingsStore into app.state. Each component is initialized
# independently so failures don't block app startup. KBStore + RetrievalEngine
# require database_url (PostgreSQL + pgvector); the rest work in degraded mode.
rag_database_url = None
if server_config and hasattr(server_config, "memory") and server_config.memory:
rag_database_url = server_config.memory.get("episodic", {}).get("database_url")
rag_database_url = rag_database_url or os.environ.get("DATABASE_URL")
if rag_database_url:
try:
from agentkit.rag_platform.store import create_session_factory
kb_session_factory = create_session_factory(rag_database_url)
# Ensure rag_platform tables exist (idempotent, async)
try:
from agentkit.rag_platform.store import ensure_tables
await ensure_tables(rag_database_url)
except Exception:
logger.exception("Failed to ensure rag_platform tables")
# KBStore — KB/Document persistence
try:
from agentkit.rag_platform.store import KBStore
app.state.kb_store = KBStore(session_factory=kb_session_factory)
logger.info("KBStore initialized")
except Exception:
logger.exception("Failed to initialize KBStore")
# RetrievalEngine — requires llama_index PGVectorStore
try:
from agentkit.rag_platform.indexing import create_vector_store
from agentkit.rag_platform.retrieval import RetrievalEngine
vector_store = create_vector_store(rag_database_url)
app.state.vector_store = vector_store
app.state.retrieval_engine = RetrievalEngine(
vector_store=vector_store,
session_factory=kb_session_factory,
)
logger.info("RetrievalEngine initialized (pgvector ready)")
except Exception:
logger.exception("Failed to initialize RetrievalEngine — vector search unavailable")
except Exception:
logger.exception("Failed to initialize rag_platform DB components")
# HitProcessor — LLM-based answer generation from retrieval results
try:
from agentkit.rag_platform.hit_processing import HitProcessor
app.state.hit_processor = HitProcessor(llm_gateway=app.state.llm_gateway)
logger.info("HitProcessor initialized")
except Exception:
logger.exception("Failed to initialize HitProcessor")
# TaskManager — async vectorize/batch-index tasks
# ponytail: degraded mode — InMemoryTaskStore, no TaskIQ broker.
# Upgrade path: create_broker(redis_url) + worker process when Redis configured.
try:
from agentkit.rag_platform.tasks import TaskManager
app.state.task_manager = TaskManager()
logger.info("TaskManager initialized (degraded mode: InMemoryTaskStore)")
except Exception:
logger.exception("Failed to initialize TaskManager")
# KBSettingsStore — per-KB retrieval/hit-processing defaults (process singleton)
try:
from agentkit.rag_platform.settings import get_settings_store
app.state.kb_settings_store = get_settings_store()
logger.info("KBSettingsStore initialized")
except Exception:
logger.exception("Failed to initialize KBSettingsStore")
yield
# Shutdown