"""Tests for U7: configuration sync engine. Covers: - Server-side config sync API (version, all, skills, agents, workflows) - Client-side ConfigSync (start, poll, cache, offline) """ from __future__ import annotations import asyncio import json import sqlite3 from datetime import datetime, timezone from pathlib import Path from typing import Any import pytest from fastapi import FastAPI from httpx import ASGITransport, AsyncClient from agentkit.client.sync import ConfigSync from agentkit.server.routes import config_sync # ── Test fixtures ───────────────────────────────────────────────────── class _FakeSkillConfig: """Minimal skill config mock with to_dict().""" def __init__(self, name: str, version: str = "1.0.0"): self.name = name self.version = version def to_dict(self) -> dict[str, Any]: return {"name": self.name, "version": self.version, "description": f"Skill {self.name}"} class _FakeSkill: def __init__(self, name: str, version: str = "1.0.0"): self.config = _FakeSkillConfig(name, version) class _FakeSkillRegistry: """Minimal SkillRegistry mock.""" def __init__(self, skills: dict[str, _FakeSkill] | None = None): self._skills = skills or {} def list_skills(self) -> list[str]: return sorted(self._skills.keys()) def get(self, name: str) -> _FakeSkill | None: return self._skills.get(name) class _FakeWorkflow: """Minimal workflow definition mock with model_dump().""" def __init__(self, workflow_id: str, name: str, version: int = 1): self.workflow_id = workflow_id self.name = name self.version = version self.stages: list = [] self.triggers: list = [] self.created_at = "2026-01-01T00:00:00Z" self.updated_at = "2026-01-01T00:00:00Z" def model_dump(self) -> dict[str, Any]: return { "workflow_id": self.workflow_id, "name": self.name, "version": self.version, "stages": self.stages, "triggers": self.triggers, "created_at": self.created_at, "updated_at": self.updated_at, } class _FakeWorkflowStore: def __init__(self, workflows: list[_FakeWorkflow] | None = None): self._workflows = workflows or [] def list_workflows(self) -> list[_FakeWorkflow]: return list(self._workflows) @pytest.fixture def config_app_with_data() -> FastAPI: """App with mock skill registry + workflow store.""" app = FastAPI() app.state.skill_registry = _FakeSkillRegistry({ "react_agent": _FakeSkill("react_agent", "1.0.0"), "code_reviewer": _FakeSkill("code_reviewer", "2.1.0"), }) app.state.workflow_store = _FakeWorkflowStore([ _FakeWorkflow("wf-1", "CI Pipeline", 1), _FakeWorkflow("wf-2", "Deploy", 3), ]) app.include_router(config_sync.router, prefix="/api/v1") # Dev-admin middleware @app.middleware("http") async def _set_dev_admin_user(request, call_next): request.state.current_user = { "user_id": "dev-admin", "username": "dev-admin", "role": "admin", "dev_mode": True, } return await call_next(request) return app @pytest.fixture def config_app_empty() -> FastAPI: """App with no skills or workflows.""" app = FastAPI() app.state.skill_registry = _FakeSkillRegistry() app.state.workflow_store = _FakeWorkflowStore() app.include_router(config_sync.router, prefix="/api/v1") @app.middleware("http") async def _set_dev_admin_user(request, call_next): request.state.current_user = { "user_id": "dev-admin", "username": "dev-admin", "role": "admin", "dev_mode": True, } return await call_next(request) return app @pytest.fixture async def config_client(config_app_with_data: FastAPI): transport = ASGITransport(app=config_app_with_data) async with AsyncClient(transport=transport, base_url="http://test") as client: yield client # ── Server API tests ────────────────────────────────────────────────── class TestConfigVersionEndpoint: """Test GET /api/v1/config/version.""" @pytest.mark.asyncio async def test_version_returns_hash(self, config_client: AsyncClient): resp = await config_client.get("/api/v1/config/version") assert resp.status_code == 200 data = resp.json() assert "version" in data assert len(data["version"]) == 64 # SHA-256 hex assert data["skill_count"] == 2 assert data["workflow_count"] == 2 assert "computed_at" in data @pytest.mark.asyncio async def test_version_is_stable(self, config_client: AsyncClient): """Same configs → same version hash.""" resp1 = await config_client.get("/api/v1/config/version") resp2 = await config_client.get("/api/v1/config/version") assert resp1.json()["version"] == resp2.json()["version"] @pytest.mark.asyncio async def test_version_empty_app(self, config_app_empty: FastAPI): transport = ASGITransport(app=config_app_empty) async with AsyncClient(transport=transport, base_url="http://test") as client: resp = await client.get("/api/v1/config/version") assert resp.status_code == 200 data = resp.json() assert data["skill_count"] == 0 assert data["workflow_count"] == 0 assert len(data["version"]) == 64 class TestConfigAllEndpoint: """Test GET /api/v1/config/all.""" @pytest.mark.asyncio async def test_all_returns_skills_and_workflows(self, config_client: AsyncClient): resp = await config_client.get("/api/v1/config/all") assert resp.status_code == 200 data = resp.json() assert "version" in data assert len(data["version"]) == 64 assert "synced_at" in data skills = data["skills"] assert len(skills) == 2 skill_names = {s["name"] for s in skills} assert skill_names == {"react_agent", "code_reviewer"} workflows = data["workflows"] assert len(workflows) == 2 wf_ids = {w["workflow_id"] for w in workflows} assert wf_ids == {"wf-1", "wf-2"} @pytest.mark.asyncio async def test_all_version_matches_version_endpoint(self, config_client: AsyncClient): all_resp = await config_client.get("/api/v1/config/all") ver_resp = await config_client.get("/api/v1/config/version") assert all_resp.json()["version"] == ver_resp.json()["version"] class TestConfigSkillsEndpoint: """Test GET /api/v1/config/skills.""" @pytest.mark.asyncio async def test_skills_returns_list(self, config_client: AsyncClient): resp = await config_client.get("/api/v1/config/skills") assert resp.status_code == 200 data = resp.json() assert len(data["skills"]) == 2 assert data["count"] == 2 assert "synced_at" in data class TestConfigAgentsEndpoint: """Test GET /api/v1/config/agents.""" @pytest.mark.asyncio async def test_agents_returns_list(self, config_client: AsyncClient): resp = await config_client.get("/api/v1/config/agents") assert resp.status_code == 200 data = resp.json() assert len(data["agents"]) == 2 assert data["count"] == 2 class TestConfigWorkflowsEndpoint: """Test GET /api/v1/config/workflows.""" @pytest.mark.asyncio async def test_workflows_returns_list(self, config_client: AsyncClient): resp = await config_client.get("/api/v1/config/workflows") assert resp.status_code == 200 data = resp.json() assert len(data["workflows"]) == 2 assert data["count"] == 2 # ── Client-side ConfigSync tests ────────────────────────────────────── @pytest.fixture async def sync_server(config_app_with_data: FastAPI): """Start the config app as an ASGI server on a random port.""" import uvicorn config = uvicorn.Config( config_app_with_data, host="127.0.0.1", port=0, # Random port log_level="warning", ) server = uvicorn.Server(config) task = asyncio.create_task(server.serve()) # Wait for server to start while not server.started: await asyncio.sleep(0.01) # Get the actual port sockets = list(server.servers[0].sockets) port = sockets[0].getsockname()[1] yield f"http://127.0.0.1:{port}" server.should_exit = True await task class TestConfigSync: """Test the client-side ConfigSync engine.""" @pytest.mark.asyncio async def test_start_pulls_configs(self, sync_server: str, tmp_path: Path): """ConfigSync.start() pulls configs from the server.""" sync = ConfigSync( server_url=sync_server, token_provider=None, cache_db_path=tmp_path / "cache.db", ) try: success = await sync.start() assert success is True assert sync.get_version() is not None assert len(sync.get_version()) == 64 skills = sync.get_skills() assert len(skills) == 2 workflows = sync.get_workflows() assert len(workflows) == 2 finally: await sync.stop() @pytest.mark.asyncio async def test_get_skill_by_name(self, sync_server: str, tmp_path: Path): sync = ConfigSync( server_url=sync_server, cache_db_path=tmp_path / "cache.db", ) try: await sync.start() skill = sync.get_skill("react_agent") assert skill is not None assert skill["name"] == "react_agent" missing = sync.get_skill("nonexistent") assert missing is None finally: await sync.stop() @pytest.mark.asyncio async def test_get_workflow_by_id(self, sync_server: str, tmp_path: Path): sync = ConfigSync( server_url=sync_server, cache_db_path=tmp_path / "cache.db", ) try: await sync.start() wf = sync.get_workflow("wf-1") assert wf is not None assert wf["name"] == "CI Pipeline" missing = sync.get_workflow("nonexistent") assert missing is None finally: await sync.stop() @pytest.mark.asyncio async def test_cache_persists_across_instances(self, sync_server: str, tmp_path: Path): """Configs cached by one instance are loadable by another.""" cache_path = tmp_path / "cache.db" # First instance: pull and cache sync1 = ConfigSync(server_url=sync_server, cache_db_path=cache_path) await sync1.start() version1 = sync1.get_version() assert version1 is not None await sync1.stop() # Second instance: no server, load from cache sync2 = ConfigSync( server_url="http://127.0.0.1:1", # Unreachable port cache_db_path=cache_path, ) try: success = await sync2.start() assert success is False # Server unreachable # But cache should be loaded assert sync2.get_version() == version1 assert len(sync2.get_skills()) == 2 assert len(sync2.get_workflows()) == 2 finally: await sync2.stop() @pytest.mark.asyncio async def test_poll_detects_version_change( self, config_app_with_data: FastAPI, tmp_path: Path ): """Polling detects when the server's config version changes.""" import uvicorn config = uvicorn.Config( config_app_with_data, host="127.0.0.1", port=0, log_level="warning", ) server = uvicorn.Server(config) task = asyncio.create_task(server.serve()) while not server.started: await asyncio.sleep(0.01) port = list(server.servers[0].sockets)[0].getsockname()[1] server_url = f"http://127.0.0.1:{port}" try: sync = ConfigSync( server_url=server_url, cache_db_path=tmp_path / "cache.db", poll_interval=1, # 1 second for fast testing ) await sync.start() sync.start_polling() # Start background polling version_before = sync.get_version() # Change the server's config config_app_with_data.state.skill_registry._skills["new_skill"] = _FakeSkill("new_skill") # Wait for the poll to detect the change await asyncio.sleep(2.5) version_after = sync.get_version() assert version_after != version_before assert len(sync.get_skills()) == 3 # Original 2 + new_skill await sync.stop() finally: server.should_exit = True await task @pytest.mark.asyncio async def test_offline_uses_cache(self, tmp_path: Path): """When the server is unreachable, the client uses cached configs.""" cache_path = tmp_path / "cache.db" # Pre-populate the cache directly cache_path.parent.mkdir(parents=True, exist_ok=True) with sqlite3.connect(str(cache_path)) as conn: conn.executescript( "CREATE TABLE IF NOT EXISTS config_cache " "(key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at TEXT NOT NULL);" ) now = datetime.now(timezone.utc).isoformat() conn.executemany( "INSERT OR REPLACE INTO config_cache (key, value, updated_at) VALUES (?, ?, ?)", [ ("version", json.dumps("cached-version-hash"), now), ("skills", json.dumps([{"name": "cached_skill", "version": "1.0.0"}]), now), ("workflows", json.dumps([{"workflow_id": "cached-wf", "name": "Cached"}]), now), ("synced_at", json.dumps("2026-01-01T00:00:00Z"), now), ], ) conn.commit() # Create a sync instance pointing to an unreachable server sync = ConfigSync( server_url="http://127.0.0.1:1", # Unreachable cache_db_path=cache_path, ) try: success = await sync.start() assert success is False # Server unreachable # But cache loaded assert sync.get_version() == "cached-version-hash" skills = sync.get_skills() assert len(skills) == 1 assert skills[0]["name"] == "cached_skill" finally: await sync.stop() @pytest.mark.asyncio async def test_offline_no_cache_returns_empty(self, tmp_path: Path): """When the server is unreachable and no cache exists, returns empty.""" sync = ConfigSync( server_url="http://127.0.0.1:1", # Unreachable cache_db_path=tmp_path / "nonexistent.db", ) try: success = await sync.start() assert success is False assert sync.get_version() is None assert sync.get_skills() == [] assert sync.get_workflows() == [] finally: await sync.stop() @pytest.mark.asyncio async def test_token_provider_attaches_jwt(self, sync_server: str, tmp_path: Path): """The token_provider callable is used to attach JWT to requests.""" token_holder = {"token": "test-jwt-token"} sync = ConfigSync( server_url=sync_server, token_provider=lambda: token_holder["token"], cache_db_path=tmp_path / "cache.db", ) try: # Should work even with a fake token (dev mode doesn't check) success = await sync.start() assert success is True finally: await sync.stop() @pytest.mark.asyncio async def test_get_all_returns_combined_dict(self, sync_server: str, tmp_path: Path): sync = ConfigSync( server_url=sync_server, cache_db_path=tmp_path / "cache.db", ) try: await sync.start() all_configs = sync.get_all() assert "version" in all_configs assert "skills" in all_configs assert "workflows" in all_configs assert "synced_at" in all_configs assert len(all_configs["skills"]) == 2 assert len(all_configs["workflows"]) == 2 finally: await sync.stop()