fischer-agentkit/src/agentkit/server/routes/metrics.py

71 lines
2.0 KiB
Python

"""Metrics route — /api/v1/metrics"""
from fastapi import APIRouter, Request
router = APIRouter(tags=["metrics"])
@router.get("/metrics")
async def get_metrics(request: Request):
"""Get application metrics"""
app = request.app
# Task metrics from TaskStore
task_store = getattr(app.state, "task_store", None)
task_metrics = {
"total_tasks": 0,
"completed_tasks": 0,
"failed_tasks": 0,
"pending_tasks": 0,
}
if task_store:
try:
all_tasks = task_store.list_tasks(limit=10000)
task_metrics["total_tasks"] = len(all_tasks)
task_metrics["completed_tasks"] = len(
[t for t in all_tasks if t.status.value == "completed"]
)
task_metrics["failed_tasks"] = len(
[t for t in all_tasks if t.status.value == "failed"]
)
task_metrics["pending_tasks"] = len(
[t for t in all_tasks if t.status.value == "pending"]
)
except Exception:
pass
# Agent pool metrics
agent_pool = getattr(app.state, "agent_pool", None)
agent_metrics: dict = {
"total_agents": 0,
"agent_names": [],
}
if agent_pool:
try:
agents = agent_pool.list_agents()
agent_metrics["total_agents"] = len(agents)
agent_metrics["agent_names"] = [a.get("name", "") for a in agents]
except Exception:
pass
# Skill registry metrics
skill_registry = getattr(app.state, "skill_registry", None)
skill_metrics: dict = {
"total_skills": 0,
"skill_names": [],
}
if skill_registry:
try:
skills = skill_registry.list_skills()
skill_metrics["total_skills"] = len(skills)
skill_metrics["skill_names"] = [s.name for s in skills]
except Exception:
pass
return {
"tasks": task_metrics,
"agents": agent_metrics,
"skills": skill_metrics,
"version": "2.0.0",
}