fischer-agentkit/src/agentkit/cli/admin.py

1450 lines
50 KiB
Python

"""Admin management CLI commands.
This Typer sub-app exposes all admin REST endpoints under
``/api/v1/admin/*`` as CLI commands. It is registered on the main
``agentkit`` app as ``agentkit admin <group> <command>``.
Authentication is handled by :class:`AdminHttpClient`, which reads
credentials from args, env vars, or ``~/.agentkit/admin_config.yaml``.
Run ``agentkit admin login`` once to populate the config file.
"""
from __future__ import annotations
import json as _json
from pathlib import Path
from typing import Optional
import httpx
import typer
import yaml
from rich import print as rprint
from rich.console import Console
from rich.table import Table
from agentkit.cli.admin_client import DEFAULT_CONFIG_PATH, AdminHttpClient
admin_app = typer.Typer(
name="admin",
help="Admin management commands (departments, users, LLM, skills, KB, usage).",
no_args_is_help=True,
)
console = Console()
# ---------------------------------------------------------------------------
# Shared option definitions (used as defaults via `= typer.Option(...)`)
# ---------------------------------------------------------------------------
ServerUrlOption = typer.Option(
None, "--server-url", "-s", help="Server URL (default: http://localhost:8001)"
)
TokenOption = typer.Option(None, "--token", "-t", help="JWT access token")
ApiKeyOption = typer.Option(None, "--api-key", "-k", help="API key")
JsonFlag = typer.Option(False, "--json", help="Output raw JSON instead of a Rich table")
# ---------------------------------------------------------------------------
# Error handling helper
# ---------------------------------------------------------------------------
def _handle_http_error(e: httpx.HTTPStatusError, server_url: str) -> None:
"""Translate an HTTP error into a friendly message and exit."""
status = e.response.status_code
if status == 401:
rprint("[red]Error: Authentication failed. Run 'agentkit admin login' first.[/red]")
elif status == 403:
rprint("[red]Error: Admin permission required.[/red]")
elif status == 404:
rprint("[red]Error: Resource not found.[/red]")
elif status == 409:
try:
detail = e.response.json().get("detail", "")
except ValueError:
detail = e.response.text
rprint(f"[red]Error: Conflict — {detail}[/red]")
else:
rprint(f"[red]Error: {status}{e.response.text}[/red]")
raise typer.Exit(1)
def _handle_connect_error(server_url: str) -> None:
rprint(f"[red]Error: Cannot connect to server at {server_url}[/red]")
rprint("[dim]Is the server running? Start it with: agentkit serve[/dim]")
raise typer.Exit(1)
def _build_client(
server_url: Optional[str],
token: Optional[str],
api_key: Optional[str],
) -> AdminHttpClient:
"""Construct an :class:`AdminHttpClient` from CLI options."""
return AdminHttpClient.from_config(
server_url=server_url,
token=token,
api_key=api_key,
)
def _emit_json(data: object) -> None:
rprint(_json.dumps(data, indent=2, ensure_ascii=False, default=str))
def _safe_get(
client: AdminHttpClient, path: str, server_url: str, params: dict | None = None
) -> object:
try:
return client.get(path, params=params)
except httpx.ConnectError:
_handle_connect_error(server_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, server_url)
def _safe_post(
client: AdminHttpClient, path: str, server_url: str, body: dict | None = None
) -> object:
try:
return client.post(path, json=body)
except httpx.ConnectError:
_handle_connect_error(server_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, server_url)
def _safe_patch(
client: AdminHttpClient, path: str, server_url: str, body: dict | None = None
) -> object:
try:
return client.patch(path, json=body)
except httpx.ConnectError:
_handle_connect_error(server_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, server_url)
def _safe_put(client: AdminHttpClient, path: str, server_url: str, body: dict | None = None) -> object:
try:
return client.put(path, json=body)
except httpx.ConnectError:
_handle_connect_error(server_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, server_url)
def _safe_delete(
client: AdminHttpClient, path: str, server_url: str, params: dict | None = None
) -> object:
try:
return client.delete(path, params=params)
except httpx.ConnectError:
_handle_connect_error(server_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, server_url)
# ---------------------------------------------------------------------------
# Login command (top-level)
# ---------------------------------------------------------------------------
@admin_app.command("login")
def admin_login(
username: str = typer.Option(..., "--username", "-u", help="Admin username"),
password: str = typer.Option(
..., "--password", "-p", help="Admin password", prompt=True, hide_input=True
),
server_url: Optional[str] = ServerUrlOption,
config_path: Optional[str] = typer.Option(
None, "--config-path", help="Path to admin config file"
),
) -> None:
"""Login and save the access token to ``~/.agentkit/admin_config.yaml``."""
resolved_url = server_url or "http://localhost:8001"
client = AdminHttpClient(resolved_url)
try:
token = client.login(username, password)
except httpx.ConnectError:
_handle_connect_error(resolved_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, resolved_url)
path = Path(config_path) if config_path else DEFAULT_CONFIG_PATH
path.parent.mkdir(parents=True, exist_ok=True)
config = {"server_url": resolved_url, "token": token}
path.write_text(yaml.dump(config, default_flow_style=False, allow_unicode=True))
rprint(f"[green]Login successful. Token saved to {path}[/green]")
# ---------------------------------------------------------------------------
# Department sub-group
# ---------------------------------------------------------------------------
dept_app = typer.Typer(name="department", help="Department management", no_args_is_help=True)
admin_app.add_typer(dept_app, name="department")
@dept_app.command("list")
def dept_list(
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List all departments."""
client = _build_client(server_url, token, api_key)
depts = _safe_get(client, "/api/v1/admin/departments", client.base_url)
if json_output:
_emit_json(depts)
return
if not depts:
rprint("[dim]No departments found[/dim]")
return
table = Table(title="Departments")
table.add_column("ID", style="cyan")
table.add_column("Name")
table.add_column("Description")
table.add_column("Active")
table.add_column("Created")
for d in depts:
table.add_row(
str(d.get("id", "")),
str(d.get("name", "")),
str(d.get("description", "")),
"" if d.get("is_active") else "",
str(d.get("created_at", "")),
)
console.print(table)
@dept_app.command("create")
def dept_create(
name: str = typer.Option(..., "--name", "-n", help="Department name"),
description: str = typer.Option("", "--description", "-d", help="Department description"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Create a department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
"/api/v1/admin/departments",
client.base_url,
{"name": name, "description": description},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Department created:[/green] {result.get('name')} (id={result.get('id')})")
@dept_app.command("update")
def dept_update(
dept_id: str = typer.Argument(..., help="Department ID"),
name: Optional[str] = typer.Option(None, "--name", "-n", help="New name"),
description: Optional[str] = typer.Option(None, "--description", "-d", help="New description"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Update a department's name or description."""
body: dict[str, object] = {}
if name is not None:
body["name"] = name
if description is not None:
body["description"] = description
if not body:
rprint("[red]Error: Provide --name or --description to update[/red]")
raise typer.Exit(1)
client = _build_client(server_url, token, api_key)
result = _safe_patch(client, f"/api/v1/admin/departments/{dept_id}", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]Department updated:[/green] {result.get('name')} (id={result.get('id')})")
@dept_app.command("delete")
def dept_delete(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Delete a department."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(client, f"/api/v1/admin/departments/{dept_id}", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Department deleted:[/green] {dept_id}")
@dept_app.command("enable")
def dept_enable(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Enable a disabled department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/departments/{dept_id}/enable", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Department enabled:[/green] {dept_id}")
@dept_app.command("disable")
def dept_disable(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Disable a department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/departments/{dept_id}/disable", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Department disabled:[/green] {dept_id}")
@dept_app.command("bind-skill")
def dept_bind_skill(
dept_id: str = typer.Argument(..., help="Department ID"),
skill_name: str = typer.Argument(..., help="Skill name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Bind a skill to a department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
f"/api/v1/admin/departments/{dept_id}/skills/{skill_name}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill '{skill_name}' bound to department {dept_id}[/green]")
@dept_app.command("unbind-skill")
def dept_unbind_skill(
dept_id: str = typer.Argument(..., help="Department ID"),
skill_name: str = typer.Argument(..., help="Skill name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Unbind a skill from a department."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(
client,
f"/api/v1/admin/departments/{dept_id}/skills/{skill_name}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill '{skill_name}' unbound from department {dept_id}[/green]")
@dept_app.command("bind-kb")
def dept_bind_kb(
dept_id: str = typer.Argument(..., help="Department ID"),
source_id: str = typer.Argument(..., help="KB source ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Bind a KB source to a department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
f"/api/v1/admin/departments/{dept_id}/kb/{source_id}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]KB source '{source_id}' bound to department {dept_id}[/green]")
@dept_app.command("unbind-kb")
def dept_unbind_kb(
dept_id: str = typer.Argument(..., help="Department ID"),
source_id: str = typer.Argument(..., help="KB source ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Unbind a KB source from a department."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(
client,
f"/api/v1/admin/departments/{dept_id}/kb/{source_id}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]KB source '{source_id}' unbound from department {dept_id}[/green]")
@dept_app.command("list-skills")
def dept_list_skills(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List skills bound to a department."""
client = _build_client(server_url, token, api_key)
skills = _safe_get(
client,
f"/api/v1/admin/departments/{dept_id}/skills",
client.base_url,
)
if json_output:
_emit_json(skills)
return
if not skills:
rprint(f"[dim]No skills bound to department {dept_id}[/dim]")
return
table = Table(title=f"Skills for department {dept_id}")
table.add_column("Skill Name", style="cyan")
for s in skills:
table.add_row(str(s))
console.print(table)
@dept_app.command("list-kb")
def dept_list_kb(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List KB sources bound to a department."""
client = _build_client(server_url, token, api_key)
sources = _safe_get(
client,
f"/api/v1/admin/departments/{dept_id}/kb",
client.base_url,
)
if json_output:
_emit_json(sources)
return
if not sources:
rprint(f"[dim]No KB sources bound to department {dept_id}[/dim]")
return
table = Table(title=f"KB sources for department {dept_id}")
table.add_column("Source ID", style="cyan")
for s in sources:
table.add_row(str(s))
console.print(table)
@dept_app.command("list-quotas")
def dept_list_quotas(
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List quotas for a department."""
client = _build_client(server_url, token, api_key)
quotas = _safe_get(
client,
f"/api/v1/admin/departments/{dept_id}/quotas",
client.base_url,
)
if json_output:
_emit_json(quotas)
return
if not quotas:
rprint(f"[dim]No quotas set for department {dept_id}[/dim]")
return
table = Table(title=f"Quotas for department {dept_id}")
table.add_column("Type", style="cyan")
table.add_column("Limit")
table.add_column("Period")
for q in quotas:
table.add_row(
str(q.get("quota_type", "")),
str(q.get("limit_value", "")),
str(q.get("period", "")),
)
console.print(table)
@dept_app.command("set-quota")
def dept_set_quota(
dept_id: str = typer.Argument(..., help="Department ID"),
quota_type: str = typer.Option(..., "--type", help="Quota type (token/cost/model_whitelist)"),
limit_value: str = typer.Option(
..., "--limit", help="Limit value (int for token/cost, comma-separated for model_whitelist)"
),
period: str = typer.Option("daily", "--period", help="Quota period (daily/monthly)"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Set (upsert) a quota for a department."""
if quota_type == "model_whitelist":
limit: int | list[str] = [s.strip() for s in limit_value.split(",") if s.strip()]
else:
try:
limit = int(limit_value)
except ValueError:
rprint(f"[red]Error: --limit must be an integer for quota_type={quota_type}[/red]")
raise typer.Exit(1)
client = _build_client(server_url, token, api_key)
result = _safe_put(
client,
f"/api/v1/admin/departments/{dept_id}/quotas",
client.base_url,
{"quota_type": quota_type, "limit_value": limit, "period": period},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Quota set:[/green] {quota_type}={limit} ({period}) for {dept_id}")
@dept_app.command("delete-quota")
def dept_delete_quota(
dept_id: str = typer.Argument(..., help="Department ID"),
quota_type: str = typer.Option(..., "--type", help="Quota type to delete"),
period: str = typer.Option("daily", "--period", help="Quota period"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Delete a quota for a department."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(
client,
f"/api/v1/admin/departments/{dept_id}/quotas",
client.base_url,
{"quota_type": quota_type, "period": period},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Quota deleted:[/green] {quota_type} ({period}) for {dept_id}")
# ---------------------------------------------------------------------------
# User sub-group
# ---------------------------------------------------------------------------
user_app = typer.Typer(name="user", help="User management", no_args_is_help=True)
admin_app.add_typer(user_app, name="user")
@user_app.command("list")
def user_list(
department_id: Optional[str] = typer.Option(
None, "--department-id", "-d", help="Filter by department ID"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List users, optionally filtered by department."""
client = _build_client(server_url, token, api_key)
params: dict[str, object] = {}
if department_id:
params["department_id"] = department_id
users = _safe_get(client, "/api/v1/admin/users", client.base_url, params=params or None)
if json_output:
_emit_json(users)
return
if not users:
rprint("[dim]No users found[/dim]")
return
table = Table(title="Users")
table.add_column("ID", style="cyan")
table.add_column("Username")
table.add_column("Email")
table.add_column("Role")
table.add_column("Active")
for u in users:
table.add_row(
str(u.get("id", "")),
str(u.get("username", "")),
str(u.get("email", "")),
str(u.get("role", "")),
"" if u.get("is_active") else "",
)
console.print(table)
@user_app.command("create")
def user_create(
username: str = typer.Option(..., "--username", "-u", help="Username"),
email: str = typer.Option(..., "--email", "-e", help="Email"),
password: str = typer.Option(
..., "--password", "-p", help="Password", prompt=True, hide_input=True
),
role: str = typer.Option("member", "--role", "-r", help="Role (member/operator/admin)"),
department_ids: Optional[str] = typer.Option(
None, "--department-ids", help="Comma-separated department IDs"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Create a new user."""
body: dict[str, object] = {
"username": username,
"email": email,
"password": password,
"role": role,
}
if department_ids:
body["department_ids"] = [s.strip() for s in department_ids.split(",") if s.strip()]
client = _build_client(server_url, token, api_key)
result = _safe_post(client, "/api/v1/admin/users", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]User created:[/green] {result.get('username')} (id={result.get('id')})")
@user_app.command("update")
def user_update(
user_id: str = typer.Argument(..., help="User ID"),
role: Optional[str] = typer.Option(None, "--role", "-r", help="New role"),
is_active: Optional[bool] = typer.Option(None, "--active/--inactive", help="Active flag"),
is_terminal_authorized: Optional[bool] = typer.Option(
None, "--terminal-authorized/--no-terminal-authorized"
),
is_server_terminal_authorized: Optional[bool] = typer.Option(
None, "--server-terminal-authorized/--no-server-terminal-authorized"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Update a user's role or active flag."""
body: dict[str, object] = {}
if role is not None:
body["role"] = role
if is_active is not None:
body["is_active"] = is_active
if is_terminal_authorized is not None:
body["is_terminal_authorized"] = is_terminal_authorized
if is_server_terminal_authorized is not None:
body["is_server_terminal_authorized"] = is_server_terminal_authorized
if not body:
rprint("[red]Error: Provide at least one field to update[/red]")
raise typer.Exit(1)
client = _build_client(server_url, token, api_key)
result = _safe_patch(client, f"/api/v1/admin/users/{user_id}", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]User updated:[/green] {result.get('username')} (id={result.get('id')})")
@user_app.command("delete")
def user_delete(
user_id: str = typer.Argument(..., help="User ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Soft-delete a user (sets is_active=False)."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(client, f"/api/v1/admin/users/{user_id}", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]User deleted:[/green] {user_id}")
@user_app.command("reset-password")
def user_reset_password(
user_id: str = typer.Argument(..., help="User ID"),
new_password: str = typer.Option(
..., "--password", "-p", help="New password", prompt=True, hide_input=True
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Reset a user's password (revokes all their sessions)."""
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
f"/api/v1/admin/users/{user_id}/reset-password",
client.base_url,
{"new_password": new_password},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Password reset for user:[/green] {user_id}")
@user_app.command("assign-department")
def user_assign_department(
user_id: str = typer.Argument(..., help="User ID"),
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Assign a user to a department."""
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
f"/api/v1/admin/users/{user_id}/departments/{dept_id}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]User {user_id} assigned to department {dept_id}[/green]")
@user_app.command("remove-department")
def user_remove_department(
user_id: str = typer.Argument(..., help="User ID"),
dept_id: str = typer.Argument(..., help="Department ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Remove a user from a department."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(
client,
f"/api/v1/admin/users/{user_id}/departments/{dept_id}",
client.base_url,
)
if json_output:
_emit_json(result)
return
rprint(f"[green]User {user_id} removed from department {dept_id}[/green]")
# ---------------------------------------------------------------------------
# LLM sub-group
# ---------------------------------------------------------------------------
llm_app = typer.Typer(name="llm", help="LLM configuration management", no_args_is_help=True)
admin_app.add_typer(llm_app, name="llm")
@llm_app.command("list-providers")
def llm_list_providers(
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List all LLM providers (API keys masked)."""
client = _build_client(server_url, token, api_key)
providers = _safe_get(client, "/api/v1/admin/llm/providers", client.base_url)
if json_output:
_emit_json(providers)
return
if not providers:
rprint("[dim]No LLM providers configured[/dim]")
return
table = Table(title="LLM Providers")
table.add_column("Name", style="cyan")
table.add_column("Type")
table.add_column("Base URL")
table.add_column("API Key")
table.add_column("Max Tokens")
table.add_column("Timeout")
for p in providers:
table.add_row(
str(p.get("name", "")),
str(p.get("type", "")),
str(p.get("base_url", "")),
str(p.get("api_key", "")),
str(p.get("max_tokens", "")),
str(p.get("timeout", "")),
)
console.print(table)
@llm_app.command("add-provider")
def llm_add_provider(
name: str = typer.Option(..., "--name", "-n", help="Provider name"),
type: str = typer.Option("openai", "--type", help="Provider type"),
api_key: str = typer.Option(..., "--api-key", help="API key"),
base_url: str = typer.Option("", "--base-url", help="Base URL override"),
max_tokens: int = typer.Option(4096, "--max-tokens", help="Max tokens"),
timeout: float = typer.Option(60.0, "--timeout", help="Timeout in seconds"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key_opt: Optional[str] = typer.Option(
None, "--api-key-auth", "-k", help="Admin API key (auth)"
),
json_output: bool = JsonFlag,
) -> None:
"""Create a new LLM provider."""
body: dict[str, object] = {
"name": name,
"type": type,
"api_key": api_key,
"base_url": base_url,
"max_tokens": max_tokens,
"timeout": timeout,
}
client = _build_client(server_url, token, api_key_opt)
result = _safe_post(client, "/api/v1/admin/llm/providers", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]Provider created:[/green] {result.get('name')}")
@llm_app.command("update-provider")
def llm_update_provider(
name: str = typer.Argument(..., help="Provider name"),
type: Optional[str] = typer.Option(None, "--type", help="Provider type"),
api_key: Optional[str] = typer.Option(None, "--api-key", help="API key"),
base_url: Optional[str] = typer.Option(None, "--base-url", help="Base URL"),
max_tokens: Optional[int] = typer.Option(None, "--max-tokens", help="Max tokens"),
timeout: Optional[float] = typer.Option(None, "--timeout", help="Timeout"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key_opt: Optional[str] = typer.Option(
None, "--api-key-auth", "-k", help="Admin API key (auth)"
),
json_output: bool = JsonFlag,
) -> None:
"""Update an LLM provider's configuration."""
body: dict[str, object] = {}
if type is not None:
body["type"] = type
if api_key is not None:
body["api_key"] = api_key
if base_url is not None:
body["base_url"] = base_url
if max_tokens is not None:
body["max_tokens"] = max_tokens
if timeout is not None:
body["timeout"] = timeout
if not body:
rprint("[red]Error: Provide at least one field to update[/red]")
raise typer.Exit(1)
client = _build_client(server_url, token, api_key_opt)
result = _safe_patch(client, f"/api/v1/admin/llm/providers/{name}", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]Provider updated:[/green] {result.get('name')}")
@llm_app.command("delete-provider")
def llm_delete_provider(
name: str = typer.Argument(..., help="Provider name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Delete an LLM provider."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(client, f"/api/v1/admin/llm/providers/{name}", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Provider deleted:[/green] {name}")
@llm_app.command("set-api-key")
def llm_set_api_key(
name: str = typer.Argument(..., help="Provider name"),
api_key: str = typer.Option(..., "--api-key", help="New API key"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key_opt: Optional[str] = typer.Option(
None, "--api-key-auth", "-k", help="Admin API key (auth)"
),
json_output: bool = JsonFlag,
) -> None:
"""Set the API key for a provider (stored in .env, not YAML)."""
client = _build_client(server_url, token, api_key_opt)
result = _safe_post(
client,
f"/api/v1/admin/llm/providers/{name}/api-key",
client.base_url,
{"api_key": api_key},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]API key set for provider:[/green] {name}")
@llm_app.command("list-fallbacks")
def llm_list_fallbacks(
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List all fallback chains."""
client = _build_client(server_url, token, api_key)
fallbacks = _safe_get(client, "/api/v1/admin/llm/fallbacks", client.base_url)
if json_output:
_emit_json(fallbacks)
return
if not fallbacks:
rprint("[dim]No fallback chains configured[/dim]")
return
table = Table(title="LLM Fallback Chains")
table.add_column("Model", style="cyan")
table.add_column("Chain")
for model, chain in fallbacks.items():
table.add_row(str(model), "".join(str(c) for c in chain))
console.print(table)
@llm_app.command("set-fallback")
def llm_set_fallback(
model: str = typer.Argument(..., help="Model name"),
chain: str = typer.Option(
...,
"--chain",
"-c",
help="Comma-separated fallback chain (e.g. provider1/model1,provider2/model2)",
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Set the fallback chain for a model."""
chain_list = [s.strip() for s in chain.split(",") if s.strip()]
if not chain_list:
rprint("[red]Error: --chain must contain at least one entry[/red]")
raise typer.Exit(1)
client = _build_client(server_url, token, api_key)
result = _safe_put(
client,
f"/api/v1/admin/llm/fallbacks/{model}",
client.base_url,
{"chain": chain_list},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Fallback set for {model}:[/green] {''.join(chain_list)}")
@llm_app.command("delete-fallback")
def llm_delete_fallback(
model: str = typer.Argument(..., help="Model name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Delete the fallback chain for a model."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(client, f"/api/v1/admin/llm/fallbacks/{model}", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Fallback deleted for model:[/green] {model}")
# ---------------------------------------------------------------------------
# Skill sub-group
# ---------------------------------------------------------------------------
skill_app = typer.Typer(name="skill", help="Skill management (admin)", no_args_is_help=True)
admin_app.add_typer(skill_app, name="skill")
@skill_app.command("list")
def skill_list(
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List all skills (calls GET /skills, not /admin/skills)."""
client = _build_client(server_url, token, api_key)
skills = _safe_get(client, "/api/v1/skills", client.base_url)
if json_output:
_emit_json(skills)
return
if not skills:
rprint("[dim]No skills registered[/dim]")
return
table = Table(title="Skills")
table.add_column("Name", style="cyan")
table.add_column("Type")
table.add_column("Description")
for s in skills:
table.add_row(
str(s.get("name", "")),
str(s.get("agent_type", "")),
str(s.get("description", "")),
)
console.print(table)
@skill_app.command("enable")
def skill_enable(
name: str = typer.Argument(..., help="Skill name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Enable a previously-disabled skill."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/skills/{name}/enable", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill enabled:[/green] {name}")
@skill_app.command("disable")
def skill_disable(
name: str = typer.Argument(..., help="Skill name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Disable a skill (hides it from GET /skills)."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/skills/{name}/disable", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill disabled:[/green] {name}")
@skill_app.command("import")
def skill_import(
yaml_file: str = typer.Argument(..., help="Path to skill YAML file"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Import a skill from a YAML file."""
path = Path(yaml_file)
if not path.exists():
rprint(f"[red]Error: File not found: {yaml_file}[/red]")
raise typer.Exit(1)
yaml_content = path.read_text(encoding="utf-8")
client = _build_client(server_url, token, api_key)
result = _safe_post(
client,
"/api/v1/admin/skills/import",
client.base_url,
{"yaml_content": yaml_content},
)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill imported:[/green] {result.get('name', yaml_file)}")
@skill_app.command("reload")
def skill_reload(
name: str = typer.Argument(..., help="Skill name"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Reload a skill from its YAML file."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/skills/{name}/reload", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Skill reloaded:[/green] {name}")
# ---------------------------------------------------------------------------
# KB sub-group
# ---------------------------------------------------------------------------
kb_app = typer.Typer(name="kb", help="Knowledge base management", no_args_is_help=True)
admin_app.add_typer(kb_app, name="kb")
@kb_app.command("list-documents")
def kb_list_documents(
source_id: Optional[str] = typer.Option(None, "--source-id", help="Filter by source ID"),
department_id: Optional[str] = typer.Option(
None, "--department-id", help="Filter by department ID"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""List KB documents."""
client = _build_client(server_url, token, api_key)
params: dict[str, object] = {}
if source_id:
params["source_id"] = source_id
if department_id:
params["department_id"] = department_id
data = _safe_get(client, "/api/v1/admin/kb/documents", client.base_url, params=params or None)
documents = data.get("documents", []) if isinstance(data, dict) else data
if json_output:
_emit_json(documents)
return
if not documents:
rprint("[dim]No KB documents found[/dim]")
return
table = Table(title="KB Documents")
table.add_column("ID", style="cyan")
table.add_column("Filename")
table.add_column("Source ID")
table.add_column("Department ID")
table.add_column("Chunks")
table.add_column("Created")
for d in documents:
table.add_row(
str(d.get("document_id", "")),
str(d.get("filename", "")),
str(d.get("source_id", "")),
str(d.get("department_id", "")),
str(d.get("chunks", "")),
str(d.get("created_at", "")),
)
console.print(table)
@kb_app.command("upload")
def kb_upload(
filename: str = typer.Option(..., "--filename", "-f", help="Document filename"),
content_file: str = typer.Option(
..., "--content-file", "-c", help="Path to file with document content"
),
source_id: str = typer.Option("", "--source-id", help="KB source ID"),
department_id: Optional[str] = typer.Option(
None, "--department-id", help="Department ID to bind to"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Upload a KB document."""
path = Path(content_file)
if not path.exists():
rprint(f"[red]Error: File not found: {content_file}[/red]")
raise typer.Exit(1)
content = path.read_text(encoding="utf-8")
body: dict[str, object] = {
"filename": filename,
"content": content,
"source_id": source_id,
}
if department_id is not None:
body["department_id"] = department_id
client = _build_client(server_url, token, api_key)
result = _safe_post(client, "/api/v1/admin/kb/documents", client.base_url, body)
if json_output:
_emit_json(result)
return
rprint(f"[green]Document uploaded:[/green] {filename} (id={result.get('id')})")
@kb_app.command("delete")
def kb_delete(
document_id: str = typer.Argument(..., help="Document ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Delete a KB document."""
client = _build_client(server_url, token, api_key)
result = _safe_delete(client, f"/api/v1/admin/kb/documents/{document_id}", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Document deleted:[/green] {document_id}")
@kb_app.command("sync")
def kb_sync(
source_id: str = typer.Argument(..., help="KB source ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Trigger a sync for a KB source."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/kb/sources/{source_id}/sync", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Sync triggered for source:[/green] {source_id}")
@kb_app.command("rebuild")
def kb_rebuild(
source_id: str = typer.Argument(..., help="KB source ID"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Rebuild the index for a KB source."""
client = _build_client(server_url, token, api_key)
result = _safe_post(client, f"/api/v1/admin/kb/sources/{source_id}/rebuild", client.base_url)
if json_output:
_emit_json(result)
return
rprint(f"[green]Rebuild triggered for source:[/green] {source_id}")
# ---------------------------------------------------------------------------
# Usage sub-group
# ---------------------------------------------------------------------------
usage_app = typer.Typer(name="usage", help="Usage dashboard", no_args_is_help=True)
admin_app.add_typer(usage_app, name="usage")
def _usage_params(
department_id: Optional[str],
user_id: Optional[str],
start: Optional[str],
end: Optional[str],
) -> dict[str, object]:
params: dict[str, object] = {}
if department_id:
params["department_id"] = department_id
if user_id:
params["user_id"] = user_id
if start:
params["start"] = start
if end:
params["end"] = end
return params
@usage_app.command("summary")
def usage_summary(
department_id: Optional[str] = typer.Option(None, "--department-id", "-d"),
user_id: Optional[str] = typer.Option(None, "--user-id", "-u"),
start: Optional[str] = typer.Option(None, "--start", help="ISO 8601 start timestamp"),
end: Optional[str] = typer.Option(None, "--end", help="ISO 8601 end timestamp"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Show aggregated usage summary."""
client = _build_client(server_url, token, api_key)
params = _usage_params(department_id, user_id, start, end)
data = _safe_get(client, "/api/v1/admin/usage/summary", client.base_url, params=params or None)
if json_output:
_emit_json(data)
return
table = Table(title="Usage Summary")
table.add_column("Metric", style="cyan")
table.add_column("Value")
for key, value in (data or {}).items():
if isinstance(value, float):
table.add_row(str(key), f"{value:.4f}")
elif isinstance(value, list | dict):
table.add_row(str(key), _json.dumps(value, default=str))
else:
table.add_row(str(key), str(value))
console.print(table)
@usage_app.command("timeseries")
def usage_timeseries(
department_id: Optional[str] = typer.Option(None, "--department-id", "-d"),
user_id: Optional[str] = typer.Option(None, "--user-id", "-u"),
start: Optional[str] = typer.Option(None, "--start"),
end: Optional[str] = typer.Option(None, "--end"),
interval: str = typer.Option("day", "--interval", help="day or hour"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Show time-bucketed usage series."""
client = _build_client(server_url, token, api_key)
params = _usage_params(department_id, user_id, start, end)
params["interval"] = interval
data = _safe_get(client, "/api/v1/admin/usage/timeseries", client.base_url, params=params)
if json_output:
_emit_json(data)
return
if not data:
rprint("[dim]No usage data[/dim]")
return
table = Table(title=f"Usage Timeseries (interval={interval})")
table.add_column("Bucket", style="cyan")
table.add_column("Requests")
table.add_column("Tokens")
table.add_column("Cost")
for row in data:
table.add_row(
str(row.get("bucket", "")),
str(row.get("requests", "")),
str(row.get("tokens", "")),
str(row.get("cost", "")),
)
console.print(table)
@usage_app.command("by-model")
def usage_by_model(
department_id: Optional[str] = typer.Option(None, "--department-id", "-d"),
user_id: Optional[str] = typer.Option(None, "--user-id", "-u"),
start: Optional[str] = typer.Option(None, "--start"),
end: Optional[str] = typer.Option(None, "--end"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Show per-model usage breakdown."""
client = _build_client(server_url, token, api_key)
params = _usage_params(department_id, user_id, start, end)
data = _safe_get(client, "/api/v1/admin/usage/by-model", client.base_url, params=params or None)
if json_output:
_emit_json(data)
return
if not data:
rprint("[dim]No usage data[/dim]")
return
table = Table(title="Usage by Model")
table.add_column("Model", style="cyan")
table.add_column("Requests")
table.add_column("Tokens")
table.add_column("Cost")
for row in data:
table.add_row(
str(row.get("model", "")),
str(row.get("requests", "")),
str(row.get("tokens", "")),
str(row.get("cost", "")),
)
console.print(table)
@usage_app.command("top-users")
def usage_top_users(
department_id: Optional[str] = typer.Option(None, "--department-id", "-d"),
start: Optional[str] = typer.Option(None, "--start"),
end: Optional[str] = typer.Option(None, "--end"),
limit: int = typer.Option(10, "--limit", "-n", help="Top N users"),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
json_output: bool = JsonFlag,
) -> None:
"""Show top-N users by token usage."""
client = _build_client(server_url, token, api_key)
params: dict[str, object] = {"limit": limit}
if department_id:
params["department_id"] = department_id
if start:
params["start"] = start
if end:
params["end"] = end
data = _safe_get(client, "/api/v1/admin/usage/top-users", client.base_url, params=params)
if json_output:
_emit_json(data)
return
if not data:
rprint("[dim]No usage data[/dim]")
return
table = Table(title=f"Top {limit} Users by Tokens")
table.add_column("Rank", style="cyan")
table.add_column("User ID")
table.add_column("Requests")
table.add_column("Tokens")
table.add_column("Cost")
for i, row in enumerate(data, 1):
table.add_row(
str(i),
str(row.get("user_id", "")),
str(row.get("requests", "")),
str(row.get("tokens", "")),
str(row.get("cost", "")),
)
console.print(table)
@usage_app.command("export")
def usage_export(
department_id: Optional[str] = typer.Option(None, "--department-id", "-d"),
user_id: Optional[str] = typer.Option(None, "--user-id", "-u"),
start: Optional[str] = typer.Option(None, "--start"),
end: Optional[str] = typer.Option(None, "--end"),
format: str = typer.Option("csv", "--format", help="csv or json"),
output: Optional[str] = typer.Option(
None, "--output", "-o", help="Output file (default: stdout)"
),
server_url: Optional[str] = ServerUrlOption,
token: Optional[str] = TokenOption,
api_key: Optional[str] = ApiKeyOption,
) -> None:
"""Export raw usage records as CSV or JSON."""
client = _build_client(server_url, token, api_key)
params = _usage_params(department_id, user_id, start, end)
params["format"] = format
try:
body = client.get_text("/api/v1/admin/usage/export", params=params)
except httpx.ConnectError:
_handle_connect_error(client.base_url)
except httpx.HTTPStatusError as e:
_handle_http_error(e, client.base_url)
if output:
Path(output).write_text(body, encoding="utf-8")
rprint(f"[green]Export written to:[/green] {output}")
else:
# Print raw — no Rich formatting so the output is scriptable.
print(body)