"""
Health Check System

Provides comprehensive health monitoring for all system components including
database connectivity, WebSocket connections, external services, and application state.

Features:
- Deep health checks for all dependencies
- Readiness and liveness probes
- Graceful degradation strategies
- Health status aggregation
- Kubernetes-compatible endpoints
"""

import asyncio
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, NamedTuple
from enum import Enum

from shared.logger import logger


class HealthStatus(Enum):
    """Health status levels.

    The string values are what is emitted (via ``.value``) when results are
    serialized into history entries and health summaries.
    """

    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"
    # Used when a check result carries no explicit status, or when there are
    # no results to aggregate.
    UNKNOWN = "unknown"


class HealthCheckResult(NamedTuple):
    """Result of a health check."""

    # Name of the component the check was run for.
    component: str
    # Outcome of the check.
    status: HealthStatus
    # Human-readable summary of the outcome.
    message: str
    # Elapsed time of the check in milliseconds.
    duration_ms: float
    # Free-form, check-specific diagnostic data.
    details: Dict[str, Any]
    # When the result was recorded (producers in this module use datetime.now()).
    timestamp: datetime


class HealthCheckComponent:
    """Base class for health check components.

    Subclasses implement :meth:`_perform_check`. :meth:`check_health` wraps it
    with a timeout, measures its duration, records the outcome in
    ``last_check`` and keeps running check/failure counters.
    """

    def __init__(self, name: str, timeout_seconds: float = 5.0):
        """Create a component.

        Args:
            name: Identifier reported in results.
            timeout_seconds: Maximum time a single check may take before it
                is reported as unhealthy.
        """
        self.name = name
        self.timeout_seconds = timeout_seconds
        self.last_check: Optional[HealthCheckResult] = None
        self.check_count = 0
        self.failure_count = 0

    async def check_health(self) -> HealthCheckResult:
        """Perform health check with timeout.

        Never raises for ordinary errors: a timeout or any ``Exception`` from
        ``_perform_check`` is converted into an UNHEALTHY result.

        Returns:
            The result, which is also stored in ``self.last_check``.
        """
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the wall clock is adjusted.
        started = time.perf_counter()
        self.check_count += 1

        try:
            outcome = await asyncio.wait_for(
                self._perform_check(),
                timeout=self.timeout_seconds,
            )
            # Kept inside the try so a malformed (non-dict) outcome is
            # reported as a failed check instead of escaping to the caller.
            status = outcome.get('status', HealthStatus.UNKNOWN)
            message = outcome.get('message', '')
            details = outcome.get('details', {})
        except asyncio.TimeoutError:
            status = HealthStatus.UNHEALTHY
            message = f"Health check timeout after {self.timeout_seconds}s"
            details = {"error": "timeout"}
        except Exception as e:
            status = HealthStatus.UNHEALTHY
            message = f"Health check failed: {str(e)}"
            details = {"error": str(e), "error_type": type(e).__name__}

        return self._record_result(started, status, message, details)

    def _record_result(
        self,
        started: float,
        status: HealthStatus,
        message: str,
        details: Dict[str, Any],
    ) -> HealthCheckResult:
        """Build the result, update the failure counter, and store it in ``last_check``."""
        duration_ms = (time.perf_counter() - started) * 1000

        # Anything other than HEALTHY (including DEGRADED and UNKNOWN) counts
        # as a failure for the failure-rate statistic.
        if status != HealthStatus.HEALTHY:
            self.failure_count += 1

        self.last_check = HealthCheckResult(
            component=self.name,
            status=status,
            message=message,
            duration_ms=duration_ms,
            details=details,
            timestamp=datetime.now(),
        )
        return self.last_check

    async def _perform_check(self) -> Dict[str, Any]:
        """Override this method to implement specific health check logic.

        Returns:
            A dict from which ``check_health`` reads the optional keys
            ``'status'``, ``'message'`` and ``'details'``.
        """
        raise NotImplementedError("Subclasses must implement _perform_check")

    def get_failure_rate(self) -> float:
        """Get failure rate as percentage (0.0 when no checks have run)."""
        if self.check_count == 0:
            return 0.0
        return (self.failure_count / self.check_count) * 100


class DatabaseHealthCheck(HealthCheckComponent):
    """Health check for database connectivity.

    The check is performed against the session manager's in-memory
    ``sessions`` mapping; no file or network I/O happens here.
    """

    def __init__(self, session_manager, timeout_seconds: float = 3.0):
        """Create the check.

        Args:
            session_manager: Object exposing a ``sessions`` mapping.
            timeout_seconds: Timeout applied by ``check_health``.
        """
        super().__init__("database", timeout_seconds)
        self.session_manager = session_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Check database connectivity and basic operations.

        Returns:
            A HEALTHY outcome with the session count, or an UNHEALTHY outcome
            describing the exception if the session store cannot be accessed.
        """
        try:
            sessions = self.session_manager.sessions

            # Counted before the cleanup below, so a leftover probe session
            # is included in the reported number.
            session_count = len(sessions)

            # Remove any leftover probe entry. This only exercises membership
            # and deletion on the mapping.
            test_session_id = "health_check_test"
            if test_session_id in sessions:
                del sessions[test_session_id]

            return {
                'status': HealthStatus.HEALTHY,
                'message': f"Database operational, {session_count} sessions",
                'details': {
                    'session_count': session_count,
                    'test_completed': True,
                },
            }

        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"Database check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class WebSocketHealthCheck(HealthCheckComponent):
    """Health check for WebSocket connections.

    Health is derived from the fraction of sessions that currently hold a
    truthy ``websocket`` attribute.
    """

    # A ratio strictly above HEALTHY_RATIO is healthy; strictly above
    # DEGRADED_RATIO is degraded; anything else is unhealthy.
    HEALTHY_RATIO = 0.8
    DEGRADED_RATIO = 0.5

    def __init__(self, session_manager, timeout_seconds: float = 2.0):
        """Create the check.

        Args:
            session_manager: Object exposing a ``sessions`` mapping whose
                values may carry a ``websocket`` attribute.
            timeout_seconds: Timeout applied by ``check_health``.
        """
        super().__init__("websocket", timeout_seconds)
        self.session_manager = session_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Check WebSocket connection health.

        Returns:
            An outcome dict whose details hold the active connection count,
            the total session count and their ratio (0.0 with no sessions).
        """
        try:
            sessions = self.session_manager.sessions
            total_sessions = len(sessions)

            # A session counts as connected when it has a truthy `websocket`.
            active_connections = sum(
                1 for session in sessions.values()
                if getattr(session, 'websocket', None)
            )

            # Computed once and reused for both the verdict and the details.
            connection_ratio = (
                active_connections / total_sessions if total_sessions else 0.0
            )

            if total_sessions == 0:
                # No sessions means nothing can be disconnected.
                status = HealthStatus.HEALTHY
                message = "No active sessions"
            elif connection_ratio > self.HEALTHY_RATIO:
                status = HealthStatus.HEALTHY
                message = f"WebSocket connections healthy ({active_connections}/{total_sessions})"
            elif connection_ratio > self.DEGRADED_RATIO:
                status = HealthStatus.DEGRADED
                message = f"Some WebSocket connections lost ({active_connections}/{total_sessions})"
            else:
                status = HealthStatus.UNHEALTHY
                message = f"Many WebSocket connections lost ({active_connections}/{total_sessions})"

            return {
                'status': status,
                'message': message,
                'details': {
                    'active_connections': active_connections,
                    'total_sessions': total_sessions,
                    'connection_ratio': connection_ratio,
                },
            }

        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"WebSocket check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class LobbyHealthCheck(HealthCheckComponent):
    """Health check for lobby management."""

    def __init__(self, lobby_manager, timeout_seconds: float = 2.0):
        """Create the check.

        Args:
            lobby_manager: Object exposing a ``lobbies`` mapping whose values
                have a sized ``sessions`` attribute.
            timeout_seconds: Timeout applied by ``check_health``.
        """
        super().__init__("lobby", timeout_seconds)
        self.lobby_manager = lobby_manager

    async def _perform_check(self) -> Dict[str, Any]:
        """Check lobby management health.

        Returns:
            A HEALTHY outcome with total/active/empty lobby counts, or an
            UNHEALTHY outcome if the lobby data cannot be read.
        """
        try:
            lobbies = self.lobby_manager.lobbies
            lobby_count = len(lobbies)

            # A lobby is "active" when it holds at least one session.
            active_lobbies = sum(
                1 for lobby in lobbies.values() if len(lobby.sessions) > 0
            )

            return {
                'status': HealthStatus.HEALTHY,
                'message': f"Lobby system operational, {active_lobbies}/{lobby_count} active",
                'details': {
                    'total_lobbies': lobby_count,
                    'active_lobbies': active_lobbies,
                    'empty_lobbies': lobby_count - active_lobbies,
                },
            }

        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"Lobby check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class SystemResourceHealthCheck(HealthCheckComponent):
    """Health check for system resources."""

    # Usage (percent) strictly above these limits triggers the matching status.
    UNHEALTHY_USAGE_PERCENT = 90
    DEGRADED_USAGE_PERCENT = 70

    def __init__(self, metrics_collector, timeout_seconds: float = 1.0):
        """Create the check.

        Args:
            metrics_collector: Object whose ``get_current_metrics()`` returns
                a mapping with a ``'gauges'`` mapping inside.
            timeout_seconds: Timeout applied by ``check_health``.
        """
        super().__init__("system_resources", timeout_seconds)
        self.metrics_collector = metrics_collector

    async def _perform_check(self) -> Dict[str, Any]:
        """Check system resource utilization.

        Returns:
            An outcome dict graded by CPU and memory usage. Gauges that are
            absent are read as 0.
        """
        try:
            gauges = self.metrics_collector.get_current_metrics()['gauges']

            cpu_usage = gauges.get('cpu_usage_percent', 0)
            memory_usage = gauges.get('memory_usage_percent', 0)

            # The worse of the two readings decides the status.
            peak_usage = max(cpu_usage, memory_usage)
            usage_text = f"CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%"

            if peak_usage > self.UNHEALTHY_USAGE_PERCENT:
                status = HealthStatus.UNHEALTHY
                message = f"High resource usage: {usage_text}"
            elif peak_usage > self.DEGRADED_USAGE_PERCENT:
                status = HealthStatus.DEGRADED
                message = f"Moderate resource usage: {usage_text}"
            else:
                status = HealthStatus.HEALTHY
                message = f"Resource usage normal: {usage_text}"

            return {
                'status': status,
                'message': message,
                'details': {
                    'cpu_usage_percent': cpu_usage,
                    'memory_usage_percent': memory_usage,
                    'memory_usage_mb': gauges.get('memory_usage_mb', 0),
                },
            }

        except Exception as e:
            return {
                'status': HealthStatus.UNHEALTHY,
                'message': f"System resource check failed: {str(e)}",
                'details': {'error': str(e)},
            }


class HealthMonitor:
    """Main health monitoring system.

    Holds the registered components, runs their checks (on demand or in a
    background loop), keeps a bounded history of results, and exposes
    readiness/liveness views of the latest state.
    """

    # Components whose UNHEALTHY status makes the service not ready.
    CRITICAL_COMPONENTS = ('database', 'websocket')
    # Results older than this are considered stale by the readiness probe.
    READINESS_MAX_AGE = timedelta(minutes=2)

    def __init__(self):
        self.components: Dict[str, HealthCheckComponent] = {}
        self.check_interval_seconds = 30.0
        self.last_full_check: Optional[datetime] = None
        self._monitoring_task: Optional[asyncio.Task] = None
        self._running = False

        # Health history for trends
        self.health_history: List[Dict[str, Any]] = []
        self.max_history_entries = 100

    def register_component(self, component: HealthCheckComponent) -> None:
        """Register a health check component (replaces any with the same name)."""
        self.components[component.name] = component
        logger.info(f"Registered health check component: {component.name}")

    async def check_all_components(self) -> Dict[str, HealthCheckResult]:
        """Check health of all registered components.

        All checks run concurrently. The results are also appended to the
        health history and ``last_full_check`` is updated.

        Returns:
            Mapping of component name to its result.
        """
        # Snapshot the names so the results line up with the coroutines even
        # if a component is registered while the checks are running.
        names = list(self.components)
        outcomes = await asyncio.gather(
            *(self.components[name].check_health() for name in names),
            return_exceptions=True,
        )

        results: Dict[str, HealthCheckResult] = {}
        for name, outcome in zip(names, outcomes):
            # BaseException rather than Exception: gather() can hand back a
            # CancelledError for a cancelled check, and that is not an
            # Exception subclass on Python 3.8+.
            if isinstance(outcome, BaseException):
                results[name] = HealthCheckResult(
                    component=name,
                    status=HealthStatus.UNHEALTHY,
                    message=f"Health check exception: {str(outcome)}",
                    duration_ms=0.0,
                    details={"error": str(outcome)},
                    timestamp=datetime.now(),
                )
            else:
                results[name] = outcome

        self.last_full_check = datetime.now()

        # Store in history
        self._store_health_history(results)

        return results

    def _store_health_history(self, results: Dict[str, HealthCheckResult]) -> None:
        """Store health check results in history, trimming to ``max_history_entries``."""
        history_entry = {
            'timestamp': datetime.now().isoformat(),
            'overall_status': self._calculate_overall_status(results).value,
            'components': {
                name: {
                    'status': result.status.value,
                    'duration_ms': result.duration_ms,
                    'message': result.message,
                }
                for name, result in results.items()
            },
        }

        self.health_history.append(history_entry)

        # Keep history size manageable
        if len(self.health_history) > self.max_history_entries:
            self.health_history = self.health_history[-self.max_history_entries:]

    def _calculate_overall_status(self, results: Dict[str, HealthCheckResult]) -> HealthStatus:
        """Calculate overall system health status.

        The worst status wins: any UNHEALTHY gives UNHEALTHY, otherwise any
        DEGRADED gives DEGRADED, otherwise HEALTHY only if every component is
        HEALTHY. No results, or a remaining UNKNOWN, yields UNKNOWN.
        """
        if not results:
            return HealthStatus.UNKNOWN

        statuses = [result.status for result in results.values()]

        if HealthStatus.UNHEALTHY in statuses:
            return HealthStatus.UNHEALTHY
        if HealthStatus.DEGRADED in statuses:
            return HealthStatus.DEGRADED
        if all(status == HealthStatus.HEALTHY for status in statuses):
            return HealthStatus.HEALTHY
        return HealthStatus.UNKNOWN

    async def get_health_summary(self) -> Dict[str, Any]:
        """Get comprehensive health summary.

        Runs a fresh check of every component and returns a dict with the
        overall status, per-component data, and monitor metadata.
        """
        results = await self.check_all_components()
        overall_status = self._calculate_overall_status(results)

        return {
            'status': overall_status.value,
            'timestamp': datetime.now().isoformat(),
            'components': {
                name: {
                    'status': result.status.value,
                    'message': result.message,
                    'duration_ms': result.duration_ms,
                    'details': result.details,
                    'failure_rate': self.components[name].get_failure_rate(),
                }
                for name, result in results.items()
            },
            'last_check': self.last_full_check.isoformat() if self.last_full_check else None,
            'check_interval_seconds': self.check_interval_seconds,
        }

    async def start_monitoring(self) -> None:
        """Start continuous health monitoring (no-op if already running)."""
        if self._running:
            return

        self._running = True
        self._monitoring_task = asyncio.create_task(self._monitoring_loop())
        logger.info("Health monitoring started")

    async def stop_monitoring(self) -> None:
        """Stop health monitoring and wait for the background task to finish."""
        self._running = False
        if self._monitoring_task:
            self._monitoring_task.cancel()
            try:
                await self._monitoring_task
            except asyncio.CancelledError:
                pass
            # Drop the finished task so it is not cancelled/awaited again.
            self._monitoring_task = None
        logger.info("Health monitoring stopped")

    async def _monitoring_loop(self) -> None:
        """Main health monitoring loop.

        Runs a full check every ``check_interval_seconds`` until stopped.
        Unexpected errors are logged and followed by a short back-off.
        """
        while self._running:
            try:
                await self.check_all_components()
                await asyncio.sleep(self.check_interval_seconds)
            except asyncio.CancelledError:
                # Propagate so the task ends up in the cancelled state;
                # stop_monitoring() already expects and handles this.
                raise
            except Exception as e:
                logger.error(f"Error in health monitoring loop: {e}")
                await asyncio.sleep(5.0)  # Back off on error

    def get_readiness_status(self) -> Dict[str, Any]:
        """Get readiness probe status (for Kubernetes).

        Not ready when no full check has completed yet, when the last full
        check is older than ``READINESS_MAX_AGE``, or when a registered
        critical component's last result is UNHEALTHY. Critical components
        that are not registered or have no result yet do not block readiness.

        Returns:
            ``{'ready': True}`` or ``{'ready': False, 'reason': ...}``.
        """
        if not self.last_full_check:
            return {
                'ready': False,
                'reason': 'No health checks completed yet',
            }

        # Check if recent health check was successful
        time_since_check = datetime.now() - self.last_full_check
        if time_since_check > self.READINESS_MAX_AGE:
            return {
                'ready': False,
                'reason': 'Health checks stale',
            }

        for component_name in self.CRITICAL_COMPONENTS:
            component = self.components.get(component_name)
            if not component or not component.last_check:
                continue
            if component.last_check.status == HealthStatus.UNHEALTHY:
                return {
                    'ready': False,
                    'reason': f'Critical component {component_name} unhealthy',
                }

        return {'ready': True}

    def get_liveness_status(self) -> Dict[str, Any]:
        """Get liveness probe status (for Kubernetes).

        Reported alive when the monitoring loop is flagged as running or at
        least one full check has ever completed.
        """
        return {
            'alive': self._running or self.last_full_check is not None,
            'last_check': self.last_full_check.isoformat() if self.last_full_check else None,
        }


# Global health monitor instance, created at import time with no components
# registered.
health_monitor = HealthMonitor()