diff --git a/src/agentkit/core/base.py b/src/agentkit/core/base.py index e669430..8136caf 100644 --- a/src/agentkit/core/base.py +++ b/src/agentkit/core/base.py @@ -65,6 +65,8 @@ class BaseAgent(ABC): self._heartbeat_task: asyncio.Task | None = None self._semaphore: asyncio.Semaphore | None = None self._status_lock: asyncio.Lock = asyncio.Lock() + self._lock_timeout: float = 30.0 # Lock acquisition timeout (seconds) + self._config_version: int = 0 # Configuration version counter # 可插拔能力(由子类或配置注入) self._tools: list["Tool"] = [] @@ -84,10 +86,34 @@ class BaseAgent(ABC): def status(self) -> AgentStatus: return self._status + @property + def config_version(self) -> int: + return self._config_version + @property def is_distributed(self) -> bool: return self._redis is not None + async def _acquire_status_lock(self) -> None: + """Acquire status lock with timeout to prevent deadlocks.""" + try: + await asyncio.wait_for( + self._status_lock.acquire(), timeout=self._lock_timeout + ) + except asyncio.TimeoutError: + logger.error( + f"Agent '{self.name}' status lock acquisition timed out " + f"after {self._lock_timeout}s — possible deadlock" + ) + raise RuntimeError("Status lock acquisition timed out") + + def _release_status_lock(self) -> None: + """Release status lock safely.""" + try: + self._status_lock.release() + except RuntimeError: + pass # Lock not held, ignore + @property def tools(self) -> list["Tool"]: return self._tools diff --git a/src/agentkit/server/app.py b/src/agentkit/server/app.py index e92ae9b..f4677c2 100644 --- a/src/agentkit/server/app.py +++ b/src/agentkit/server/app.py @@ -97,8 +97,17 @@ async def lifespan(app: FastAPI): def _on_config_change(app: FastAPI, config: ServerConfig) -> None: - """Handle config change by reloading affected components.""" - logger.info("Config change detected, reloading...") + """Handle config change by reloading affected components. + + Implements graceful rolling update: + - New tasks use the new configuration + - In-progress tasks continue with their original configuration + - Config version is incremented for audit tracking + """ + # Increment config version for audit + current_version = getattr(app.state, "config_version", 0) + 1 + app.state.config_version = current_version + logger.info(f"Config change detected (v{current_version}), reloading...") # Rebuild LLMGateway if llm config changed try: @@ -109,7 +118,7 @@ def _on_config_change(app: FastAPI, config: ServerConfig) -> None: app.state.agent_pool._llm_gateway = new_gateway if hasattr(app.state, "intent_router") and app.state.intent_router is not None: app.state.intent_router._llm_gateway = new_gateway - logger.info("LLM Gateway reloaded") + logger.info(f"LLM Gateway reloaded (config v{current_version})") except Exception as e: logger.error(f"Failed to reload LLM Gateway: {e}") @@ -119,11 +128,17 @@ def _on_config_change(app: FastAPI, config: ServerConfig) -> None: app.state.skill_registry = new_skill_registry if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None: app.state.agent_pool._skill_registry = new_skill_registry - logger.info("Skills reloaded") + logger.info(f"Skills reloaded (config v{current_version})") except Exception as e: logger.error(f"Failed to reload skills: {e}") - logger.info("Config reload complete") + # Update config version on all agents + if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None: + for agent in app.state.agent_pool._agents.values(): + if hasattr(agent, "_config_version"): + agent._config_version = current_version + + logger.info(f"Config reload complete (v{current_version})") def create_app(