ai-voicebot/server/core/health.py

467 lines
17 KiB
Python

"""
Health Check System
Provides comprehensive health monitoring for all system components including
database connectivity, WebSocket connections, external services, and application state.
Features:
- Deep health checks for all dependencies
- Readiness and liveness probes
- Graceful degradation strategies
- Health status aggregation
- Kubernetes-compatible endpoints
"""
import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable, NamedTuple
from enum import Enum
import json
from logger import logger
class HealthStatus(Enum):
"""Health status levels."""
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
UNKNOWN = "unknown"
class HealthCheckResult(NamedTuple):
"""Result of a health check."""
component: str
status: HealthStatus
message: str
duration_ms: float
details: Dict[str, Any]
timestamp: datetime
class HealthCheckComponent:
"""Base class for health check components."""
def __init__(self, name: str, timeout_seconds: float = 5.0):
self.name = name
self.timeout_seconds = timeout_seconds
self.last_check: Optional[HealthCheckResult] = None
self.check_count = 0
self.failure_count = 0
async def check_health(self) -> HealthCheckResult:
"""Perform health check with timeout."""
start_time = time.time()
self.check_count += 1
try:
# Run the actual health check with timeout
result = await asyncio.wait_for(
self._perform_check(),
timeout=self.timeout_seconds
)
duration_ms = (time.time() - start_time) * 1000
self.last_check = HealthCheckResult(
component=self.name,
status=result.get('status', HealthStatus.UNKNOWN),
message=result.get('message', ''),
duration_ms=duration_ms,
details=result.get('details', {}),
timestamp=datetime.now()
)
if self.last_check.status != HealthStatus.HEALTHY:
self.failure_count += 1
return self.last_check
except asyncio.TimeoutError:
self.failure_count += 1
duration_ms = (time.time() - start_time) * 1000
self.last_check = HealthCheckResult(
component=self.name,
status=HealthStatus.UNHEALTHY,
message=f"Health check timeout after {self.timeout_seconds}s",
duration_ms=duration_ms,
details={"error": "timeout"},
timestamp=datetime.now()
)
return self.last_check
except Exception as e:
self.failure_count += 1
duration_ms = (time.time() - start_time) * 1000
self.last_check = HealthCheckResult(
component=self.name,
status=HealthStatus.UNHEALTHY,
message=f"Health check failed: {str(e)}",
duration_ms=duration_ms,
details={"error": str(e), "error_type": type(e).__name__},
timestamp=datetime.now()
)
return self.last_check
async def _perform_check(self) -> Dict[str, Any]:
"""Override this method to implement specific health check logic."""
raise NotImplementedError("Subclasses must implement _perform_check")
def get_failure_rate(self) -> float:
"""Get failure rate as percentage."""
if self.check_count == 0:
return 0.0
return (self.failure_count / self.check_count) * 100
class DatabaseHealthCheck(HealthCheckComponent):
"""Health check for database connectivity."""
def __init__(self, session_manager, timeout_seconds: float = 3.0):
super().__init__("database", timeout_seconds)
self.session_manager = session_manager
async def _perform_check(self) -> Dict[str, Any]:
"""Check database connectivity and basic operations."""
try:
# Test basic session operations
session_count = len(self.session_manager.sessions)
# Test session file read/write
test_session_id = "health_check_test"
if test_session_id in self.session_manager.sessions:
del self.session_manager.sessions[test_session_id]
return {
'status': HealthStatus.HEALTHY,
'message': f"Database operational, {session_count} sessions",
'details': {
'session_count': session_count,
'test_completed': True
}
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'message': f"Database check failed: {str(e)}",
'details': {'error': str(e)}
}
class WebSocketHealthCheck(HealthCheckComponent):
"""Health check for WebSocket connections."""
def __init__(self, session_manager, timeout_seconds: float = 2.0):
super().__init__("websocket", timeout_seconds)
self.session_manager = session_manager
async def _perform_check(self) -> Dict[str, Any]:
"""Check WebSocket connection health."""
try:
# Count active WebSocket connections
active_connections = 0
total_sessions = len(self.session_manager.sessions)
for session in self.session_manager.sessions.values():
if hasattr(session, 'websocket') and session.websocket:
active_connections += 1
# Determine health based on connection ratio
if total_sessions > 0:
connection_ratio = active_connections / total_sessions
if connection_ratio > 0.8:
status = HealthStatus.HEALTHY
message = f"WebSocket connections healthy ({active_connections}/{total_sessions})"
elif connection_ratio > 0.5:
status = HealthStatus.DEGRADED
message = f"Some WebSocket connections lost ({active_connections}/{total_sessions})"
else:
status = HealthStatus.UNHEALTHY
message = f"Many WebSocket connections lost ({active_connections}/{total_sessions})"
else:
status = HealthStatus.HEALTHY
message = "No active sessions"
return {
'status': status,
'message': message,
'details': {
'active_connections': active_connections,
'total_sessions': total_sessions,
'connection_ratio': active_connections / max(total_sessions, 1)
}
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'message': f"WebSocket check failed: {str(e)}",
'details': {'error': str(e)}
}
class LobbyHealthCheck(HealthCheckComponent):
"""Health check for lobby management."""
def __init__(self, lobby_manager, timeout_seconds: float = 2.0):
super().__init__("lobby", timeout_seconds)
self.lobby_manager = lobby_manager
async def _perform_check(self) -> Dict[str, Any]:
"""Check lobby management health."""
try:
lobby_count = len(self.lobby_manager.lobbies)
active_lobbies = sum(1 for lobby in self.lobby_manager.lobbies.values()
if len(lobby.sessions) > 0)
return {
'status': HealthStatus.HEALTHY,
'message': f"Lobby system operational, {active_lobbies}/{lobby_count} active",
'details': {
'total_lobbies': lobby_count,
'active_lobbies': active_lobbies,
'empty_lobbies': lobby_count - active_lobbies
}
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'message': f"Lobby check failed: {str(e)}",
'details': {'error': str(e)}
}
class SystemResourceHealthCheck(HealthCheckComponent):
"""Health check for system resources."""
def __init__(self, metrics_collector, timeout_seconds: float = 1.0):
super().__init__("system_resources", timeout_seconds)
self.metrics_collector = metrics_collector
async def _perform_check(self) -> Dict[str, Any]:
"""Check system resource utilization."""
try:
current_metrics = self.metrics_collector.get_current_metrics()
cpu_usage = current_metrics['gauges'].get('cpu_usage_percent', 0)
memory_usage = current_metrics['gauges'].get('memory_usage_percent', 0)
# Determine status based on resource usage
if cpu_usage > 90 or memory_usage > 90:
status = HealthStatus.UNHEALTHY
message = f"High resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"
elif cpu_usage > 70 or memory_usage > 70:
status = HealthStatus.DEGRADED
message = f"Moderate resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"
else:
status = HealthStatus.HEALTHY
message = f"Resource usage normal: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"
return {
'status': status,
'message': message,
'details': {
'cpu_usage_percent': cpu_usage,
'memory_usage_percent': memory_usage,
'memory_usage_mb': current_metrics['gauges'].get('memory_usage_mb', 0)
}
}
except Exception as e:
return {
'status': HealthStatus.UNHEALTHY,
'message': f"System resource check failed: {str(e)}",
'details': {'error': str(e)}
}
class HealthMonitor:
"""Main health monitoring system."""
def __init__(self):
self.components: Dict[str, HealthCheckComponent] = {}
self.check_interval_seconds = 30.0
self.last_full_check: Optional[datetime] = None
self._monitoring_task: Optional[asyncio.Task] = None
self._running = False
# Health history for trends
self.health_history: List[Dict[str, Any]] = []
self.max_history_entries = 100
def register_component(self, component: HealthCheckComponent):
"""Register a health check component."""
self.components[component.name] = component
logger.info(f"Registered health check component: {component.name}")
async def check_all_components(self) -> Dict[str, HealthCheckResult]:
"""Check health of all registered components."""
results = {}
# Run all health checks in parallel
tasks = {
name: component.check_health()
for name, component in self.components.items()
}
completed_results = await asyncio.gather(*tasks.values(), return_exceptions=True)
for name, result in zip(tasks.keys(), completed_results):
if isinstance(result, Exception):
# Handle exceptions in health checks
results[name] = HealthCheckResult(
component=name,
status=HealthStatus.UNHEALTHY,
message=f"Health check exception: {str(result)}",
duration_ms=0.0,
details={"error": str(result)},
timestamp=datetime.now()
)
else:
results[name] = result
self.last_full_check = datetime.now()
# Store in history
self._store_health_history(results)
return results
def _store_health_history(self, results: Dict[str, HealthCheckResult]):
"""Store health check results in history."""
history_entry = {
'timestamp': datetime.now().isoformat(),
'overall_status': self._calculate_overall_status(results).value,
'components': {
name: {
'status': result.status.value,
'duration_ms': result.duration_ms,
'message': result.message
}
for name, result in results.items()
}
}
self.health_history.append(history_entry)
# Keep history size manageable
if len(self.health_history) > self.max_history_entries:
self.health_history = self.health_history[-self.max_history_entries:]
def _calculate_overall_status(self, results: Dict[str, HealthCheckResult]) -> HealthStatus:
"""Calculate overall system health status."""
if not results:
return HealthStatus.UNKNOWN
statuses = [result.status for result in results.values()]
if HealthStatus.UNHEALTHY in statuses:
return HealthStatus.UNHEALTHY
elif HealthStatus.DEGRADED in statuses:
return HealthStatus.DEGRADED
elif all(status == HealthStatus.HEALTHY for status in statuses):
return HealthStatus.HEALTHY
else:
return HealthStatus.UNKNOWN
async def get_health_summary(self) -> Dict[str, Any]:
"""Get comprehensive health summary."""
results = await self.check_all_components()
overall_status = self._calculate_overall_status(results)
return {
'status': overall_status.value,
'timestamp': datetime.now().isoformat(),
'components': {
name: {
'status': result.status.value,
'message': result.message,
'duration_ms': result.duration_ms,
'details': result.details,
'failure_rate': self.components[name].get_failure_rate()
}
for name, result in results.items()
},
'last_check': self.last_full_check.isoformat() if self.last_full_check else None,
'check_interval_seconds': self.check_interval_seconds
}
async def start_monitoring(self):
"""Start continuous health monitoring."""
if self._running:
return
self._running = True
self._monitoring_task = asyncio.create_task(self._monitoring_loop())
logger.info("Health monitoring started")
async def stop_monitoring(self):
"""Stop health monitoring."""
self._running = False
if self._monitoring_task:
self._monitoring_task.cancel()
try:
await self._monitoring_task
except asyncio.CancelledError:
pass
logger.info("Health monitoring stopped")
async def _monitoring_loop(self):
"""Main health monitoring loop."""
while self._running:
try:
await self.check_all_components()
await asyncio.sleep(self.check_interval_seconds)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in health monitoring loop: {e}")
await asyncio.sleep(5.0) # Back off on error
def get_readiness_status(self) -> Dict[str, Any]:
"""Get readiness probe status (for Kubernetes)."""
if not self.last_full_check:
return {
'ready': False,
'reason': 'No health checks completed yet'
}
# Check if recent health check was successful
time_since_check = datetime.now() - self.last_full_check
if time_since_check > timedelta(minutes=2):
return {
'ready': False,
'reason': 'Health checks stale'
}
# Get latest results
critical_components = ['database', 'websocket']
for component_name in critical_components:
component = self.components.get(component_name)
if component and component.last_check:
if component.last_check.status == HealthStatus.UNHEALTHY:
return {
'ready': False,
'reason': f'Critical component {component_name} unhealthy'
}
return {'ready': True}
def get_liveness_status(self) -> Dict[str, Any]:
"""Get liveness probe status (for Kubernetes)."""
# Simple liveness check - ensure monitoring is running
return {
'alive': self._running or self.last_full_check is not None,
'last_check': self.last_full_check.isoformat() if self.last_full_check else None
}
# Global health monitor instance
health_monitor = HealthMonitor()