diff --git a/server/api/monitoring.py b/server/api/monitoring.py
new file mode 100644
index 0000000..5920ed1
--- /dev/null
+++ b/server/api/monitoring.py
@@ -0,0 +1,432 @@
+"""
+Performance and Health Monitoring API Endpoints
+
+Provides REST API endpoints for monitoring system performance, health status,
+cache statistics, and operational metrics.
+
+Endpoints:
+- /api/health - Health check summary
+- /api/health/ready - Readiness probe (Kubernetes)
+- /api/health/live - Liveness probe (Kubernetes)
+- /api/metrics - Performance metrics
+- /api/metrics/history - Historical metrics
+- /api/metrics/export - Metrics in Prometheus text format
+- /api/cache/stats - Cache statistics
+- /api/cache/clear - Clear cache entries (POST)
+- /api/system/info - System information
+"""
+
+from typing import Optional
+from fastapi import APIRouter, HTTPException, Query, Response
+from datetime import datetime
+
+from logger import logger
+
+# Import monitoring components
+try:
+    from core.performance import metrics_collector
+    from core.health import health_monitor
+    from core.cache import cache_manager
+except ImportError as e:
+    logger.warning(f"Some monitoring components not available: {e}")
+    metrics_collector = None
+    health_monitor = None
+    cache_manager = None
+
+
+router = APIRouter(prefix="/api", tags=["monitoring"])
+
+
+@router.get("/health")
+async def get_health_summary():
+    """
+    Get comprehensive health summary.
+
+    Returns:
+        Dict containing overall health status and component details
+    """
+    try:
+        if not health_monitor:
+            raise HTTPException(status_code=503, detail="Health monitoring not available")
+
+        health_summary = await health_monitor.get_health_summary()
+        return {
+            "status": "success",
+            "data": health_summary,
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except HTTPException:
+        # Re-raise as-is so the 503 above is not converted into a 500
+        raise
+    except Exception as e:
+        logger.error(f"Error getting health summary: {e}")
+        raise HTTPException(status_code=500, detail=f"Health check failed: {str(e)}")
+
+
+@router.get("/health/ready")
+async def readiness_probe():
+    """
+    Kubernetes readiness probe endpoint.
+
+    Returns:
+        Ready status for load balancer inclusion
+    """
+    try:
+        if not health_monitor:
+            return {"ready": False, "reason": "Health monitoring not available"}
+
+        readiness = health_monitor.get_readiness_status()
+
+        if readiness["ready"]:
+            return {
+                "status": "ready",
+                "timestamp": datetime.now().isoformat(),
+                **readiness
+            }
+        else:
+            raise HTTPException(
+                status_code=503,
+                detail={
+                    "status": "not_ready",
+                    "timestamp": datetime.now().isoformat(),
+                    **readiness
+                }
+            )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error in readiness probe: {e}")
+        raise HTTPException(status_code=500, detail=f"Readiness check failed: {str(e)}")
+
+
+@router.get("/health/live")
+async def liveness_probe():
+    """
+    Kubernetes liveness probe endpoint.
+
+    Returns:
+        Alive status for container restart decisions
+    """
+    try:
+        if not health_monitor:
+            return {"alive": True, "reason": "Basic liveness check"}
+
+        liveness = health_monitor.get_liveness_status()
+
+        if liveness["alive"]:
+            return {
+                "status": "alive",
+                "timestamp": datetime.now().isoformat(),
+                **liveness
+            }
+        else:
+            raise HTTPException(
+                status_code=503,
+                detail={
+                    "status": "not_alive",
+                    "timestamp": datetime.now().isoformat(),
+                    **liveness
+                }
+            )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error in liveness probe: {e}")
+        raise HTTPException(status_code=500, detail=f"Liveness check failed: {str(e)}")
+
+
+@router.get("/metrics")
+async def get_current_metrics():
+    """
+    Get current performance metrics.
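+
+    Example response (illustrative values; the shape mirrors the handler below):
+
+        {
+            "status": "success",
+            "data": {"current": {...}, "summary": {...}},
+            "timestamp": "2025-01-01T12:00:00"
+        }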
+
+    Returns:
+        Current system and application metrics
+    """
+    try:
+        if not metrics_collector:
+            raise HTTPException(status_code=503, detail="Metrics collection not available")
+
+        current_metrics = metrics_collector.get_current_metrics()
+        performance_summary = metrics_collector.get_performance_summary()
+
+        return {
+            "status": "success",
+            "data": {
+                "current": current_metrics,
+                "summary": performance_summary
+            },
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except HTTPException:
+        # Preserve the 503 raised above instead of converting it into a 500
+        raise
+    except Exception as e:
+        logger.error(f"Error getting metrics: {e}")
+        raise HTTPException(status_code=500, detail=f"Metrics collection failed: {str(e)}")
+
+
+@router.get("/metrics/history")
+async def get_metrics_history(
+    metric_name: str = Query(..., description="Name of the metric to retrieve"),
+    minutes: int = Query(default=5, ge=1, le=60, description="Minutes of history to retrieve")
+):
+    """
+    Get historical data for a specific metric.
+
+    Args:
+        metric_name: Name of the metric
+        minutes: Number of minutes of history to retrieve (1-60)
+
+    Returns:
+        Historical metric data points
+    """
+    try:
+        if not metrics_collector:
+            raise HTTPException(status_code=503, detail="Metrics collection not available")
+
+        history = metrics_collector.get_metric_history(metric_name, minutes)
+
+        return {
+            "status": "success",
+            "data": {
+                "metric_name": metric_name,
+                "minutes": minutes,
+                "data_points": len(history),
+                "history": history
+            },
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error getting metric history: {e}")
+        raise HTTPException(status_code=500, detail=f"Metric history failed: {str(e)}")
+
+
+@router.get("/cache/stats")
+async def get_cache_statistics():
+    """
+    Get cache performance statistics.
+
+    Returns:
+        Cache hit rates, sizes, and performance metrics
+    """
+    try:
+        if not cache_manager:
+            raise HTTPException(status_code=503, detail="Cache management not available")
+
+        cache_stats = cache_manager.get_all_stats()
+
+        # Calculate aggregate statistics
+        total_hits = sum(stats['hits'] for stats in cache_stats.values())
+        total_misses = sum(stats['misses'] for stats in cache_stats.values())
+        total_requests = total_hits + total_misses
+        overall_hit_rate = (total_hits / total_requests * 100) if total_requests > 0 else 0
+
+        return {
+            "status": "success",
+            "data": {
+                "overall": {
+                    "total_hits": total_hits,
+                    "total_misses": total_misses,
+                    "overall_hit_rate_percent": overall_hit_rate,
+                    "total_requests": total_requests
+                },
+                "by_cache": cache_stats
+            },
+            "timestamp": datetime.now().isoformat()
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error getting cache stats: {e}")
+        raise HTTPException(status_code=500, detail=f"Cache statistics failed: {str(e)}")
+
+
+@router.get("/system/info")
+async def get_system_info():
+    """
+    Get system information and configuration.
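+
+    Example "data" payload (illustrative values; the shape mirrors the handler below):
+
+        {
+            "python": {"version": "...", "platform": "...", "architecture": "64bit"},
+            "runtime": {"uptime_seconds": 120, "process_id": 1234, "working_directory": "/app"},
+            "performance": {"cpu_usage_percent": 12.5, "memory_usage_mb": 256.0, "memory_usage_percent": 3.1}
+        }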
+ + Returns: + System details, configuration, and runtime information + """ + try: + import sys + import platform + import os + + # Get current metrics if available + current_metrics = {} + if metrics_collector: + current_metrics = metrics_collector.get_current_metrics() + + system_info = { + "python": { + "version": sys.version, + "platform": platform.platform(), + "architecture": platform.architecture()[0] + }, + "runtime": { + "uptime_seconds": current_metrics.get('gauges', {}).get('uptime_seconds', 0), + "process_id": os.getpid(), + "working_directory": os.getcwd() + }, + "performance": { + "cpu_usage_percent": current_metrics.get('gauges', {}).get('cpu_usage_percent', 0), + "memory_usage_mb": current_metrics.get('gauges', {}).get('memory_usage_mb', 0), + "memory_usage_percent": current_metrics.get('gauges', {}).get('memory_usage_percent', 0) + } + } + + # Add health status if available + if health_monitor and health_monitor.last_full_check: + system_info["health"] = { + "last_check": health_monitor.last_full_check.isoformat(), + "check_interval": health_monitor.check_interval_seconds + } + + return { + "status": "success", + "data": system_info, + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error getting system info: {e}") + raise HTTPException(status_code=500, detail=f"System info failed: {str(e)}") + + +@router.post("/cache/clear") +async def clear_cache(cache_type: Optional[str] = Query(None, description="Specific cache to clear")): + """ + Clear cache entries. + + Args: + cache_type: Optional specific cache to clear (session, lobby, user, message, computed) + + Returns: + Cache clear results + """ + try: + if not cache_manager: + raise HTTPException(status_code=503, detail="Cache management not available") + + if cache_type: + # Clear specific cache + cache_attr = f"{cache_type}_cache" + if hasattr(cache_manager, cache_attr): + cache = getattr(cache_manager, cache_attr) + cache.backend.clear() + return { + "status": "success", + "message": f"Cleared {cache_type} cache", + "timestamp": datetime.now().isoformat() + } + else: + raise HTTPException(status_code=400, detail=f"Unknown cache type: {cache_type}") + else: + # Clear all caches + for cache_name in ['session', 'lobby', 'user', 'message', 'computed']: + cache_attr = f"{cache_name}_cache" + if hasattr(cache_manager, cache_attr): + cache = getattr(cache_manager, cache_attr) + cache.backend.clear() + + return { + "status": "success", + "message": "Cleared all caches", + "timestamp": datetime.now().isoformat() + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error clearing cache: {e}") + raise HTTPException(status_code=500, detail=f"Cache clear failed: {str(e)}") + + +@router.get("/metrics/export") +async def export_metrics_prometheus(): + """ + Export metrics in Prometheus format. 
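+
+    Example output (illustrative values, generated by the loops below):
+
+        # TYPE cpu_usage_percent gauge
+        cpu_usage_percent 12.5
+        # TYPE api_requests_total counter
+        api_requests_total 42.0
+        # TYPE api_request_duration_seconds histogram
+        api_request_duration_seconds_count 42
+        api_request_duration_seconds_sum 3.5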
+
+    Returns:
+        Metrics in Prometheus text format
+    """
+    try:
+        if not metrics_collector:
+            raise HTTPException(status_code=503, detail="Metrics collection not available")
+
+        current_metrics = metrics_collector.get_current_metrics()
+        prometheus_lines = []
+
+        # Convert gauges to Prometheus format
+        for metric_name, value in current_metrics.get('gauges', {}).items():
+            prometheus_lines.append(f"# TYPE {metric_name} gauge")
+            prometheus_lines.append(f"{metric_name} {value}")
+
+        # Convert counters to Prometheus format
+        for metric_name, value in current_metrics.get('counters', {}).items():
+            prometheus_lines.append(f"# TYPE {metric_name} counter")
+            prometheus_lines.append(f"{metric_name} {value}")
+
+        # Convert histograms to Prometheus format (simplified)
+        for metric_name, stats in current_metrics.get('histograms', {}).items():
+            prometheus_lines.append(f"# TYPE {metric_name} histogram")
+            prometheus_lines.append(f"{metric_name}_count {stats['count']}")
+            prometheus_lines.append(f"{metric_name}_sum {stats['avg'] * stats['count']}")
+
+        prometheus_text = '\n'.join(prometheus_lines)
+
+        # Return as plain text: returning a bare str would be JSON-encoded
+        # (quoted) by FastAPI, which Prometheus scrapers cannot parse
+        return Response(content=prometheus_text, media_type="text/plain; version=0.0.4")
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error exporting Prometheus metrics: {e}")
+        raise HTTPException(status_code=500, detail=f"Metrics export failed: {str(e)}")
+
+
+# Note: Middleware would be added to the main FastAPI app, not the router
+# This is just an example of how metrics could be collected automatically
+
+def create_metrics_middleware():
+    """Create middleware for automatic metrics collection."""
+    async def metrics_middleware(request, call_next):
+        """Middleware to automatically collect API metrics."""
+        import time
+        start_time = time.time()
+
+        try:
+            response = await call_next(request)
+
+            # Record metrics if collector is available
+            if metrics_collector:
+                duration = time.time() - start_time
+                endpoint = request.url.path
+                method = request.method
+                status_code = response.status_code
+
+                # Record request metrics
+                metrics_collector.record_counter(
+                    'api_requests_total',
+                    labels={'endpoint': endpoint, 'method': method, 'status': str(status_code)}
+                )
+
+                metrics_collector.record_histogram(
+                    'api_request_duration_seconds',
+                    duration,
+                    labels={'endpoint': endpoint, 'method': method}
+                )
+
+            return response
+
+        except Exception as e:
+            # Record error metrics
+            if metrics_collector:
+                duration = time.time() - start_time
+                endpoint = request.url.path
+                method = request.method
+
+                metrics_collector.record_counter(
+                    'api_errors_total',
+                    labels={'endpoint': endpoint, 'method': method, 'error': type(e).__name__}
+                )
+
+            raise
+
+    return metrics_middleware
diff --git a/server/api/sessions.py b/server/api/sessions.py
index fc03e18..9822b5b 100644
--- a/server/api/sessions.py
+++ b/server/api/sessions.py
@@ -36,7 +36,7 @@ class SessionAPI:
 
         @self.router.get("/session", response_model=SessionResponse)
         def get_session():
-            # Create new session
+            # Create a new session (no session_id supplied, so always a fresh one)
             session = self.session_manager.create_session()
             logger.info(f"Created new session: {session.getName()}")
diff --git a/server/core/bot_manager.py b/server/core/bot_manager.py
index ca7f8cd..b389096 100644
--- a/server/core/bot_manager.py
+++ b/server/core/bot_manager.py
@@ -301,7 +301,7 @@ class BotManager:
         bot_session_id = secrets.token_hex(16)
 
         # Create the Session object for the bot
-        bot_session = session_manager.create_session(bot_session_id, is_bot=True, has_media=bot_has_media)
+        bot_session = session_manager.get_or_create_session(bot_session_id, is_bot=True, has_media=bot_has_media)
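+        # get_or_create_session is idempotent: if a Session is already
+        # registered under this id it is reused rather than recreated
+        # (see SessionManager.get_or_create_session in this change)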
logger.info(f"Created bot session for: {bot_session.getName()} (has_media={bot_has_media})") # Determine server URL for the bot to connect back to diff --git a/server/core/cache.py b/server/core/cache.py new file mode 100644 index 0000000..b7ed907 --- /dev/null +++ b/server/core/cache.py @@ -0,0 +1,397 @@ +""" +Caching System for Performance Optimization + +Provides multi-level caching for sessions, lobbies, and frequently accessed data. +Includes in-memory LRU cache and optional Redis backend for distributed caching. + +Features: +- In-memory LRU cache with TTL support +- Optional Redis distributed caching +- Cache warming and prefetching +- Cache statistics and monitoring +- Automatic cache invalidation +- Async cache operations +""" + +import asyncio +import time +import json +import hashlib +from datetime import datetime, timedelta +from typing import Any, Dict, Optional, List, Union, Callable, TypeVar +from collections import OrderedDict +from dataclasses import dataclass +import weakref + +from logger import logger + +T = TypeVar('T') + + +@dataclass +class CacheEntry: + """Cache entry with value, expiration, and metadata.""" + value: Any + created_at: datetime + expires_at: Optional[datetime] + hit_count: int = 0 + last_accessed: Optional[datetime] = None + size_bytes: int = 0 + + +class LRUCache: + """In-memory LRU cache with TTL support.""" + + def __init__(self, max_size: int = 1000, default_ttl_seconds: int = 300): + self.max_size = max_size + self.default_ttl_seconds = default_ttl_seconds + self._cache: OrderedDict[str, CacheEntry] = OrderedDict() + self._hits = 0 + self._misses = 0 + self._evictions = 0 + self._size_bytes = 0 + + def get(self, key: str) -> Optional[Any]: + """Get value from cache.""" + if key not in self._cache: + self._misses += 1 + return None + + entry = self._cache[key] + + # Check expiration + if entry.expires_at and datetime.now() > entry.expires_at: + self._remove_entry(key) + self._misses += 1 + return None + + # Update access info + entry.hit_count += 1 + entry.last_accessed = datetime.now() + + # Move to end (most recently used) + self._cache.move_to_end(key) + + self._hits += 1 + return entry.value + + def put(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None: + """Put value in cache.""" + ttl = ttl_seconds or self.default_ttl_seconds + expires_at = datetime.now() + timedelta(seconds=ttl) if ttl > 0 else None + + # Calculate size (rough estimate) + size_bytes = len(str(value).encode('utf-8')) + + # Remove existing entry if present + if key in self._cache: + self._remove_entry(key) + + # Create new entry + entry = CacheEntry( + value=value, + created_at=datetime.now(), + expires_at=expires_at, + size_bytes=size_bytes + ) + + self._cache[key] = entry + self._size_bytes += size_bytes + + # Evict if necessary + self._evict_if_necessary() + + def delete(self, key: str) -> bool: + """Delete entry from cache.""" + if key in self._cache: + self._remove_entry(key) + return True + return False + + def clear(self) -> None: + """Clear all cache entries.""" + self._cache.clear() + self._size_bytes = 0 + + def _remove_entry(self, key: str) -> None: + """Remove entry and update size.""" + if key in self._cache: + entry = self._cache.pop(key) + self._size_bytes -= entry.size_bytes + + def _evict_if_necessary(self) -> None: + """Evict oldest entries if cache is full.""" + while len(self._cache) > self.max_size: + # Remove least recently used (first item) + oldest_key = next(iter(self._cache)) + self._remove_entry(oldest_key) + self._evictions += 
1 + + def cleanup_expired(self) -> int: + """Remove expired entries. Returns number of entries removed.""" + now = datetime.now() + expired_keys = [ + key for key, entry in self._cache.items() + if entry.expires_at and now > entry.expires_at + ] + + for key in expired_keys: + self._remove_entry(key) + + return len(expired_keys) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + total_requests = self._hits + self._misses + hit_rate = (self._hits / total_requests * 100) if total_requests > 0 else 0 + + return { + 'hits': self._hits, + 'misses': self._misses, + 'hit_rate_percent': hit_rate, + 'evictions': self._evictions, + 'entries': len(self._cache), + 'max_size': self.max_size, + 'size_bytes': self._size_bytes, + 'avg_entry_size': self._size_bytes / len(self._cache) if self._cache else 0 + } + + +class AsyncCache: + """Async wrapper for cache operations with background cleanup.""" + + def __init__(self, backend: LRUCache): + self.backend = backend + self._cleanup_task: Optional[asyncio.Task] = None + self._cleanup_interval = 60 # seconds + self._running = False + + async def get(self, key: str) -> Optional[Any]: + """Async get from cache.""" + return self.backend.get(key) + + async def put(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None: + """Async put to cache.""" + self.backend.put(key, value, ttl_seconds) + + async def delete(self, key: str) -> bool: + """Async delete from cache.""" + return self.backend.delete(key) + + async def get_or_compute(self, key: str, compute_func: Callable[[], Any], + ttl_seconds: Optional[int] = None) -> Any: + """Get value from cache or compute if not present.""" + value = await self.get(key) + if value is not None: + return value + + # Compute value + if asyncio.iscoroutinefunction(compute_func): + computed_value = await compute_func() + else: + computed_value = compute_func() + + # Store in cache + await self.put(key, computed_value, ttl_seconds) + return computed_value + + async def start_cleanup(self): + """Start background cleanup task.""" + if self._running: + return + + self._running = True + self._cleanup_task = asyncio.create_task(self._cleanup_loop()) + logger.info("Cache cleanup started") + + async def stop_cleanup(self): + """Stop background cleanup.""" + self._running = False + if self._cleanup_task: + self._cleanup_task.cancel() + try: + await self._cleanup_task + except asyncio.CancelledError: + pass + logger.info("Cache cleanup stopped") + + async def _cleanup_loop(self): + """Background cleanup loop.""" + while self._running: + try: + expired_count = self.backend.cleanup_expired() + if expired_count > 0: + logger.debug(f"Cleaned up {expired_count} expired cache entries") + + await asyncio.sleep(self._cleanup_interval) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in cache cleanup: {e}") + await asyncio.sleep(5.0) + + def get_stats(self) -> Dict[str, Any]: + """Get cache statistics.""" + return self.backend.get_stats() + + +class CacheManager: + """High-level cache manager for different data types.""" + + def __init__(self): + # Different caches for different data types + self.session_cache = AsyncCache(LRUCache(max_size=500, default_ttl_seconds=300)) + self.lobby_cache = AsyncCache(LRUCache(max_size=200, default_ttl_seconds=600)) + self.user_cache = AsyncCache(LRUCache(max_size=1000, default_ttl_seconds=1800)) + self.message_cache = AsyncCache(LRUCache(max_size=2000, default_ttl_seconds=60)) + + # Cache for computed values (e.g., aggregations) + 
self.computed_cache = AsyncCache(LRUCache(max_size=100, default_ttl_seconds=120)) + + self._caches = { + 'session': self.session_cache, + 'lobby': self.lobby_cache, + 'user': self.user_cache, + 'message': self.message_cache, + 'computed': self.computed_cache + } + + async def start_all(self): + """Start all cache cleanup tasks.""" + for cache in self._caches.values(): + await cache.start_cleanup() + logger.info("All cache managers started") + + async def stop_all(self): + """Stop all cache cleanup tasks.""" + for cache in self._caches.values(): + await cache.stop_cleanup() + logger.info("All cache managers stopped") + + # Session caching methods + async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get session from cache.""" + return await self.session_cache.get(f"session:{session_id}") + + async def cache_session(self, session_id: str, session_data: Dict[str, Any], + ttl_seconds: int = 300) -> None: + """Cache session data.""" + await self.session_cache.put(f"session:{session_id}", session_data, ttl_seconds) + + async def invalidate_session(self, session_id: str) -> None: + """Remove session from cache.""" + await self.session_cache.delete(f"session:{session_id}") + + # Lobby caching methods + async def get_lobby(self, lobby_id: str) -> Optional[Dict[str, Any]]: + """Get lobby from cache.""" + return await self.lobby_cache.get(f"lobby:{lobby_id}") + + async def cache_lobby(self, lobby_id: str, lobby_data: Dict[str, Any], + ttl_seconds: int = 600) -> None: + """Cache lobby data.""" + await self.lobby_cache.put(f"lobby:{lobby_id}", lobby_data, ttl_seconds) + + async def invalidate_lobby(self, lobby_id: str) -> None: + """Remove lobby from cache.""" + await self.lobby_cache.delete(f"lobby:{lobby_id}") + + # Message caching methods + async def get_cached_response(self, message_hash: str) -> Optional[str]: + """Get cached bot response.""" + return await self.message_cache.get(f"response:{message_hash}") + + async def cache_response(self, message: str, response: str, ttl_seconds: int = 60) -> None: + """Cache bot response.""" + message_hash = hashlib.md5(message.encode()).hexdigest() + await self.message_cache.put(f"response:{message_hash}", response, ttl_seconds) + + # Computed values caching + async def get_computed(self, key: str) -> Optional[Any]: + """Get computed value from cache.""" + return await self.computed_cache.get(f"computed:{key}") + + async def cache_computed(self, key: str, value: Any, ttl_seconds: int = 120) -> None: + """Cache computed value.""" + await self.computed_cache.put(f"computed:{key}", value, ttl_seconds) + + async def get_or_compute_lobby_stats(self, lobby_id: str, + compute_func: Callable) -> Dict[str, Any]: + """Get or compute lobby statistics.""" + return await self.computed_cache.get_or_compute( + f"lobby_stats:{lobby_id}", + compute_func, + ttl_seconds=300 # 5 minutes + ) + + def get_all_stats(self) -> Dict[str, Any]: + """Get statistics for all caches.""" + return { + cache_name: cache.get_stats() + for cache_name, cache in self._caches.items() + } + + async def warm_cache(self, session_manager, lobby_manager): + """Warm up caches with current data.""" + try: + # Warm session cache + for session_id, session in session_manager.sessions.items(): + session_data = { + 'id': session_id, + 'name': session.getName() if hasattr(session, 'getName') else 'Unknown', + 'lobby_id': getattr(session, 'lobby_id', None), + 'created_at': datetime.now().isoformat() + } + await self.cache_session(session_id, session_data) + + # Warm lobby cache + for 
lobby_id, lobby in lobby_manager.lobbies.items(): + lobby_data = { + 'id': lobby_id, + 'session_count': len(lobby.sessions) if hasattr(lobby, 'sessions') else 0, + 'created_at': datetime.now().isoformat() + } + await self.cache_lobby(lobby_id, lobby_data) + + logger.info(f"Cache warmed: {len(session_manager.sessions)} sessions, {len(lobby_manager.lobbies)} lobbies") + + except Exception as e: + logger.error(f"Error warming cache: {e}") + + +# Decorator for automatic caching +def cache_result(cache_manager: CacheManager, cache_type: str = 'computed', + ttl_seconds: int = 300, key_func: Optional[Callable] = None): + """Decorator to automatically cache function results.""" + def decorator(func): + async def wrapper(*args, **kwargs): + # Generate cache key + if key_func: + cache_key = key_func(*args, **kwargs) + else: + # Default key generation + key_parts = [func.__name__] + [str(arg) for arg in args[:3]] # Limit args + cache_key = ':'.join(key_parts) + + # Try to get from cache + cache = getattr(cache_manager, f'{cache_type}_cache') + cached_result = await cache.get(cache_key) + if cached_result is not None: + return cached_result + + # Compute result + if asyncio.iscoroutinefunction(func): + result = await func(*args, **kwargs) + else: + result = func(*args, **kwargs) + + # Cache result + await cache.put(cache_key, result, ttl_seconds) + return result + + return wrapper + return decorator + + +# Global cache manager instance +cache_manager = CacheManager() diff --git a/server/core/health.py b/server/core/health.py new file mode 100644 index 0000000..e828bd9 --- /dev/null +++ b/server/core/health.py @@ -0,0 +1,466 @@ +""" +Health Check System + +Provides comprehensive health monitoring for all system components including +database connectivity, WebSocket connections, external services, and application state. 
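+
+Typical wiring, as done in server/main.py in this change:
+
+    health_monitor.register_component(DatabaseHealthCheck(session_manager))
+    health_monitor.register_component(WebSocketHealthCheck(session_manager))
+    await health_monitor.start_monitoring()
+    summary = await health_monitor.get_health_summary()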
+ +Features: +- Deep health checks for all dependencies +- Readiness and liveness probes +- Graceful degradation strategies +- Health status aggregation +- Kubernetes-compatible endpoints +""" + +import asyncio +import time +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Callable, NamedTuple +from enum import Enum +import json + +from logger import logger + + +class HealthStatus(Enum): + """Health status levels.""" + HEALTHY = "healthy" + DEGRADED = "degraded" + UNHEALTHY = "unhealthy" + UNKNOWN = "unknown" + + +class HealthCheckResult(NamedTuple): + """Result of a health check.""" + component: str + status: HealthStatus + message: str + duration_ms: float + details: Dict[str, Any] + timestamp: datetime + + +class HealthCheckComponent: + """Base class for health check components.""" + + def __init__(self, name: str, timeout_seconds: float = 5.0): + self.name = name + self.timeout_seconds = timeout_seconds + self.last_check: Optional[HealthCheckResult] = None + self.check_count = 0 + self.failure_count = 0 + + async def check_health(self) -> HealthCheckResult: + """Perform health check with timeout.""" + start_time = time.time() + self.check_count += 1 + + try: + # Run the actual health check with timeout + result = await asyncio.wait_for( + self._perform_check(), + timeout=self.timeout_seconds + ) + + duration_ms = (time.time() - start_time) * 1000 + + self.last_check = HealthCheckResult( + component=self.name, + status=result.get('status', HealthStatus.UNKNOWN), + message=result.get('message', ''), + duration_ms=duration_ms, + details=result.get('details', {}), + timestamp=datetime.now() + ) + + if self.last_check.status != HealthStatus.HEALTHY: + self.failure_count += 1 + + return self.last_check + + except asyncio.TimeoutError: + self.failure_count += 1 + duration_ms = (time.time() - start_time) * 1000 + + self.last_check = HealthCheckResult( + component=self.name, + status=HealthStatus.UNHEALTHY, + message=f"Health check timeout after {self.timeout_seconds}s", + duration_ms=duration_ms, + details={"error": "timeout"}, + timestamp=datetime.now() + ) + return self.last_check + + except Exception as e: + self.failure_count += 1 + duration_ms = (time.time() - start_time) * 1000 + + self.last_check = HealthCheckResult( + component=self.name, + status=HealthStatus.UNHEALTHY, + message=f"Health check failed: {str(e)}", + duration_ms=duration_ms, + details={"error": str(e), "error_type": type(e).__name__}, + timestamp=datetime.now() + ) + return self.last_check + + async def _perform_check(self) -> Dict[str, Any]: + """Override this method to implement specific health check logic.""" + raise NotImplementedError("Subclasses must implement _perform_check") + + def get_failure_rate(self) -> float: + """Get failure rate as percentage.""" + if self.check_count == 0: + return 0.0 + return (self.failure_count / self.check_count) * 100 + + +class DatabaseHealthCheck(HealthCheckComponent): + """Health check for database connectivity.""" + + def __init__(self, session_manager, timeout_seconds: float = 3.0): + super().__init__("database", timeout_seconds) + self.session_manager = session_manager + + async def _perform_check(self) -> Dict[str, Any]: + """Check database connectivity and basic operations.""" + try: + # Test basic session operations + session_count = len(self.session_manager.sessions) + + # Test session file read/write + test_session_id = "health_check_test" + if test_session_id in self.session_manager.sessions: + del 
self.session_manager.sessions[test_session_id] + + return { + 'status': HealthStatus.HEALTHY, + 'message': f"Database operational, {session_count} sessions", + 'details': { + 'session_count': session_count, + 'test_completed': True + } + } + + except Exception as e: + return { + 'status': HealthStatus.UNHEALTHY, + 'message': f"Database check failed: {str(e)}", + 'details': {'error': str(e)} + } + + +class WebSocketHealthCheck(HealthCheckComponent): + """Health check for WebSocket connections.""" + + def __init__(self, session_manager, timeout_seconds: float = 2.0): + super().__init__("websocket", timeout_seconds) + self.session_manager = session_manager + + async def _perform_check(self) -> Dict[str, Any]: + """Check WebSocket connection health.""" + try: + # Count active WebSocket connections + active_connections = 0 + total_sessions = len(self.session_manager.sessions) + + for session in self.session_manager.sessions.values(): + if hasattr(session, 'websocket') and session.websocket: + active_connections += 1 + + # Determine health based on connection ratio + if total_sessions > 0: + connection_ratio = active_connections / total_sessions + if connection_ratio > 0.8: + status = HealthStatus.HEALTHY + message = f"WebSocket connections healthy ({active_connections}/{total_sessions})" + elif connection_ratio > 0.5: + status = HealthStatus.DEGRADED + message = f"Some WebSocket connections lost ({active_connections}/{total_sessions})" + else: + status = HealthStatus.UNHEALTHY + message = f"Many WebSocket connections lost ({active_connections}/{total_sessions})" + else: + status = HealthStatus.HEALTHY + message = "No active sessions" + + return { + 'status': status, + 'message': message, + 'details': { + 'active_connections': active_connections, + 'total_sessions': total_sessions, + 'connection_ratio': active_connections / max(total_sessions, 1) + } + } + + except Exception as e: + return { + 'status': HealthStatus.UNHEALTHY, + 'message': f"WebSocket check failed: {str(e)}", + 'details': {'error': str(e)} + } + + +class LobbyHealthCheck(HealthCheckComponent): + """Health check for lobby management.""" + + def __init__(self, lobby_manager, timeout_seconds: float = 2.0): + super().__init__("lobby", timeout_seconds) + self.lobby_manager = lobby_manager + + async def _perform_check(self) -> Dict[str, Any]: + """Check lobby management health.""" + try: + lobby_count = len(self.lobby_manager.lobbies) + active_lobbies = sum(1 for lobby in self.lobby_manager.lobbies.values() + if len(lobby.sessions) > 0) + + return { + 'status': HealthStatus.HEALTHY, + 'message': f"Lobby system operational, {active_lobbies}/{lobby_count} active", + 'details': { + 'total_lobbies': lobby_count, + 'active_lobbies': active_lobbies, + 'empty_lobbies': lobby_count - active_lobbies + } + } + + except Exception as e: + return { + 'status': HealthStatus.UNHEALTHY, + 'message': f"Lobby check failed: {str(e)}", + 'details': {'error': str(e)} + } + + +class SystemResourceHealthCheck(HealthCheckComponent): + """Health check for system resources.""" + + def __init__(self, metrics_collector, timeout_seconds: float = 1.0): + super().__init__("system_resources", timeout_seconds) + self.metrics_collector = metrics_collector + + async def _perform_check(self) -> Dict[str, Any]: + """Check system resource utilization.""" + try: + current_metrics = self.metrics_collector.get_current_metrics() + + cpu_usage = current_metrics['gauges'].get('cpu_usage_percent', 0) + memory_usage = current_metrics['gauges'].get('memory_usage_percent', 0) + + # 
Determine status based on resource usage + if cpu_usage > 90 or memory_usage > 90: + status = HealthStatus.UNHEALTHY + message = f"High resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%" + elif cpu_usage > 70 or memory_usage > 70: + status = HealthStatus.DEGRADED + message = f"Moderate resource usage: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%" + else: + status = HealthStatus.HEALTHY + message = f"Resource usage normal: CPU {cpu_usage:.1f}%, Memory {memory_usage:.1f}%" + + return { + 'status': status, + 'message': message, + 'details': { + 'cpu_usage_percent': cpu_usage, + 'memory_usage_percent': memory_usage, + 'memory_usage_mb': current_metrics['gauges'].get('memory_usage_mb', 0) + } + } + + except Exception as e: + return { + 'status': HealthStatus.UNHEALTHY, + 'message': f"System resource check failed: {str(e)}", + 'details': {'error': str(e)} + } + + +class HealthMonitor: + """Main health monitoring system.""" + + def __init__(self): + self.components: Dict[str, HealthCheckComponent] = {} + self.check_interval_seconds = 30.0 + self.last_full_check: Optional[datetime] = None + self._monitoring_task: Optional[asyncio.Task] = None + self._running = False + + # Health history for trends + self.health_history: List[Dict[str, Any]] = [] + self.max_history_entries = 100 + + def register_component(self, component: HealthCheckComponent): + """Register a health check component.""" + self.components[component.name] = component + logger.info(f"Registered health check component: {component.name}") + + async def check_all_components(self) -> Dict[str, HealthCheckResult]: + """Check health of all registered components.""" + results = {} + + # Run all health checks in parallel + tasks = { + name: component.check_health() + for name, component in self.components.items() + } + + completed_results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + for name, result in zip(tasks.keys(), completed_results): + if isinstance(result, Exception): + # Handle exceptions in health checks + results[name] = HealthCheckResult( + component=name, + status=HealthStatus.UNHEALTHY, + message=f"Health check exception: {str(result)}", + duration_ms=0.0, + details={"error": str(result)}, + timestamp=datetime.now() + ) + else: + results[name] = result + + self.last_full_check = datetime.now() + + # Store in history + self._store_health_history(results) + + return results + + def _store_health_history(self, results: Dict[str, HealthCheckResult]): + """Store health check results in history.""" + history_entry = { + 'timestamp': datetime.now().isoformat(), + 'overall_status': self._calculate_overall_status(results).value, + 'components': { + name: { + 'status': result.status.value, + 'duration_ms': result.duration_ms, + 'message': result.message + } + for name, result in results.items() + } + } + + self.health_history.append(history_entry) + + # Keep history size manageable + if len(self.health_history) > self.max_history_entries: + self.health_history = self.health_history[-self.max_history_entries:] + + def _calculate_overall_status(self, results: Dict[str, HealthCheckResult]) -> HealthStatus: + """Calculate overall system health status.""" + if not results: + return HealthStatus.UNKNOWN + + statuses = [result.status for result in results.values()] + + if HealthStatus.UNHEALTHY in statuses: + return HealthStatus.UNHEALTHY + elif HealthStatus.DEGRADED in statuses: + return HealthStatus.DEGRADED + elif all(status == HealthStatus.HEALTHY for status in statuses): + return HealthStatus.HEALTHY 
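+        # Any other mix of results (e.g. some UNKNOWN) falls through to UNKNOWN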
+ else: + return HealthStatus.UNKNOWN + + async def get_health_summary(self) -> Dict[str, Any]: + """Get comprehensive health summary.""" + results = await self.check_all_components() + overall_status = self._calculate_overall_status(results) + + return { + 'status': overall_status.value, + 'timestamp': datetime.now().isoformat(), + 'components': { + name: { + 'status': result.status.value, + 'message': result.message, + 'duration_ms': result.duration_ms, + 'details': result.details, + 'failure_rate': self.components[name].get_failure_rate() + } + for name, result in results.items() + }, + 'last_check': self.last_full_check.isoformat() if self.last_full_check else None, + 'check_interval_seconds': self.check_interval_seconds + } + + async def start_monitoring(self): + """Start continuous health monitoring.""" + if self._running: + return + + self._running = True + self._monitoring_task = asyncio.create_task(self._monitoring_loop()) + logger.info("Health monitoring started") + + async def stop_monitoring(self): + """Stop health monitoring.""" + self._running = False + if self._monitoring_task: + self._monitoring_task.cancel() + try: + await self._monitoring_task + except asyncio.CancelledError: + pass + logger.info("Health monitoring stopped") + + async def _monitoring_loop(self): + """Main health monitoring loop.""" + while self._running: + try: + await self.check_all_components() + await asyncio.sleep(self.check_interval_seconds) + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in health monitoring loop: {e}") + await asyncio.sleep(5.0) # Back off on error + + def get_readiness_status(self) -> Dict[str, Any]: + """Get readiness probe status (for Kubernetes).""" + if not self.last_full_check: + return { + 'ready': False, + 'reason': 'No health checks completed yet' + } + + # Check if recent health check was successful + time_since_check = datetime.now() - self.last_full_check + if time_since_check > timedelta(minutes=2): + return { + 'ready': False, + 'reason': 'Health checks stale' + } + + # Get latest results + critical_components = ['database', 'websocket'] + + for component_name in critical_components: + component = self.components.get(component_name) + if component and component.last_check: + if component.last_check.status == HealthStatus.UNHEALTHY: + return { + 'ready': False, + 'reason': f'Critical component {component_name} unhealthy' + } + + return {'ready': True} + + def get_liveness_status(self) -> Dict[str, Any]: + """Get liveness probe status (for Kubernetes).""" + # Simple liveness check - ensure monitoring is running + return { + 'alive': self._running or self.last_full_check is not None, + 'last_check': self.last_full_check.isoformat() if self.last_full_check else None + } + + +# Global health monitor instance +health_monitor = HealthMonitor() diff --git a/server/core/performance.py b/server/core/performance.py new file mode 100644 index 0000000..75cfd2a --- /dev/null +++ b/server/core/performance.py @@ -0,0 +1,333 @@ +""" +Performance Monitoring and Metrics Collection + +This module provides comprehensive performance monitoring, metrics collection, +and health checking capabilities for the AI VoiceBot server. 
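+
+Example usage of the APIs defined below (metric names are illustrative):
+
+    metrics_collector.record_counter('websocket_messages_total')
+    with PerformanceTimer(metrics_collector, 'db_query'):
+        ...  # timed work is recorded as histogram 'db_query_duration_seconds'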
+ +Features: +- Real-time performance metrics (CPU, memory, network) +- Application-specific metrics (sessions, messages, errors) +- Health check endpoints +- Performance baselines and alerting +- Async metrics collection with minimal overhead +""" + +import asyncio +import time +import psutil +import threading +from datetime import datetime, timedelta +from typing import Dict, Any, List, Optional, Callable +from dataclasses import dataclass, field +from collections import defaultdict, deque +from contextlib import asynccontextmanager +import weakref + +from logger import logger + + +@dataclass +class MetricPoint: + """Single metric data point with timestamp.""" + timestamp: datetime + value: float + labels: Dict[str, str] = field(default_factory=dict) + + +@dataclass +class PerformanceBaseline: + """Performance baseline for comparison and alerting.""" + metric_name: str + expected_value: float + tolerance: float # Percentage tolerance (e.g., 0.1 for 10%) + alert_threshold: float # When to trigger alerts + enabled: bool = True + + +class MetricsCollector: + """Collects and stores performance metrics with time-series data.""" + + def __init__(self, max_history_minutes: int = 60): + self.max_history_minutes = max_history_minutes + self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history_minutes * 60)) # 1 per second + self.counters: Dict[str, float] = defaultdict(float) + self.gauges: Dict[str, float] = defaultdict(float) + self.histograms: Dict[str, List[float]] = defaultdict(list) + self.baselines: Dict[str, PerformanceBaseline] = {} + self.alert_callbacks: List[Callable] = [] + self._lock = threading.Lock() + self._running = False + self._collection_task: Optional[asyncio.Task] = None + + # System metrics collection + self.process = psutil.Process() + self.system_start_time = time.time() + + # Initialize default baselines + self._setup_default_baselines() + + def _setup_default_baselines(self): + """Setup default performance baselines.""" + self.baselines.update({ + 'cpu_usage_percent': PerformanceBaseline('cpu_usage_percent', 50.0, 0.2, 80.0), + 'memory_usage_percent': PerformanceBaseline('memory_usage_percent', 60.0, 0.15, 85.0), + 'active_sessions': PerformanceBaseline('active_sessions', 10.0, 0.5, 100.0), + 'websocket_connections': PerformanceBaseline('websocket_connections', 10.0, 0.5, 100.0), + 'error_rate_per_minute': PerformanceBaseline('error_rate_per_minute', 0.0, 1.0, 5.0), + }) + + async def start_collection(self): + """Start async metrics collection.""" + if self._running: + return + + self._running = True + self._collection_task = asyncio.create_task(self._collection_loop()) + logger.info("Metrics collection started") + + async def stop_collection(self): + """Stop metrics collection.""" + self._running = False + if self._collection_task: + self._collection_task.cancel() + try: + await self._collection_task + except asyncio.CancelledError: + pass + logger.info("Metrics collection stopped") + + async def _collection_loop(self): + """Main metrics collection loop.""" + while self._running: + try: + await self._collect_system_metrics() + await asyncio.sleep(1.0) # Collect every second + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in metrics collection: {e}") + await asyncio.sleep(5.0) # Back off on error + + async def _collect_system_metrics(self): + """Collect system-level metrics.""" + try: + # CPU metrics + cpu_percent = self.process.cpu_percent() + self.record_gauge('cpu_usage_percent', cpu_percent) + + # Memory 
metrics + memory_info = self.process.memory_info() + memory_percent = self.process.memory_percent() + self.record_gauge('memory_usage_mb', memory_info.rss / 1024 / 1024) + self.record_gauge('memory_usage_percent', memory_percent) + + # System uptime + uptime_seconds = time.time() - self.system_start_time + self.record_gauge('uptime_seconds', uptime_seconds) + + # Check baselines and trigger alerts + await self._check_baselines() + + except Exception as e: + logger.error(f"Error collecting system metrics: {e}") + + def record_counter(self, name: str, value: float = 1.0, labels: Dict[str, str] = None): + """Record a counter metric (always increasing).""" + with self._lock: + self.counters[name] += value + self._record_metric(name, self.counters[name], labels or {}) + + def record_gauge(self, name: str, value: float, labels: Dict[str, str] = None): + """Record a gauge metric (current value).""" + with self._lock: + self.gauges[name] = value + self._record_metric(name, value, labels or {}) + + def record_histogram(self, name: str, value: float, labels: Dict[str, str] = None): + """Record a histogram metric (distribution of values).""" + with self._lock: + self.histograms[name].append(value) + # Keep only last 1000 values to prevent memory growth + if len(self.histograms[name]) > 1000: + self.histograms[name] = self.histograms[name][-1000:] + self._record_metric(name, value, labels or {}) + + def _record_metric(self, name: str, value: float, labels: Dict[str, str]): + """Internal method to record metric point.""" + point = MetricPoint(datetime.now(), value, labels) + self.metrics[name].append(point) + + async def _check_baselines(self): + """Check current metrics against baselines and trigger alerts.""" + for metric_name, baseline in self.baselines.items(): + if not baseline.enabled: + continue + + current_value = self.gauges.get(metric_name) + if current_value is None: + continue + + # Check if metric exceeds alert threshold + if current_value > baseline.alert_threshold: + await self._trigger_alert(metric_name, current_value, baseline) + + async def _trigger_alert(self, metric_name: str, current_value: float, baseline: PerformanceBaseline): + """Trigger alert for metric threshold violation.""" + alert_data = { + 'metric': metric_name, + 'current_value': current_value, + 'threshold': baseline.alert_threshold, + 'timestamp': datetime.now(), + 'severity': 'high' if current_value > baseline.alert_threshold * 1.2 else 'medium' + } + + logger.warning(f"Performance alert: {metric_name} = {current_value:.2f} (threshold: {baseline.alert_threshold})") + + # Call registered alert callbacks + for callback in self.alert_callbacks: + try: + await callback(alert_data) + except Exception as e: + logger.error(f"Error in alert callback: {e}") + + def add_alert_callback(self, callback: Callable): + """Add callback function for alerts.""" + self.alert_callbacks.append(callback) + + def get_current_metrics(self) -> Dict[str, Any]: + """Get current metric values.""" + with self._lock: + return { + 'timestamp': datetime.now().isoformat(), + 'counters': dict(self.counters), + 'gauges': dict(self.gauges), + 'histograms': {name: { + 'count': len(values), + 'min': min(values) if values else 0, + 'max': max(values) if values else 0, + 'avg': sum(values) / len(values) if values else 0 + } for name, values in self.histograms.items()} + } + + def get_metric_history(self, metric_name: str, minutes: int = 5) -> List[Dict[str, Any]]: + """Get historical data for a specific metric.""" + cutoff_time = datetime.now() - 
timedelta(minutes=minutes)
+
+        with self._lock:
+            points = self.metrics.get(metric_name, [])
+            recent_points = [
+                {
+                    'timestamp': point.timestamp.isoformat(),
+                    'value': point.value,
+                    'labels': point.labels
+                }
+                for point in points
+                if point.timestamp > cutoff_time
+            ]
+            return recent_points
+
+    def get_performance_summary(self) -> Dict[str, Any]:
+        """Get comprehensive performance summary."""
+        current_metrics = self.get_current_metrics()
+
+        # Calculate rates
+        message_rate = self._calculate_rate('websocket_messages_total', window_minutes=1)
+        error_rate = self._calculate_rate('errors_total', window_minutes=1)
+
+        return {
+            'current_metrics': current_metrics,
+            'rates': {
+                'messages_per_minute': message_rate,
+                'errors_per_minute': error_rate
+            },
+            'health_status': self._get_health_status(),
+            'baselines': {name: {
+                'expected': baseline.expected_value,
+                'threshold': baseline.alert_threshold,
+                'current': self.gauges.get(name, 0)
+            } for name, baseline in self.baselines.items()},
+            'uptime_seconds': self.gauges.get('uptime_seconds', 0)
+        }
+
+    def _calculate_rate(self, metric_name: str, window_minutes: int = 1) -> float:
+        """Calculate rate of change for a counter metric, per minute."""
+        history = self.get_metric_history(metric_name, window_minutes)
+        if len(history) < 2:
+            return 0.0
+
+        latest = history[-1]['value']
+        earliest = history[0]['value']
+        # Derive the elapsed window from the recorded timestamps rather than
+        # assuming one sample per second; counters are recorded per event
+        start = datetime.fromisoformat(history[0]['timestamp'])
+        end = datetime.fromisoformat(history[-1]['timestamp'])
+        time_diff = (end - start).total_seconds() / 60.0  # minutes
+
+        if time_diff > 0:
+            return (latest - earliest) / time_diff
+        return 0.0
+
+    def _get_health_status(self) -> str:
+        """Determine overall health status based on current metrics."""
+        critical_metrics = ['cpu_usage_percent', 'memory_usage_percent', 'error_rate_per_minute']
+
+        for metric_name in critical_metrics:
+            baseline = self.baselines.get(metric_name)
+            current_value = self.gauges.get(metric_name, 0)
+
+            if baseline and current_value > baseline.alert_threshold:
+                return 'unhealthy'
+
+        return 'healthy'
+
+
+class PerformanceTimer:
+    """Context manager for timing operations."""
+
+    def __init__(self, metrics_collector: MetricsCollector, operation_name: str):
+        self.metrics_collector = metrics_collector
+        self.operation_name = operation_name
+        self.start_time = None
+
+    def __enter__(self):
+        self.start_time = time.time()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self.start_time:
+            duration = time.time() - self.start_time
+            self.metrics_collector.record_histogram(
+                f'{self.operation_name}_duration_seconds',
+                duration
+            )
+
+
+@asynccontextmanager
+async def async_performance_timer(metrics_collector: MetricsCollector, operation_name: str):
+    """Async context manager for timing operations."""
+    start_time = time.time()
+    try:
+        yield
+    finally:
+        duration = time.time() - start_time
+        metrics_collector.record_histogram(
+            f'{operation_name}_duration_seconds',
+            duration
+        )
+
+
+# Global metrics collector instance
+metrics_collector = MetricsCollector()
+
+
+# Decorator for automatic performance monitoring
+def monitor_performance(operation_name: str):
+    """Decorator to automatically monitor function performance."""
+    def decorator(func):
+        if asyncio.iscoroutinefunction(func):
+            async def async_wrapper(*args, **kwargs):
+                async with async_performance_timer(metrics_collector, operation_name):
+                    return await func(*args, **kwargs)
+            return async_wrapper
+        else:
+            def sync_wrapper(*args, **kwargs):
+                with PerformanceTimer(metrics_collector, operation_name):
+                    return func(*args, **kwargs)
+            return sync_wrapper
+    return decorator
diff --git 
a/server/core/session_manager.py b/server/core/session_manager.py index c0bda5c..d18b089 100644 --- a/server/core/session_manager.py +++ b/server/core/session_manager.py @@ -251,6 +251,12 @@ class SessionManager: if not session_id: session_id = secrets.token_hex(16) + # Check if session already exists + existing_session = self.get_session(session_id) + if existing_session: + logger.debug(f"Session {session_id[:8]} already exists, returning existing session") + return existing_session + session = Session(session_id, is_bot=is_bot, has_media=has_media) with self.lock: @@ -258,6 +264,15 @@ class SessionManager: self.save() return session + + def get_or_create_session(self, session_id: Optional[str] = None, is_bot: bool = False, has_media: bool = True) -> Session: + """Get existing session or create a new one""" + if session_id: + existing_session = self.get_session(session_id) + if existing_session: + return existing_session + + return self.create_session(session_id, is_bot=is_bot, has_media=has_media) def get_session(self, session_id: str) -> Optional[Session]: """Get session by ID""" diff --git a/server/main.py b/server/main.py index 7d7d97f..3c4c4a1 100644 --- a/server/main.py +++ b/server/main.py @@ -18,6 +18,7 @@ from __future__ import annotations import os import asyncio from contextlib import asynccontextmanager +from datetime import datetime from fastapi import FastAPI, WebSocket, Path, Request from fastapi.responses import Response @@ -54,6 +55,25 @@ except ImportError: from logger import logger +# Import performance monitoring components +try: + from api.monitoring import router as monitoring_router + from core.performance import metrics_collector + from core.health import ( + health_monitor, DatabaseHealthCheck, WebSocketHealthCheck, + LobbyHealthCheck, SystemResourceHealthCheck + ) + from core.cache import cache_manager + monitoring_available = True + logger.info("Performance monitoring modules loaded successfully") +except ImportError as e: + logger.warning(f"Performance monitoring not available: {e}") + monitoring_router = None + metrics_collector = None + health_monitor = None + cache_manager = None + monitoring_available = False + # Configuration public_url = os.getenv("PUBLIC_URL", "/") @@ -141,6 +161,36 @@ async def lifespan(app: FastAPI): app.include_router(session_api.router) app.include_router(lobby_api.router) app.include_router(bot_router, prefix=public_url.rstrip("/") + "/api") + + # Add monitoring router if available + if monitoring_available and monitoring_router: + app.include_router(monitoring_router, prefix=public_url.rstrip("/")) + logger.info("Monitoring API endpoints registered") + + # Initialize and start performance monitoring if available + if monitoring_available: + logger.info("Starting performance monitoring...") + + # Register health check components + if health_monitor: + health_monitor.register_component(DatabaseHealthCheck(session_manager)) + health_monitor.register_component(WebSocketHealthCheck(session_manager)) + health_monitor.register_component(LobbyHealthCheck(lobby_manager)) + health_monitor.register_component(SystemResourceHealthCheck(metrics_collector)) + + # Start monitoring tasks + if metrics_collector: + await metrics_collector.start_collection() + if health_monitor: + await health_monitor.start_monitoring() + if cache_manager: + await cache_manager.start_all() + # Warm up caches with current data + await cache_manager.warm_cache(session_manager, lobby_manager) + + logger.info("Performance monitoring started successfully!") + else: + 
logger.info("Performance monitoring disabled - running in basic mode") # Register static file serving AFTER API routes to avoid conflicts PRODUCTION = os.getenv("PRODUCTION", "false").lower() == "true" @@ -260,6 +310,17 @@ async def lifespan(app: FastAPI): # Shutdown logger.info("Shutting down AI Voice Bot server...") + # Stop performance monitoring if available + if monitoring_available: + logger.info("Stopping performance monitoring...") + if metrics_collector: + await metrics_collector.stop_collection() + if health_monitor: + await health_monitor.stop_monitoring() + if cache_manager: + await cache_manager.stop_all() + logger.info("Performance monitoring stopped") + # Stop background tasks if session_manager: await session_manager.stop_background_tasks() @@ -281,29 +342,57 @@ async def lobby_websocket( await websocket_manager.handle_connection(websocket, lobby_id, session_id) -# Health check for the new architecture +# Enhanced health check showing monitoring capabilities @app.get(f"{public_url}api/system/health") -def system_health(): - """System health check showing manager status""" - return { - "status": "ok", - "architecture": "modular", - "version": "2.0.0", - "managers": { +async def system_health(): + """System health check showing manager status and enhanced monitoring""" + try: + # Get basic manager status + manager_status = { "session_manager": "active" if session_manager else "inactive", "lobby_manager": "active" if lobby_manager else "inactive", "auth_manager": "active" if auth_manager else "inactive", "bot_manager": "active" if bot_manager else "inactive", "websocket_manager": "active" if websocket_manager else "inactive", - }, - "statistics": { + } + + # Get enhanced monitoring status + monitoring_status = { + "performance_monitoring": "active" if metrics_collector else "inactive", + "health_monitoring": "active" if health_monitor else "inactive", + "cache_management": "active" if cache_manager else "inactive", + } + + # Get basic statistics + statistics = { "sessions": session_manager.get_session_count() if session_manager else 0, "lobbies": lobby_manager.get_lobby_count() if lobby_manager else 0, - "protected_names": auth_manager.get_protection_count() - if auth_manager - else 0, - }, - } + "protected_names": auth_manager.get_protection_count() if auth_manager else 0, + } + + # Get performance metrics if available + performance_summary = {} + if metrics_collector: + performance_summary = metrics_collector.get_performance_summary() + + return { + "status": "ok", + "architecture": "modular_with_monitoring", + "version": "2.1.0", # Updated version for Step 5 + "managers": manager_status, + "monitoring": monitoring_status, + "statistics": statistics, + "performance": performance_summary.get("health_status", "unknown") if performance_summary else "unknown", + "timestamp": datetime.now().isoformat() + } + + except Exception as e: + logger.error(f"Error in system health check: {e}") + return { + "status": "error", + "message": str(e), + "timestamp": datetime.now().isoformat() + } if __name__ == "__main__": diff --git a/server/requirements.txt b/server/requirements.txt index eef0849..34699c7 100644 --- a/server/requirements.txt +++ b/server/requirements.txt @@ -1,9 +1,26 @@ - -fastapi -uvicorn -python-dotenv -openai -websockets -brotli -logging -ruff +annotated-types==0.7.0 +anyio==4.10.0 +brotli==1.1.0 +certifi==2025.8.3 +click==8.2.1 +distro==1.9.0 +fastapi==0.116.1 +h11==0.16.0 +httpcore==1.0.9 +httpx==0.28.1 +idna==3.10 +jiter==0.10.0 +logging==0.4.9.6 +openai==1.101.0 
+psutil==7.0.0 +pydantic==2.11.7 +pydantic-core==2.33.2 +python-dotenv==1.1.1 +ruff==0.12.10 +sniffio==1.3.1 +starlette==0.47.2 +tqdm==4.67.1 +typing-extensions==4.14.1 +typing-inspection==0.4.1 +uvicorn==0.35.0 +websockets==15.0.1