"""
Admin API endpoints for the AI Voice Bot server.

This module contains admin-only endpoints for managing users, sessions, and system health.
Extracted from main.py to improve maintainability and separation of concerns.
"""

import hmac
import time
from typing import TYPE_CHECKING, Optional

from fastapi import APIRouter, Request, Response, Body

# Import shared models
import sys
import os

# Make the directory four levels above this file importable so that the
# `shared` package resolves when the server is started from inside the tree.
sys.path.append(
    os.path.dirname(
        os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    )
)

from shared.models import (
    AdminNamesResponse,
    AdminActionResponse,
    AdminSetPassword,
    AdminClearPassword,
    AdminValidationResponse,
    AdminMetricsResponse,
    AdminMetricsConfig,
)

from logger import logger

if TYPE_CHECKING:
    from ..core.session_manager import SessionManager
    from ..core.lobby_manager import LobbyManager
    from ..core.auth_manager import AuthManager


class AdminAPI:
    """Admin API endpoint handlers.

    Builds an ``APIRouter`` mounted at ``{public_url}api/admin`` and registers
    the admin endpoints on it. Every endpoint answers with an empty HTTP 403
    response when the admin-token check in ``_require_admin`` fails.
    """

    def __init__(
        self,
        session_manager: "SessionManager",
        lobby_manager: "LobbyManager",
        auth_manager: "AuthManager",
        admin_token: Optional[str] = None,
        public_url: str = "/",
    ):
        """Store the collaborating managers and register the admin routes.

        Args:
            session_manager: Used to save state, clean up, validate and list sessions.
            lobby_manager: Used to clean up empty lobbies and to count lobbies.
            auth_manager: Used to list, set, clear and validate name passwords.
            admin_token: Shared secret expected in the ``X-Admin-Token`` header.
                When empty or ``None`` the token check is disabled.
            public_url: Prefix placed directly in front of ``api/admin``; it is
                concatenated as-is, so it is expected to end with ``/``.
        """
        self.session_manager = session_manager
        self.lobby_manager = lobby_manager
        self.auth_manager = auth_manager
        self.admin_token = admin_token
        self.router = APIRouter(prefix=f"{public_url}api/admin")
        self._register_routes()

    def _require_admin(self, request: Request) -> bool:
        """Check if request has valid admin token.

        Returns:
            ``True`` when no admin token is configured, or when the request's
            ``X-Admin-Token`` header equals the configured token; ``False``
            otherwise (including when the header is missing).
        """
        if not self.admin_token:
            return True

        token = request.headers.get("X-Admin-Token")
        if token is None:
            return False

        # Constant-time comparison so response timing does not reveal how much
        # of the token matched. Compare bytes because compare_digest() rejects
        # non-ASCII str arguments.
        return hmac.compare_digest(
            token.encode("utf-8"), self.admin_token.encode("utf-8")
        )

    def _register_routes(self):
        """Register all admin routes on ``self.router``."""

        @self.router.get("/names", response_model=AdminNamesResponse)
        def list_names(request: Request):
            """Return the protected names reported by the auth manager."""
            if not self._require_admin(request):
                return Response(status_code=403)

            name_passwords_models = self.auth_manager.get_all_protected_names()
            return AdminNamesResponse(name_passwords=name_passwords_models)

        @self.router.post("/set_password", response_model=AdminActionResponse)
        def set_password(request: Request, payload: AdminSetPassword = Body(...)):
            """Set the password for ``payload.name`` and save the session state."""
            if not self._require_admin(request):
                return Response(status_code=403)

            self.auth_manager.set_password(payload.name, payload.password)
            self.session_manager.save()  # Save changes
            return AdminActionResponse(status="ok", name=payload.name)

        @self.router.post("/clear_password", response_model=AdminActionResponse)
        def clear_password(request: Request, payload: AdminClearPassword = Body(...)):
            """Clear the password for ``payload.name``.

            Responds with status ``"not_found"`` when the auth manager reports
            that nothing was cleared; state is only saved on success.
            """
            if not self._require_admin(request):
                return Response(status_code=403)

            if self.auth_manager.clear_password(payload.name):
                self.session_manager.save()  # Save changes
                return AdminActionResponse(status="ok", name=payload.name)
            return AdminActionResponse(status="not_found", name=payload.name)

        @self.router.post("/cleanup_sessions", response_model=AdminActionResponse)
        def cleanup_sessions(request: Request):
            """Run a manual session cleanup and report how many were removed."""
            if not self._require_admin(request):
                return Response(status_code=403)

            try:
                removed_count = self.session_manager.cleanup_old_sessions()
                return AdminActionResponse(
                    status="ok", name=f"Removed {removed_count} sessions"
                )
            except Exception as e:
                # Endpoint boundary: log and report the failure in the payload.
                logger.error(f"Error during manual session cleanup: {e}")
                return AdminActionResponse(status="error", name=f"Error: {str(e)}")

        @self.router.get("/session_metrics", response_model=AdminMetricsResponse)
        def session_metrics(request: Request):
            """Return session/lobby counts; an empty HTTP 500 on failure."""
            if not self._require_admin(request):
                return Response(status_code=403)

            try:
                return self._get_cleanup_metrics()
            except Exception as e:
                logger.error(f"Error getting session metrics: {e}")
                return Response(status_code=500)

        @self.router.get("/validate_sessions", response_model=AdminValidationResponse)
        def validate_sessions(request: Request):
            """Return the combined session and auth integrity issues."""
            if not self._require_admin(request):
                return Response(status_code=403)

            try:
                session_issues = self.session_manager.validate_session_integrity()
                auth_issues = self.auth_manager.validate_integrity()
                all_issues = session_issues + auth_issues

                return AdminValidationResponse(
                    status="ok", issues=all_issues, issue_count=len(all_issues)
                )
            except Exception as e:
                logger.error(f"Error validating sessions: {e}")
                return AdminValidationResponse(status="error", error=str(e))

        @self.router.post("/cleanup_lobbies", response_model=AdminActionResponse)
        def cleanup_lobbies(request: Request):
            """Remove empty lobbies and report how many were removed."""
            if not self._require_admin(request):
                return Response(status_code=403)

            try:
                removed_count = self.lobby_manager.cleanup_empty_lobbies()
                return AdminActionResponse(
                    status="ok", name=f"Removed {removed_count} empty lobbies"
                )
            except Exception as e:
                logger.error(f"Error during lobby cleanup: {e}")
                return AdminActionResponse(status="error", name=f"Error: {str(e)}")

    def _get_cleanup_metrics(self) -> AdminMetricsResponse:
        """Get session cleanup metrics.

        Counts total, connected (``ws is not None``), named and displaced
        sessions, plus the disconnected sessions whose age exceeds the
        ``SessionConfig`` timeouts.

        Note:
            The anonymous and displaced checks are independent, so a session
            satisfying both is counted in both buckets and contributes twice
            to ``cleanup_candidates``.
        """
        # Imported here rather than at module level (as in the original code),
        # but *before* the loop: importing inside the loop left SessionConfig
        # unbound when there were no sessions, which crashed the config
        # construction below.
        from ..core.session_manager import SessionConfig

        # Get current counts
        all_sessions = self.session_manager.get_all_sessions()
        total_sessions = len(all_sessions)
        active_sessions = sum(1 for s in all_sessions if s.ws is not None)
        named_sessions = sum(1 for s in all_sessions if s.name)
        displaced_sessions = sum(1 for s in all_sessions if s.displaced_at is not None)

        # Count sessions that would be cleaned up
        current_time = time.time()
        cleanup_candidates = 0
        old_anonymous = 0
        old_displaced = 0

        for session in all_sessions:
            # Both criteria below only apply to sessions without a websocket.
            if session.ws:
                continue

            # Anonymous sessions
            if (
                not session.name
                and current_time - session.created_at
                > SessionConfig.ANONYMOUS_SESSION_TIMEOUT
            ):
                cleanup_candidates += 1
                old_anonymous += 1

            # Displaced sessions
            if (
                session.displaced_at is not None
                and current_time - session.last_used
                > SessionConfig.DISPLACED_SESSION_TIMEOUT
            ):
                cleanup_candidates += 1
                old_displaced += 1

        config = AdminMetricsConfig(
            anonymous_timeout=SessionConfig.ANONYMOUS_SESSION_TIMEOUT,
            displaced_timeout=SessionConfig.DISPLACED_SESSION_TIMEOUT,
            cleanup_interval=SessionConfig.CLEANUP_INTERVAL,
            max_cleanup_per_cycle=SessionConfig.MAX_SESSIONS_PER_CLEANUP,
        )

        return AdminMetricsResponse(
            total_sessions=total_sessions,
            active_sessions=active_sessions,
            named_sessions=named_sessions,
            displaced_sessions=displaced_sessions,
            old_anonymous_sessions=old_anonymous,
            old_displaced_sessions=old_displaced,
            total_lobbies=self.lobby_manager.get_lobby_count(),
            cleanup_candidates=cleanup_candidates,
            config=config,
        )