""" Performance Monitoring and Metrics Collection This module provides comprehensive performance monitoring, metrics collection, and health checking capabilities for the AI VoiceBot server. Features: - Real-time performance metrics (CPU, memory, network) - Application-specific metrics (sessions, messages, errors) - Health check endpoints - Performance baselines and alerting - Async metrics collection with minimal overhead """ import asyncio import time import psutil import threading from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass, field from collections import defaultdict, deque from contextlib import asynccontextmanager from logger import logger @dataclass class MetricPoint: """Single metric data point with timestamp.""" timestamp: datetime value: float labels: Dict[str, str] = field(default_factory=dict) @dataclass class PerformanceBaseline: """Performance baseline for comparison and alerting.""" metric_name: str expected_value: float tolerance: float # Percentage tolerance (e.g., 0.1 for 10%) alert_threshold: float # When to trigger alerts enabled: bool = True class MetricsCollector: """Collects and stores performance metrics with time-series data.""" def __init__(self, max_history_minutes: int = 60): self.max_history_minutes = max_history_minutes self.metrics: Dict[str, deque] = defaultdict(lambda: deque(maxlen=max_history_minutes * 60)) # 1 per second self.counters: Dict[str, float] = defaultdict(float) self.gauges: Dict[str, float] = defaultdict(float) self.histograms: Dict[str, List[float]] = defaultdict(list) self.baselines: Dict[str, PerformanceBaseline] = {} self.alert_callbacks: List[Callable] = [] self._lock = threading.Lock() self._running = False self._collection_task: Optional[asyncio.Task] = None # System metrics collection self.process = psutil.Process() self.system_start_time = time.time() # Initialize default baselines self._setup_default_baselines() def _setup_default_baselines(self): """Setup default performance baselines.""" self.baselines.update({ 'cpu_usage_percent': PerformanceBaseline('cpu_usage_percent', 50.0, 0.2, 80.0), 'memory_usage_percent': PerformanceBaseline('memory_usage_percent', 60.0, 0.15, 85.0), 'active_sessions': PerformanceBaseline('active_sessions', 10.0, 0.5, 100.0), 'websocket_connections': PerformanceBaseline('websocket_connections', 10.0, 0.5, 100.0), 'error_rate_per_minute': PerformanceBaseline('error_rate_per_minute', 0.0, 1.0, 5.0), }) async def start_collection(self): """Start async metrics collection.""" if self._running: return self._running = True self._collection_task = asyncio.create_task(self._collection_loop()) logger.info("Metrics collection started") async def stop_collection(self): """Stop metrics collection.""" self._running = False if self._collection_task: self._collection_task.cancel() try: await self._collection_task except asyncio.CancelledError: pass logger.info("Metrics collection stopped") async def _collection_loop(self): """Main metrics collection loop.""" while self._running: try: await self._collect_system_metrics() await asyncio.sleep(1.0) # Collect every second except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in metrics collection: {e}") await asyncio.sleep(5.0) # Back off on error async def _collect_system_metrics(self): """Collect system-level metrics.""" try: # CPU metrics cpu_percent = self.process.cpu_percent() self.record_gauge('cpu_usage_percent', cpu_percent) # Memory metrics memory_info = 
class MetricsCollector:
    """Collects and stores performance metrics with time-series data."""

    def __init__(self, max_history_minutes: int = 60):
        self.max_history_minutes = max_history_minutes
        # One point per second, so each series holds max_history_minutes * 60 points.
        self.metrics: Dict[str, deque] = defaultdict(
            lambda: deque(maxlen=max_history_minutes * 60)
        )
        self.counters: Dict[str, float] = defaultdict(float)
        self.gauges: Dict[str, float] = defaultdict(float)
        self.histograms: Dict[str, List[float]] = defaultdict(list)
        self.baselines: Dict[str, PerformanceBaseline] = {}
        self.alert_callbacks: List[Callable] = []
        self._lock = threading.Lock()
        self._running = False
        self._collection_task: Optional[asyncio.Task] = None

        # System metrics collection
        self.process = psutil.Process()
        self.system_start_time = time.time()

        # Initialize default baselines
        self._setup_default_baselines()

    def _setup_default_baselines(self):
        """Set up default performance baselines."""
        self.baselines.update({
            'cpu_usage_percent': PerformanceBaseline('cpu_usage_percent', 50.0, 0.2, 80.0),
            'memory_usage_percent': PerformanceBaseline('memory_usage_percent', 60.0, 0.15, 85.0),
            'active_sessions': PerformanceBaseline('active_sessions', 10.0, 0.5, 100.0),
            'websocket_connections': PerformanceBaseline('websocket_connections', 10.0, 0.5, 100.0),
            'error_rate_per_minute': PerformanceBaseline('error_rate_per_minute', 0.0, 1.0, 5.0),
        })

    async def start_collection(self):
        """Start async metrics collection."""
        if self._running:
            return

        self._running = True
        self._collection_task = asyncio.create_task(self._collection_loop())
        logger.info("Metrics collection started")

    async def stop_collection(self):
        """Stop metrics collection."""
        self._running = False
        if self._collection_task:
            self._collection_task.cancel()
            try:
                await self._collection_task
            except asyncio.CancelledError:
                pass
        logger.info("Metrics collection stopped")

    async def _collection_loop(self):
        """Main metrics collection loop."""
        while self._running:
            try:
                await self._collect_system_metrics()
                await asyncio.sleep(1.0)  # Collect every second
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in metrics collection: {e}")
                await asyncio.sleep(5.0)  # Back off on error

    async def _collect_system_metrics(self):
        """Collect system-level metrics."""
        try:
            # CPU metrics
            cpu_percent = self.process.cpu_percent()
            self.record_gauge('cpu_usage_percent', cpu_percent)

            # Memory metrics
            memory_info = self.process.memory_info()
            memory_percent = self.process.memory_percent()
            self.record_gauge('memory_usage_mb', memory_info.rss / 1024 / 1024)
            self.record_gauge('memory_usage_percent', memory_percent)

            # System uptime
            uptime_seconds = time.time() - self.system_start_time
            self.record_gauge('uptime_seconds', uptime_seconds)

            # Check baselines and trigger alerts
            await self._check_baselines()
        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")

    def record_counter(self, name: str, value: float = 1.0, labels: Optional[Dict[str, str]] = None):
        """Record a counter metric (monotonically increasing)."""
        with self._lock:
            self.counters[name] += value
            self._record_metric(name, self.counters[name], labels or {})

    def record_gauge(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a gauge metric (current value)."""
        with self._lock:
            self.gauges[name] = value
            self._record_metric(name, value, labels or {})

    def record_histogram(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a histogram metric (distribution of values)."""
        with self._lock:
            self.histograms[name].append(value)
            # Keep only the last 1000 values to prevent unbounded memory growth
            if len(self.histograms[name]) > 1000:
                self.histograms[name] = self.histograms[name][-1000:]
            self._record_metric(name, value, labels or {})

    def _record_metric(self, name: str, value: float, labels: Dict[str, str]):
        """Internal method to record a metric point."""
        point = MetricPoint(datetime.now(), value, labels)
        self.metrics[name].append(point)

    async def _check_baselines(self):
        """Check current metrics against baselines and trigger alerts."""
        for metric_name, baseline in self.baselines.items():
            if not baseline.enabled:
                continue

            current_value = self.gauges.get(metric_name)
            if current_value is None:
                continue

            # Check whether the metric exceeds its alert threshold
            if current_value > baseline.alert_threshold:
                await self._trigger_alert(metric_name, current_value, baseline)

    async def _trigger_alert(self, metric_name: str, current_value: float, baseline: PerformanceBaseline):
        """Trigger an alert for a metric threshold violation."""
        alert_data = {
            'metric': metric_name,
            'current_value': current_value,
            'threshold': baseline.alert_threshold,
            'timestamp': datetime.now(),
            'severity': 'high' if current_value > baseline.alert_threshold * 1.2 else 'medium',
        }

        logger.warning(
            f"Performance alert: {metric_name} = {current_value:.2f} "
            f"(threshold: {baseline.alert_threshold})"
        )

        # Call registered alert callbacks
        for callback in self.alert_callbacks:
            try:
                await callback(alert_data)
            except Exception as e:
                logger.error(f"Error in alert callback: {e}")

    def add_alert_callback(self, callback: Callable):
        """Add a callback function for alerts."""
        self.alert_callbacks.append(callback)

    def get_current_metrics(self) -> Dict[str, Any]:
        """Get current metric values."""
        with self._lock:
            return {
                'timestamp': datetime.now().isoformat(),
                'counters': dict(self.counters),
                'gauges': dict(self.gauges),
                'histograms': {
                    name: {
                        'count': len(values),
                        'min': min(values) if values else 0,
                        'max': max(values) if values else 0,
                        'avg': sum(values) / len(values) if values else 0,
                    }
                    for name, values in self.histograms.items()
                },
            }

    def get_metric_history(self, metric_name: str, minutes: int = 5) -> List[Dict[str, Any]]:
        """Get historical data for a specific metric."""
        cutoff_time = datetime.now() - timedelta(minutes=minutes)

        with self._lock:
            points = self.metrics.get(metric_name, [])
            recent_points = [
                {
                    'timestamp': point.timestamp.isoformat(),
                    'value': point.value,
                    'labels': point.labels,
                }
                for point in points
                if point.timestamp > cutoff_time
            ]

        return recent_points

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get a comprehensive performance summary."""
        current_metrics = self.get_current_metrics()

        # Calculate rates
        message_rate = self._calculate_rate('websocket_messages_total', window_minutes=1)
        error_rate = self._calculate_rate('errors_total', window_minutes=1)

        return {
            'current_metrics': current_metrics,
            'rates': {
                'messages_per_minute': message_rate,
                'errors_per_minute': error_rate,
            },
            'health_status': self._get_health_status(),
            'baselines': {
                name: {
                    'expected': baseline.expected_value,
                    'threshold': baseline.alert_threshold,
                    'current': self.gauges.get(name, 0),
                }
                for name, baseline in self.baselines.items()
            },
            'uptime_seconds': self.gauges.get('uptime_seconds', 0),
        }

    def _calculate_rate(self, metric_name: str, window_minutes: int = 1) -> float:
        """Calculate the rate of change for a counter metric."""
        history = self.get_metric_history(metric_name, window_minutes)
        if len(history) < 2:
            return 0.0

        latest = history[-1]['value']
        earliest = history[0]['value']
        # Counters are recorded on events rather than on a fixed schedule, so
        # derive the elapsed window from the recorded timestamps instead of
        # assuming one point per second.
        start = datetime.fromisoformat(history[0]['timestamp'])
        end = datetime.fromisoformat(history[-1]['timestamp'])
        time_diff = (end - start).total_seconds() / 60.0  # minutes

        if time_diff > 0:
            return (latest - earliest) / time_diff
        return 0.0

    def _get_health_status(self) -> str:
        """Determine overall health status based on current metrics."""
        critical_metrics = ['cpu_usage_percent', 'memory_usage_percent', 'error_rate_per_minute']

        for metric_name in critical_metrics:
            baseline = self.baselines.get(metric_name)
            current_value = self.gauges.get(metric_name, 0)
            if baseline and current_value > baseline.alert_threshold:
                return 'unhealthy'

        return 'healthy'
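
# Usage sketch, assuming an asyncio event loop is already running (e.g. in a
# server startup hook). The alert sink and metric names here are illustrative,
# not part of the server's real wiring.
async def _example_collector_lifecycle() -> None:
    collector = MetricsCollector(max_history_minutes=10)

    async def log_alert(alert_data):
        # Minimal alert sink; a real deployment might page or post to chat.
        logger.error(f"ALERT: {alert_data['metric']} = {alert_data['current_value']:.2f}")

    collector.add_alert_callback(log_alert)
    await collector.start_collection()

    collector.record_counter('websocket_messages_total')
    collector.record_gauge('active_sessions', 3)

    await asyncio.sleep(2)  # Let the background loop gather a few samples
    logger.info(collector.get_performance_summary())
    await collector.stop_collection()
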
class PerformanceTimer:
    """Context manager for timing operations."""

    def __init__(self, metrics_collector: MetricsCollector, operation_name: str):
        self.metrics_collector = metrics_collector
        self.operation_name = operation_name
        self.start_time: Optional[float] = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.start_time is not None:
            duration = time.time() - self.start_time
            self.metrics_collector.record_histogram(
                f'{self.operation_name}_duration_seconds', duration
            )


@asynccontextmanager
async def async_performance_timer(metrics_collector: MetricsCollector, operation_name: str):
    """Async context manager for timing operations."""
    start_time = time.time()
    try:
        yield
    finally:
        duration = time.time() - start_time
        metrics_collector.record_histogram(
            f'{operation_name}_duration_seconds', duration
        )


# Global metrics collector instance
metrics_collector = MetricsCollector()


def monitor_performance(operation_name: str):
    """Decorator to automatically monitor function performance."""
    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            @functools.wraps(func)  # Preserve the wrapped function's metadata
            async def async_wrapper(*args, **kwargs):
                async with async_performance_timer(metrics_collector, operation_name):
                    return await func(*args, **kwargs)
            return async_wrapper
        else:
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                with PerformanceTimer(metrics_collector, operation_name):
                    return func(*args, **kwargs)
            return sync_wrapper
    return decorator
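
# Usage sketch: timing code with the decorator and the context managers. The
# handler below is hypothetical; real handlers live in the server module.
@monitor_performance('example_handler')
async def _example_handler() -> None:
    await asyncio.sleep(0.01)


if __name__ == '__main__':
    # Minimal smoke test when this file is run directly.
    with PerformanceTimer(metrics_collector, 'sync_block'):
        time.sleep(0.01)
    asyncio.run(_example_handler())
    print(metrics_collector.get_current_metrics())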