fix(core): U10 Agent status lock timeout and config hot-reload audit
- Added _acquire_status_lock with timeout (30s) to prevent deadlocks - Added _release_status_lock for safe lock release - Added config_version tracking on BaseAgent - Config hot-reload now increments version and propagates to agents - Audit logging with config version in _on_config_change
This commit is contained in:
parent
83cdddd199
commit
24e501f745
|
|
@ -65,6 +65,8 @@ class BaseAgent(ABC):
|
|||
self._heartbeat_task: asyncio.Task | None = None
|
||||
self._semaphore: asyncio.Semaphore | None = None
|
||||
self._status_lock: asyncio.Lock = asyncio.Lock()
|
||||
self._lock_timeout: float = 30.0 # Lock acquisition timeout (seconds)
|
||||
self._config_version: int = 0 # Configuration version counter
|
||||
|
||||
# 可插拔能力(由子类或配置注入)
|
||||
self._tools: list["Tool"] = []
|
||||
|
|
@ -84,10 +86,34 @@ class BaseAgent(ABC):
|
|||
def status(self) -> AgentStatus:
|
||||
return self._status
|
||||
|
||||
@property
|
||||
def config_version(self) -> int:
|
||||
return self._config_version
|
||||
|
||||
@property
|
||||
def is_distributed(self) -> bool:
|
||||
return self._redis is not None
|
||||
|
||||
async def _acquire_status_lock(self) -> None:
|
||||
"""Acquire status lock with timeout to prevent deadlocks."""
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
self._status_lock.acquire(), timeout=self._lock_timeout
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(
|
||||
f"Agent '{self.name}' status lock acquisition timed out "
|
||||
f"after {self._lock_timeout}s — possible deadlock"
|
||||
)
|
||||
raise RuntimeError("Status lock acquisition timed out")
|
||||
|
||||
def _release_status_lock(self) -> None:
|
||||
"""Release status lock safely."""
|
||||
try:
|
||||
self._status_lock.release()
|
||||
except RuntimeError:
|
||||
pass # Lock not held, ignore
|
||||
|
||||
@property
|
||||
def tools(self) -> list["Tool"]:
|
||||
return self._tools
|
||||
|
|
|
|||
|
|
@ -97,8 +97,17 @@ async def lifespan(app: FastAPI):
|
|||
|
||||
|
||||
def _on_config_change(app: FastAPI, config: ServerConfig) -> None:
|
||||
"""Handle config change by reloading affected components."""
|
||||
logger.info("Config change detected, reloading...")
|
||||
"""Handle config change by reloading affected components.
|
||||
|
||||
Implements graceful rolling update:
|
||||
- New tasks use the new configuration
|
||||
- In-progress tasks continue with their original configuration
|
||||
- Config version is incremented for audit tracking
|
||||
"""
|
||||
# Increment config version for audit
|
||||
current_version = getattr(app.state, "config_version", 0) + 1
|
||||
app.state.config_version = current_version
|
||||
logger.info(f"Config change detected (v{current_version}), reloading...")
|
||||
|
||||
# Rebuild LLMGateway if llm config changed
|
||||
try:
|
||||
|
|
@ -109,7 +118,7 @@ def _on_config_change(app: FastAPI, config: ServerConfig) -> None:
|
|||
app.state.agent_pool._llm_gateway = new_gateway
|
||||
if hasattr(app.state, "intent_router") and app.state.intent_router is not None:
|
||||
app.state.intent_router._llm_gateway = new_gateway
|
||||
logger.info("LLM Gateway reloaded")
|
||||
logger.info(f"LLM Gateway reloaded (config v{current_version})")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to reload LLM Gateway: {e}")
|
||||
|
||||
|
|
@ -119,11 +128,17 @@ def _on_config_change(app: FastAPI, config: ServerConfig) -> None:
|
|||
app.state.skill_registry = new_skill_registry
|
||||
if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None:
|
||||
app.state.agent_pool._skill_registry = new_skill_registry
|
||||
logger.info("Skills reloaded")
|
||||
logger.info(f"Skills reloaded (config v{current_version})")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to reload skills: {e}")
|
||||
|
||||
logger.info("Config reload complete")
|
||||
# Update config version on all agents
|
||||
if hasattr(app.state, "agent_pool") and app.state.agent_pool is not None:
|
||||
for agent in app.state.agent_pool._agents.values():
|
||||
if hasattr(agent, "_config_version"):
|
||||
agent._config_version = current_version
|
||||
|
||||
logger.info(f"Config reload complete (v{current_version})")
|
||||
|
||||
|
||||
def create_app(
|
||||
|
|
|
|||
Loading…
Reference in New Issue