"""
Health Check System

Provides comprehensive health monitoring for all system components including
database connectivity, WebSocket connections, external services, and
application state.

Features:
- Deep health checks for all dependencies
- Readiness and liveness probes
- Graceful degradation strategies
- Health status aggregation
- Kubernetes-compatible endpoints
"""

import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, NamedTuple
from enum import Enum

from logger import logger


class HealthStatus(Enum):
    """Health status levels."""

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    UNKNOWN = "unknown"


class HealthCheckResult(NamedTuple):
    """Result of a single component health check."""

    component: str          # name of the component that was checked
    status: HealthStatus    # outcome of the check
    message: str            # human-readable summary
    duration_ms: float      # elapsed time of the check, in milliseconds
    details: Dict[str, Any]  # component-specific diagnostic values
    timestamp: datetime     # naive local time at which the result was built


class HealthCheckComponent:
    """Base class for health check components.

    Subclasses implement ``_perform_check``; ``check_health`` wraps it with a
    timeout, timing, exception handling and failure accounting.
    """

    def __init__(self, name: str, timeout_seconds: float = 5.0):
        """Create a component.

        Args:
            name: Identifier under which results are reported.
            timeout_seconds: Maximum time ``_perform_check`` may run.
        """
        self.name = name
        self.timeout_seconds = timeout_seconds
        self.last_check: Optional[HealthCheckResult] = None
        self.check_count = 0
        self.failure_count = 0

    async def check_health(self) -> HealthCheckResult:
        """Run ``_perform_check`` under a timeout and record the outcome.

        A timeout or any ``Exception`` raised by the check is converted into
        an ``UNHEALTHY`` result rather than propagated. Every result whose
        status is not ``HEALTHY`` (including ``DEGRADED``) counts as a failure.

        Returns:
            The result, which is also stored in ``self.last_check``.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments during the check.
        started = time.perf_counter()
        self.check_count += 1

        try:
            outcome = await asyncio.wait_for(
                self._perform_check(),
                timeout=self.timeout_seconds,
            )
            # Kept inside the try: a non-dict return value is reported as a
            # failed check instead of escaping to the caller.
            status = outcome.get('status', HealthStatus.UNKNOWN)
            message = outcome.get('message', '')
            details = outcome.get('details', {})
        except asyncio.TimeoutError:
            status = HealthStatus.UNHEALTHY
            message = f"Health check timeout after {self.timeout_seconds}s"
            details = {"error": "timeout"}
        except Exception as e:
            status = HealthStatus.UNHEALTHY
            message = f"Health check failed: {str(e)}"
            details = {"error": str(e), "error_type": type(e).__name__}

        duration_ms = (time.perf_counter() - started) * 1000
        if status != HealthStatus.HEALTHY:
            self.failure_count += 1

        self.last_check = HealthCheckResult(
            component=self.name,
            status=status,
            message=message,
            duration_ms=duration_ms,
            details=details,
            timestamp=datetime.now(),
        )
        return self.last_check

    async def _perform_check(self) -> Dict[str, Any]:
        """Override this method to implement specific health check logic.

        Returns:
            A dict with optional keys ``'status'`` (``HealthStatus``),
            ``'message'`` (str) and ``'details'`` (dict).

        Raises:
            NotImplementedError: Always, in the base class.
        """
        raise NotImplementedError("Subclasses must implement _perform_check")

    def get_failure_rate(self) -> float:
        """Get failure rate as a percentage (0.0 when no check has run)."""
        if self.check_count == 0:
            return 0.0
        return (self.failure_count / self.check_count) * 100


class DatabaseHealthCheck(HealthCheckComponent):
    """Health check for database connectivity."""

    def __init__(self, session_manager, timeout_seconds: float = 3.0):
        """Create the check.

        Args:
            session_manager: Object exposing a ``sessions`` mapping.
            timeout_seconds: Maximum time the check may run.
        """
        super().__init__("database", timeout_seconds)
        self.session_manager = session_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Check that the session store is accessible.

        Returns:
            ``HEALTHY`` with the session count, or ``UNHEALTHY`` if accessing
            the session store raises.
        """
        try:
            # The count is taken before the cleanup below, as in the original.
            session_count = len(self.session_manager.sessions)

            # Drop a leftover test entry if one exists.
            # NOTE(review): despite the component name, no read/write
            # round-trip is performed here; only the in-memory mapping is
            # touched. Confirm whether a deeper probe is intended.
            test_session_id = "health_check_test"
            if test_session_id in self.session_manager.sessions:
                del self.session_manager.sessions[test_session_id]

            return {
                'status': HealthStatus.HEALTHY,
                'message': f"Database operational, {session_count} sessions",
                'details': {
                    'session_count': session_count,
                    'test_completed': True,
                },
            }
        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"Database check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class WebSocketHealthCheck(HealthCheckComponent):
    """Health check for WebSocket connections."""

    def __init__(self, session_manager, timeout_seconds: float = 2.0):
        """Create the check.

        Args:
            session_manager: Object exposing a ``sessions`` mapping whose
                values may carry a ``websocket`` attribute.
            timeout_seconds: Maximum time the check may run.
        """
        super().__init__("websocket", timeout_seconds)
        self.session_manager = session_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Grade WebSocket health by the share of sessions with a socket.

        A ratio above 0.8 is ``HEALTHY``, above 0.5 is ``DEGRADED``, anything
        lower is ``UNHEALTHY``. With no sessions at all the result is
        ``HEALTHY``.
        """
        try:
            sessions = self.session_manager.sessions
            total_sessions = len(sessions)
            # A session counts as connected when it has a truthy `websocket`.
            active_connections = sum(
                1 for session in sessions.values()
                if getattr(session, 'websocket', None)
            )
            # max(..., 1) keeps the ratio defined (0.0) when there are no
            # sessions.
            connection_ratio = active_connections / max(total_sessions, 1)

            if total_sessions == 0:
                status = HealthStatus.HEALTHY
                message = "No active sessions"
            elif connection_ratio > 0.8:
                status = HealthStatus.HEALTHY
                message = f"WebSocket connections healthy ({active_connections}/{total_sessions})"
            elif connection_ratio > 0.5:
                status = HealthStatus.DEGRADED
                message = f"Some WebSocket connections lost ({active_connections}/{total_sessions})"
            else:
                status = HealthStatus.UNHEALTHY
                message = f"Many WebSocket connections lost ({active_connections}/{total_sessions})"

            return {
                'status': status,
                'message': message,
                'details': {
                    'active_connections': active_connections,
                    'total_sessions': total_sessions,
                    'connection_ratio': connection_ratio,
                },
            }
        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"WebSocket check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class LobbyHealthCheck(HealthCheckComponent):
    """Health check for lobby management."""

    def __init__(self, lobby_manager, timeout_seconds: float = 2.0):
        """Create the check.

        Args:
            lobby_manager: Object exposing a ``lobbies`` mapping whose values
                have a sized ``sessions`` attribute.
            timeout_seconds: Maximum time the check may run.
        """
        super().__init__("lobby", timeout_seconds)
        self.lobby_manager = lobby_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Report lobby counts.

        Returns:
            ``HEALTHY`` with total/active/empty lobby counts, or ``UNHEALTHY``
            if reading the lobby data raises.
        """
        try:
            lobbies = self.lobby_manager.lobbies
            lobby_count = len(lobbies)
            # A lobby is "active" when it holds at least one session.
            active_lobbies = sum(
                1 for lobby in lobbies.values() if len(lobby.sessions) > 0
            )

            return {
                'status': HealthStatus.HEALTHY,
                'message': f"Lobby system operational, {active_lobbies}/{lobby_count} active",
                'details': {
                    'total_lobbies': lobby_count,
                    'active_lobbies': active_lobbies,
                    'empty_lobbies': lobby_count - active_lobbies,
                },
            }
        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"Lobby check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class SystemResourceHealthCheck(HealthCheckComponent):
    """Health check for system resources."""

    def __init__(self, metrics_collector, timeout_seconds: float = 1.0):
        """Create the check.

        Args:
            metrics_collector: Object whose ``get_current_metrics()`` returns
                a dict containing a ``'gauges'`` mapping.
            timeout_seconds: Maximum time the check may run.
        """
        super().__init__("system_resources", timeout_seconds)
        self.metrics_collector = metrics_collector

    async def _perform_check(self) -> Dict[str, Any]:
        """Grade CPU and memory usage read from the metrics collector.

        Either gauge above 90 is ``UNHEALTHY``; either above 70 is
        ``DEGRADED``; otherwise ``HEALTHY``. Missing gauges default to 0.
        """
        try:
            gauges = self.metrics_collector.get_current_metrics()['gauges']
            cpu_usage = gauges.get('cpu_usage_percent', 0)
            memory_usage = gauges.get('memory_usage_percent', 0)
            peak_usage = max(cpu_usage, memory_usage)

            if peak_usage > 90:
                status = HealthStatus.UNHEALTHY
                message = f"High resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"
            elif peak_usage > 70:
                status = HealthStatus.DEGRADED
                message = f"Moderate resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"
            else:
                status = HealthStatus.HEALTHY
                message = f"Resource usage normal: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"

            return {
                'status': status,
                'message': message,
                'details': {
                    'cpu_usage_percent': cpu_usage,
                    'memory_usage_percent': memory_usage,
                    'memory_usage_mb': gauges.get('memory_usage_mb', 0),
                },
            }
        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"System resource check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class HealthMonitor:
    """Main health monitoring system.

    Holds registered components, runs them together, keeps a bounded history
    of results and exposes readiness/liveness summaries.
    """

    def __init__(self):
        """Create an idle monitor with no components registered."""
        self.components: Dict[str, HealthCheckComponent] = {}
        self.check_interval_seconds = 30.0
        self.last_full_check: Optional[datetime] = None
        self._monitoring_task: Optional[asyncio.Task] = None
        self._running = False

        # Health history for trends
        self.health_history: List[Dict[str, Any]] = []
        self.max_history_entries = 100

    def register_component(self, component: HealthCheckComponent):
        """Register a health check component, replacing any with the same name."""
        self.components[component.name] = component
        logger.info(f"Registered health check component: {component.name}")

    async def check_all_components(self) -> Dict[str, HealthCheckResult]:
        """Check health of all registered components concurrently.

        Returns:
            Mapping of component name to its result. A check that raised or
            was cancelled is reported as ``UNHEALTHY`` with ``duration_ms`` 0.
        """
        names = list(self.components)
        outcomes = await asyncio.gather(
            *(self.components[name].check_health() for name in names),
            return_exceptions=True,
        )

        results: Dict[str, HealthCheckResult] = {}
        for name, outcome in zip(names, outcomes):
            # BaseException, not Exception: a cancelled child is returned as
            # CancelledError, which does not derive from Exception. Letting it
            # through would crash the history store on `.status`.
            if isinstance(outcome, BaseException):
                results[name] = HealthCheckResult(
                    component=name,
                    status=HealthStatus.UNHEALTHY,
                    message=f"Health check exception: {str(outcome)}",
                    duration_ms=0.0,
                    details={
                        "error": str(outcome),
                        "error_type": type(outcome).__name__,
                    },
                    timestamp=datetime.now(),
                )
            else:
                results[name] = outcome

        self.last_full_check = datetime.now()

        # Store in history
        self._store_health_history(results)

        return results

    def _store_health_history(self, results: Dict[str, HealthCheckResult]):
        """Append a summary of ``results`` to the history and trim it."""
        history_entry = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': self._calculate_overall_status(results).value,
            'components': {
                name: {
                    'status': result.status.value,
                    'duration_ms': result.duration_ms,
                    'message': result.message,
                }
                for name, result in results.items()
            },
        }

        self.health_history.append(history_entry)

        # Keep only the newest max_history_entries entries.
        if len(self.health_history) > self.max_history_entries:
            self.health_history = self.health_history[-self.max_history_entries:]

    def _calculate_overall_status(self, results: Dict[str, HealthCheckResult]) -> HealthStatus:
        """Aggregate component statuses into one.

        ``UNKNOWN`` for no results; otherwise the worst of ``UNHEALTHY`` then
        ``DEGRADED``; ``HEALTHY`` only when every component is healthy; any
        remaining mix (i.e. some ``UNKNOWN``) is ``UNKNOWN``.
        """
        if not results:
            return HealthStatus.UNKNOWN

        statuses = [result.status for result in results.values()]

        if HealthStatus.UNHEALTHY in statuses:
            return HealthStatus.UNHEALTHY
        if HealthStatus.DEGRADED in statuses:
            return HealthStatus.DEGRADED
        if all(status == HealthStatus.HEALTHY for status in statuses):
            return HealthStatus.HEALTHY
        return HealthStatus.UNKNOWN

    async def get_health_summary(self) -> Dict[str, Any]:
        """Run all checks now and return a serialisable summary."""
        results = await self.check_all_components()
        overall_status = self._calculate_overall_status(results)

        return {
            'status': overall_status.value,
            'timestamp': datetime.now().isoformat(),
            'components': {
                name: {
                    'status': result.status.value,
                    'message': result.message,
                    'duration_ms': result.duration_ms,
                    'details': result.details,
                    'failure_rate': self.components[name].get_failure_rate(),
                }
                for name, result in results.items()
            },
            'last_check': self.last_full_check.isoformat() if self.last_full_check else None,
            'check_interval_seconds': self.check_interval_seconds,
        }

    async def start_monitoring(self):
        """Start continuous health monitoring (no-op if already running)."""
        if self._running:
            return

        self._running = True
        self._monitoring_task = asyncio.create_task(self._monitoring_loop())
        logger.info("Health monitoring started")

    async def stop_monitoring(self):
        """Stop health monitoring and wait for the background task to end."""
        self._running = False
        if self._monitoring_task:
            self._monitoring_task.cancel()
            try:
                await self._monitoring_task
            except asyncio.CancelledError:
                pass
        logger.info("Health monitoring stopped")

    async def _monitoring_loop(self):
        """Run all checks every ``check_interval_seconds`` until stopped."""
        while self._running:
            try:
                await self.check_all_components()
                await asyncio.sleep(self.check_interval_seconds)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in health monitoring loop: {e}")
                await asyncio.sleep(5.0)  # Back off on error

    def get_readiness_status(self) -> Dict[str, Any]:
        """Get readiness probe status (for Kubernetes).

        Not ready when no full check has completed, when the last full check
        is older than two minutes, or when the ``database`` or ``websocket``
        component's last result is ``UNHEALTHY``. Critical components that are
        not registered or have never been checked do not block readiness.
        """
        if not self.last_full_check:
            return {
                'ready': False,
                'reason': 'No health checks completed yet',
            }

        # Check if recent health check was successful
        time_since_check = datetime.now() - self.last_full_check
        if time_since_check > timedelta(minutes=2):
            return {
                'ready': False,
                'reason': 'Health checks stale',
            }

        # Get latest results
        critical_components = ['database', 'websocket']
        for component_name in critical_components:
            component = self.components.get(component_name)
            if component and component.last_check:
                if component.last_check.status == HealthStatus.UNHEALTHY:
                    return {
                        'ready': False,
                        'reason': f'Critical component {component_name} unhealthy',
                    }

        return {'ready': True}

    def get_liveness_status(self) -> Dict[str, Any]:
        """Get liveness probe status (for Kubernetes).

        ``alive`` is true while monitoring is running, and stays true after
        any full check has ever completed, even if monitoring later stops.
        """
        return {
            'alive': self._running or self.last_full_check is not None,
            'last_check': self.last_full_check.isoformat() if self.last_full_check else None,
        }


# Global health monitor instance
health_monitor = HealthMonitor()