"""
|
|
Performance Monitoring and Metrics Collection
|
|
|
|
This module provides comprehensive performance monitoring, metrics collection,
|
|
and health checking capabilities for the AI VoiceBot server.
|
|
|
|
Features:
|
|
- Real-time performance metrics (CPU, memory, network)
|
|
- Application-specific metrics (sessions, messages, errors)
|
|
- Health check endpoints
|
|
- Performance baselines and alerting
|
|
- Async metrics collection with minimal overhead
|
|
"""

import asyncio
import functools
import time
import psutil
import threading
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Callable
from dataclasses import dataclass, field
from collections import defaultdict, deque
from contextlib import asynccontextmanager

from shared.logger import logger


@dataclass
class MetricPoint:
    """Single metric data point with timestamp."""
    timestamp: datetime
    value: float
    labels: Dict[str, str] = field(default_factory=dict)


@dataclass
class PerformanceBaseline:
    """Performance baseline for comparison and alerting."""
    metric_name: str
    expected_value: float
    tolerance: float  # Fractional tolerance (e.g., 0.1 for 10%)
    alert_threshold: float  # Absolute value above which an alert fires
    enabled: bool = True


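# A hedged sketch of registering a custom baseline on the module-level
# collector defined at the bottom of this file. The metric name
# "tts_latency_seconds" and its numbers are illustrative, not part of this
# module:
#
#   metrics_collector.baselines["tts_latency_seconds"] = PerformanceBaseline(
#       metric_name="tts_latency_seconds",
#       expected_value=0.3,   # typical value under normal load
#       tolerance=0.2,        # 20% drift considered normal
#       alert_threshold=1.0,  # alert once latency exceeds 1 second
#   )

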
class MetricsCollector:
    """Collects and stores performance metrics with time-series data."""

    def __init__(self, max_history_minutes: int = 60):
        self.max_history_minutes = max_history_minutes
        # One point per second of history, per metric name
        self.metrics: Dict[str, deque] = defaultdict(
            lambda: deque(maxlen=max_history_minutes * 60)
        )
        self.counters: Dict[str, float] = defaultdict(float)
        self.gauges: Dict[str, float] = defaultdict(float)
        self.histograms: Dict[str, List[float]] = defaultdict(list)
        self.baselines: Dict[str, PerformanceBaseline] = {}
        self.alert_callbacks: List[Callable] = []
        self._lock = threading.Lock()
        self._running = False
        self._collection_task: Optional[asyncio.Task] = None

        # System metrics collection
        self.process = psutil.Process()
        self.system_start_time = time.time()

        # Initialize default baselines
        self._setup_default_baselines()

    def _setup_default_baselines(self):
        """Set up default performance baselines."""
        self.baselines.update({
            'cpu_usage_percent': PerformanceBaseline('cpu_usage_percent', 50.0, 0.2, 80.0),
            'memory_usage_percent': PerformanceBaseline('memory_usage_percent', 60.0, 0.15, 85.0),
            'active_sessions': PerformanceBaseline('active_sessions', 10.0, 0.5, 100.0),
            'websocket_connections': PerformanceBaseline('websocket_connections', 10.0, 0.5, 100.0),
            'error_rate_per_minute': PerformanceBaseline('error_rate_per_minute', 0.0, 1.0, 5.0),
        })

    async def start_collection(self):
        """Start async metrics collection."""
        if self._running:
            return

        self._running = True
        self._collection_task = asyncio.create_task(self._collection_loop())
        logger.info("Metrics collection started")

    async def stop_collection(self):
        """Stop metrics collection."""
        self._running = False
        if self._collection_task:
            self._collection_task.cancel()
            try:
                await self._collection_task
            except asyncio.CancelledError:
                pass
        logger.info("Metrics collection stopped")

    async def _collection_loop(self):
        """Main metrics collection loop."""
        while self._running:
            try:
                await self._collect_system_metrics()
                await asyncio.sleep(1.0)  # Collect every second
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in metrics collection: {e}")
                await asyncio.sleep(5.0)  # Back off on error

    async def _collect_system_metrics(self):
        """Collect system-level metrics."""
        try:
            # CPU metrics
            cpu_percent = self.process.cpu_percent()
            self.record_gauge('cpu_usage_percent', cpu_percent)

            # Memory metrics
            memory_info = self.process.memory_info()
            memory_percent = self.process.memory_percent()
            self.record_gauge('memory_usage_mb', memory_info.rss / 1024 / 1024)
            self.record_gauge('memory_usage_percent', memory_percent)

            # System uptime
            uptime_seconds = time.time() - self.system_start_time
            self.record_gauge('uptime_seconds', uptime_seconds)

            # Check baselines and trigger alerts
            await self._check_baselines()

        except Exception as e:
            logger.error(f"Error collecting system metrics: {e}")

    def record_counter(self, name: str, value: float = 1.0, labels: Optional[Dict[str, str]] = None):
        """Record a counter metric (always increasing)."""
        with self._lock:
            self.counters[name] += value
            self._record_metric(name, self.counters[name], labels or {})

    def record_gauge(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a gauge metric (current value)."""
        with self._lock:
            self.gauges[name] = value
            self._record_metric(name, value, labels or {})

    def record_histogram(self, name: str, value: float, labels: Optional[Dict[str, str]] = None):
        """Record a histogram metric (distribution of values)."""
        with self._lock:
            self.histograms[name].append(value)
            # Keep only the last 1000 values to prevent memory growth
            if len(self.histograms[name]) > 1000:
                self.histograms[name] = self.histograms[name][-1000:]
            self._record_metric(name, value, labels or {})

    def _record_metric(self, name: str, value: float, labels: Dict[str, str]):
        """Internal method to record a metric point."""
        point = MetricPoint(datetime.now(), value, labels)
        self.metrics[name].append(point)

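    # A minimal sketch of how call sites might record application metrics.
    # The counter name 'websocket_messages_total' matches the one read by
    # get_performance_summary below; 'sessions' and the latency value are
    # hypothetical:
    #
    #   metrics_collector.record_counter('websocket_messages_total')
    #   metrics_collector.record_gauge('active_sessions', len(sessions))
    #   metrics_collector.record_histogram('stt_latency_seconds', 0.42)
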
    async def _check_baselines(self):
        """Check current metrics against baselines and trigger alerts."""
        for metric_name, baseline in self.baselines.items():
            if not baseline.enabled:
                continue

            current_value = self.gauges.get(metric_name)
            if current_value is None:
                continue

            # Check if the metric exceeds its alert threshold
            if current_value > baseline.alert_threshold:
                await self._trigger_alert(metric_name, current_value, baseline)

    async def _trigger_alert(self, metric_name: str, current_value: float, baseline: PerformanceBaseline):
        """Trigger an alert for a metric threshold violation."""
        alert_data = {
            'metric': metric_name,
            'current_value': current_value,
            'threshold': baseline.alert_threshold,
            'timestamp': datetime.now(),
            'severity': 'high' if current_value > baseline.alert_threshold * 1.2 else 'medium'
        }

        logger.warning(
            f"Performance alert: {metric_name} = {current_value:.2f} "
            f"(threshold: {baseline.alert_threshold})"
        )

        # Call registered alert callbacks
        for callback in self.alert_callbacks:
            try:
                await callback(alert_data)
            except Exception as e:
                logger.error(f"Error in alert callback: {e}")

    def add_alert_callback(self, callback: Callable):
        """Register a callback for alerts. Callbacks are awaited when an
        alert fires, so they must be async functions."""
        self.alert_callbacks.append(callback)

    def get_current_metrics(self) -> Dict[str, Any]:
        """Get current metric values."""
        with self._lock:
            return {
                'timestamp': datetime.now().isoformat(),
                'counters': dict(self.counters),
                'gauges': dict(self.gauges),
                'histograms': {name: {
                    'count': len(values),
                    'min': min(values) if values else 0,
                    'max': max(values) if values else 0,
                    'avg': sum(values) / len(values) if values else 0
                } for name, values in self.histograms.items()}
            }

    def get_metric_history(self, metric_name: str, minutes: int = 5) -> List[Dict[str, Any]]:
        """Get historical data for a specific metric."""
        cutoff_time = datetime.now() - timedelta(minutes=minutes)

        with self._lock:
            points = self.metrics.get(metric_name, [])
            recent_points = [
                {
                    'timestamp': point.timestamp.isoformat(),
                    'value': point.value,
                    'labels': point.labels
                }
                for point in points
                if point.timestamp > cutoff_time
            ]
        return recent_points

    def get_performance_summary(self) -> Dict[str, Any]:
        """Get a comprehensive performance summary."""
        current_metrics = self.get_current_metrics()

        # Calculate rates
        message_rate = self._calculate_rate('websocket_messages_total', window_minutes=1)
        error_rate = self._calculate_rate('errors_total', window_minutes=1)

        return {
            'current_metrics': current_metrics,
            'rates': {
                'messages_per_minute': message_rate,
                'errors_per_minute': error_rate
            },
            'health_status': self._get_health_status(),
            'baselines': {name: {
                'expected': baseline.expected_value,
                'threshold': baseline.alert_threshold,
                'current': self.gauges.get(name, 0)
            } for name, baseline in self.baselines.items()},
            'uptime_seconds': self.gauges.get('uptime_seconds', 0)
        }

    def _calculate_rate(self, metric_name: str, window_minutes: int = 1) -> float:
        """Calculate the per-minute rate of change for a counter metric."""
        history = self.get_metric_history(metric_name, window_minutes)
        if len(history) < 2:
            return 0.0

        latest = history[-1]['value']
        earliest = history[0]['value']
        # Derive elapsed time from the actual timestamps: counter points are
        # recorded when events occur, not once per second, so the number of
        # points is not a reliable proxy for duration.
        time_diff = (
            datetime.fromisoformat(history[-1]['timestamp'])
            - datetime.fromisoformat(history[0]['timestamp'])
        ).total_seconds() / 60.0  # in minutes

        if time_diff > 0:
            return (latest - earliest) / time_diff
        return 0.0

    def _get_health_status(self) -> str:
        """Determine overall health status based on current metrics."""
        critical_metrics = ['cpu_usage_percent', 'memory_usage_percent', 'error_rate_per_minute']

        for metric_name in critical_metrics:
            baseline = self.baselines.get(metric_name)
            current_value = self.gauges.get(metric_name, 0)

            if baseline and current_value > baseline.alert_threshold:
                return 'unhealthy'

        return 'healthy'


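# A hedged sketch of the "health check endpoints" feature named in the module
# docstring; FastAPI is an assumption (it is not imported here) and the route
# paths are illustrative:
#
#   from fastapi import FastAPI
#   app = FastAPI()
#
#   @app.get("/health")
#   async def health():
#       summary = metrics_collector.get_performance_summary()
#       return {"status": summary["health_status"]}
#
#   @app.get("/metrics")
#   async def metrics():
#       return metrics_collector.get_current_metrics()

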
class PerformanceTimer:
    """Context manager for timing operations."""

    def __init__(self, metrics_collector: MetricsCollector, operation_name: str):
        self.metrics_collector = metrics_collector
        self.operation_name = operation_name
        self.start_time = None

    def __enter__(self):
        self.start_time = time.time()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.start_time:
            duration = time.time() - self.start_time
            self.metrics_collector.record_histogram(
                f'{self.operation_name}_duration_seconds',
                duration
            )


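# A minimal usage sketch; the operation name "transcribe_chunk" and the
# transcribe() call are hypothetical:
#
#   with PerformanceTimer(metrics_collector, "transcribe_chunk"):
#       text = transcribe(audio_chunk)

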
@asynccontextmanager
async def async_performance_timer(metrics_collector: MetricsCollector, operation_name: str):
    """Async context manager for timing operations."""
    start_time = time.time()
    try:
        yield
    finally:
        duration = time.time() - start_time
        metrics_collector.record_histogram(
            f'{operation_name}_duration_seconds',
            duration
        )


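# The async variant works the same way inside coroutines; the operation name
# and the synthesize() coroutine are hypothetical:
#
#   async with async_performance_timer(metrics_collector, "tts_synthesis"):
#       audio = await synthesize(text)

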
# Global metrics collector instance
metrics_collector = MetricsCollector()


# Decorator for automatic performance monitoring
def monitor_performance(operation_name: str):
    """Decorator to automatically monitor function performance."""
    def decorator(func):
        if asyncio.iscoroutinefunction(func):
            @functools.wraps(func)  # preserve the wrapped function's metadata
            async def async_wrapper(*args, **kwargs):
                async with async_performance_timer(metrics_collector, operation_name):
                    return await func(*args, **kwargs)
            return async_wrapper
        else:
            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                with PerformanceTimer(metrics_collector, operation_name):
                    return func(*args, **kwargs)
            return sync_wrapper
    return decorator