refactor: remove IntentRouter from tasks.py, delete legacy ConversationStore
- tasks.py: replace IntentRouter.route() with default agent fallback (REACT mode) - app.py: remove IntentRouter import and initialization - portal.py: delete legacy in-memory ConversationStore class (~120 lines), SqliteConversationStore is the sole implementation now - Remove unused SessionManager import from portal.py Tests: 622 passed, 0 failed
This commit is contained in:
parent
bbedfff597
commit
773a62ead2
|
|
@ -16,7 +16,6 @@ from agentkit.llm.providers.openai import OpenAICompatibleProvider
|
||||||
from agentkit.mcp.manager import MCPManager
|
from agentkit.mcp.manager import MCPManager
|
||||||
from agentkit.quality.gate import QualityGate
|
from agentkit.quality.gate import QualityGate
|
||||||
from agentkit.quality.output import OutputStandardizer
|
from agentkit.quality.output import OutputStandardizer
|
||||||
from agentkit.router.intent import IntentRouter
|
|
||||||
from agentkit.skills.base import Skill
|
from agentkit.skills.base import Skill
|
||||||
from agentkit.skills.registry import SkillRegistry
|
from agentkit.skills.registry import SkillRegistry
|
||||||
from agentkit.tools.registry import ToolRegistry
|
from agentkit.tools.registry import ToolRegistry
|
||||||
|
|
@ -385,8 +384,6 @@ def _on_config_change(app: FastAPI, config: ServerConfig) -> None:
|
||||||
# Also update the agent pool's gateway reference
|
# Also update the agent pool's gateway reference
|
||||||
if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None:
|
if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None:
|
||||||
app.state.agent_pool._llm_gateway = new_gateway
|
app.state.agent_pool._llm_gateway = new_gateway
|
||||||
if hasattr(app.state, "intent_router") and app.state.intent_router is not None:
|
|
||||||
app.state.intent_router._llm_gateway = new_gateway
|
|
||||||
logger.info(f"LLM Gateway reloaded (config v{current_version})")
|
logger.info(f"LLM Gateway reloaded (config v{current_version})")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to reload LLM Gateway: {e}")
|
logger.error(f"Failed to reload LLM Gateway: {e}")
|
||||||
|
|
@ -576,7 +573,6 @@ def create_app(
|
||||||
compressor=compressor,
|
compressor=compressor,
|
||||||
message_bus=message_bus,
|
message_bus=message_bus,
|
||||||
)
|
)
|
||||||
app.state.intent_router = IntentRouter(llm_gateway=app.state.llm_gateway)
|
|
||||||
app.state.quality_gate = QualityGate()
|
app.state.quality_gate = QualityGate()
|
||||||
app.state.output_standardizer = OutputStandardizer()
|
app.state.output_standardizer = OutputStandardizer()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ from agentkit.server.routes.evolution_dashboard import (
|
||||||
)
|
)
|
||||||
from agentkit.core.fallback import EMPTY_LLM_RESPONSE
|
from agentkit.core.fallback import EMPTY_LLM_RESPONSE
|
||||||
from agentkit.chat.sqlite_conversation_store import SqliteConversationStore
|
from agentkit.chat.sqlite_conversation_store import SqliteConversationStore
|
||||||
from agentkit.session.manager import SessionManager
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -96,128 +95,6 @@ class Conversation:
|
||||||
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
updated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
|
||||||
|
|
||||||
|
|
||||||
class ConversationStore:
|
|
||||||
"""In-memory conversation store with optional SessionManager persistence.
|
|
||||||
|
|
||||||
When a session_manager is provided, messages are also persisted via
|
|
||||||
SessionManager (which supports file/redis backends). On startup,
|
|
||||||
conversations can be restored from SessionManager.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self, max_conversations: int = 1000, session_manager: SessionManager | None = None
|
|
||||||
):
|
|
||||||
self._conversations: dict[str, Conversation] = {}
|
|
||||||
self._max = max_conversations
|
|
||||||
self._session_manager = session_manager
|
|
||||||
|
|
||||||
def set_session_manager(self, sm: SessionManager | None) -> None:
|
|
||||||
"""Set or update the session manager for persistence."""
|
|
||||||
self._session_manager = sm
|
|
||||||
|
|
||||||
async def restore_from_store(
|
|
||||||
self,
|
|
||||||
max_sessions: int = 50,
|
|
||||||
max_messages_per_session: int = 100,
|
|
||||||
) -> None:
|
|
||||||
"""Restore recent conversations from SessionManager on startup.
|
|
||||||
|
|
||||||
Loads the most recent sessions and their messages so that
|
|
||||||
ConversationStore is populated after a server restart.
|
|
||||||
Limits are applied to prevent memory exhaustion on startup.
|
|
||||||
"""
|
|
||||||
if self._session_manager is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
sessions = await self._session_manager.list_sessions(limit=max_sessions)
|
|
||||||
for session in sessions:
|
|
||||||
sid = session.session_id
|
|
||||||
if sid in self._conversations:
|
|
||||||
continue
|
|
||||||
# Reconstruct Conversation from persisted session
|
|
||||||
conv = Conversation(
|
|
||||||
id=sid,
|
|
||||||
created_at=session.created_at,
|
|
||||||
updated_at=session.updated_at,
|
|
||||||
)
|
|
||||||
messages = await self._session_manager.get_messages(
|
|
||||||
sid, limit=max_messages_per_session
|
|
||||||
)
|
|
||||||
for msg in messages:
|
|
||||||
conv.messages.append(
|
|
||||||
ChatMessage(
|
|
||||||
role=msg.role.value,
|
|
||||||
content=msg.content,
|
|
||||||
timestamp=msg.created_at,
|
|
||||||
metadata=msg.metadata,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
self._conversations[sid] = conv
|
|
||||||
logger.info(f"Restored {len(self._conversations)} conversations from SessionManager")
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to restore conversations from SessionManager: {e}")
|
|
||||||
|
|
||||||
def get_or_create(self, conversation_id: str | None = None) -> Conversation:
|
|
||||||
if conversation_id and conversation_id in self._conversations:
|
|
||||||
conv = self._conversations[conversation_id]
|
|
||||||
conv.updated_at = datetime.now(timezone.utc)
|
|
||||||
return conv
|
|
||||||
|
|
||||||
cid = conversation_id or str(uuid.uuid4())
|
|
||||||
conv = Conversation(id=cid)
|
|
||||||
self._conversations[cid] = conv
|
|
||||||
# Evict oldest if over limit
|
|
||||||
if len(self._conversations) > self._max:
|
|
||||||
oldest_id = min(self._conversations, key=lambda k: self._conversations[k].updated_at)
|
|
||||||
del self._conversations[oldest_id]
|
|
||||||
return conv
|
|
||||||
|
|
||||||
async def add_message(
|
|
||||||
self, conversation_id: str, role: str, content: str, metadata: dict | None = None
|
|
||||||
) -> ChatMessage:
|
|
||||||
"""Add a message to conversation, with optional persistence."""
|
|
||||||
conv = self._conversations.get(conversation_id)
|
|
||||||
if conv is None:
|
|
||||||
raise KeyError(f"Conversation '{conversation_id}' not found")
|
|
||||||
msg = ChatMessage(
|
|
||||||
role=role,
|
|
||||||
content=content,
|
|
||||||
metadata=metadata or {},
|
|
||||||
)
|
|
||||||
conv.messages.append(msg)
|
|
||||||
conv.updated_at = datetime.now(timezone.utc)
|
|
||||||
|
|
||||||
# Persist to SessionManager if available
|
|
||||||
if self._session_manager is not None:
|
|
||||||
try:
|
|
||||||
from agentkit.session.models import MessageRole
|
|
||||||
|
|
||||||
sm = self._session_manager
|
|
||||||
role_enum = MessageRole.USER if role == "user" else MessageRole.ASSISTANT
|
|
||||||
await sm.append_message(
|
|
||||||
session_id=conversation_id,
|
|
||||||
role=role_enum,
|
|
||||||
content=content,
|
|
||||||
metadata=metadata,
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Failed to persist message to SessionManager: {e}")
|
|
||||||
|
|
||||||
return msg
|
|
||||||
|
|
||||||
def get_history(self, conversation_id: str, limit: int = 50) -> list[ChatMessage]:
|
|
||||||
conv = self._conversations.get(conversation_id)
|
|
||||||
if conv is None:
|
|
||||||
return []
|
|
||||||
return conv.messages[-limit:]
|
|
||||||
|
|
||||||
def list_conversations(self, limit: int = 20) -> list[Conversation]:
|
|
||||||
sorted_convs = sorted(
|
|
||||||
self._conversations.values(), key=lambda c: c.updated_at, reverse=True
|
|
||||||
)
|
|
||||||
return sorted_convs[:limit]
|
|
||||||
|
|
||||||
|
|
||||||
# Heartbeat timeout in seconds — 0 disables timeout (for testing)
|
# Heartbeat timeout in seconds — 0 disables timeout (for testing)
|
||||||
_WS_HEARTBEAT_TIMEOUT = float(os.environ.get("AGENTKIT_WS_TIMEOUT", "120"))
|
_WS_HEARTBEAT_TIMEOUT = float(os.environ.get("AGENTKIT_WS_TIMEOUT", "120"))
|
||||||
_conversation_store = SqliteConversationStore()
|
_conversation_store = SqliteConversationStore()
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,6 @@ async def submit_task(request: SubmitTaskRequest, req: Request):
|
||||||
|
|
||||||
pool = req.app.state.agent_pool
|
pool = req.app.state.agent_pool
|
||||||
skill_registry = req.app.state.skill_registry
|
skill_registry = req.app.state.skill_registry
|
||||||
intent_router = req.app.state.intent_router
|
|
||||||
quality_gate = req.app.state.quality_gate
|
quality_gate = req.app.state.quality_gate
|
||||||
output_standardizer = req.app.state.output_standardizer
|
output_standardizer = req.app.state.output_standardizer
|
||||||
|
|
||||||
|
|
@ -93,7 +92,7 @@ async def submit_task(request: SubmitTaskRequest, req: Request):
|
||||||
if agent is None:
|
if agent is None:
|
||||||
agent = await pool.create_agent_from_skill(request.skill_name)
|
agent = await pool.create_agent_from_skill(request.skill_name)
|
||||||
|
|
||||||
# 3. Otherwise, use Intent Router to find matching skill
|
# 3. Otherwise, use default agent (REACT mode — LLM decides autonomously)
|
||||||
else:
|
else:
|
||||||
all_skills = skill_registry.list_skills()
|
all_skills = skill_registry.list_skills()
|
||||||
if not all_skills:
|
if not all_skills:
|
||||||
|
|
@ -101,15 +100,10 @@ async def submit_task(request: SubmitTaskRequest, req: Request):
|
||||||
status_code=400,
|
status_code=400,
|
||||||
detail="No skills registered and no skill_name or agent_name specified",
|
detail="No skills registered and no skill_name or agent_name specified",
|
||||||
)
|
)
|
||||||
try:
|
agent = pool.get_agent("default")
|
||||||
routing_result = await intent_router.route(request.input_data, all_skills)
|
|
||||||
skill = skill_registry.get(routing_result.matched_skill)
|
|
||||||
# Get or create agent for this skill
|
|
||||||
agent = pool.get_agent(routing_result.matched_skill)
|
|
||||||
if agent is None:
|
if agent is None:
|
||||||
agent = await pool.create_agent_from_skill(routing_result.matched_skill)
|
# Fallback: create from first available skill
|
||||||
except (ValueError, RuntimeError) as e:
|
agent = await pool.create_agent_from_skill(all_skills[0].name)
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
|
||||||
|
|
||||||
# 4. Async mode: submit to background runner
|
# 4. Async mode: submit to background runner
|
||||||
if request.mode == "async":
|
if request.mode == "async":
|
||||||
|
|
@ -214,7 +208,6 @@ async def stream_task(request: SubmitTaskRequest, req: Request):
|
||||||
|
|
||||||
pool = req.app.state.agent_pool
|
pool = req.app.state.agent_pool
|
||||||
skill_registry = req.app.state.skill_registry
|
skill_registry = req.app.state.skill_registry
|
||||||
intent_router = req.app.state.intent_router
|
|
||||||
|
|
||||||
agent = None
|
agent = None
|
||||||
|
|
||||||
|
|
@ -244,14 +237,10 @@ async def stream_task(request: SubmitTaskRequest, req: Request):
|
||||||
status_code=400,
|
status_code=400,
|
||||||
detail="No skills registered and no skill_name or agent_name specified",
|
detail="No skills registered and no skill_name or agent_name specified",
|
||||||
)
|
)
|
||||||
try:
|
agent = pool.get_agent("default")
|
||||||
routing_result = await intent_router.route(request.input_data, all_skills)
|
|
||||||
skill_registry.get(routing_result.matched_skill)
|
|
||||||
agent = pool.get_agent(routing_result.matched_skill)
|
|
||||||
if agent is None:
|
if agent is None:
|
||||||
agent = await pool.create_agent_from_skill(routing_result.matched_skill)
|
# Fallback: create from first available skill
|
||||||
except (ValueError, RuntimeError) as e:
|
agent = await pool.create_agent_from_skill(all_skills[0].name)
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
|
||||||
|
|
||||||
async def event_generator():
|
async def event_generator():
|
||||||
import logging
|
import logging
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue