202 lines
7.9 KiB
Python

"""
Admin API endpoints for the AI Voice Bot server.
This module contains admin-only endpoints for managing users, sessions, and system health.
Extracted from main.py to improve maintainability and separation of concerns.
"""
import hmac
import os
import sys
import time
from typing import TYPE_CHECKING, Optional
# Make the project root importable so the `shared` package (imported just
# below) can be resolved. The root is found by walking up from this file:
#   this file -> its directory -> the server directory -> the project root.
current_dir = os.path.dirname(os.path.abspath(__file__))  # directory holding this module
server_dir = os.path.dirname(current_dir)  # one level up from current_dir
project_root = os.path.dirname(server_dir)  # two levels up from current_dir
# Inserted at index 0, so this root is searched before existing sys.path entries.
sys.path.insert(0, project_root)
from fastapi import APIRouter, Request, Response, Body
from shared.models import (
AdminNamesResponse,
AdminActionResponse,
AdminSetPassword,
AdminClearPassword,
AdminValidationResponse,
AdminMetricsResponse,
AdminMetricsConfig,
)
from logger import logger
if TYPE_CHECKING:
from core.session_manager import SessionManager, SessionConfig
from core.lobby_manager import LobbyManager
from core.auth_manager import AuthManager
else:
# Import for runtime
from core.session_manager import SessionConfig
class AdminAPI:
    """Admin API endpoint handlers.

    Owns an ``APIRouter`` mounted under ``<public_url>/api/admin`` and
    registers the admin-only endpoints on it. Every endpoint first checks the
    ``X-Admin-Token`` request header via :meth:`_require_admin` and answers
    with an empty 403 response when the check fails.
    """

    def __init__(
        self,
        session_manager: "SessionManager",
        lobby_manager: "LobbyManager",
        auth_manager: "AuthManager",
        admin_token: Optional[str] = None,
        public_url: str = "/",
    ):
        """Store the collaborating managers and build the admin router.

        Args:
            session_manager: Used for saving, cleanup, validation and metrics.
            lobby_manager: Used for lobby cleanup and the lobby count.
            auth_manager: Used for listing, setting and clearing passwords.
            admin_token: Shared secret expected in the ``X-Admin-Token``
                header. When empty or ``None`` the admin check is disabled.
            public_url: URL prefix the application is served under.
        """
        self.session_manager = session_manager
        self.lobby_manager = lobby_manager
        self.auth_manager = auth_manager
        self.admin_token = admin_token
        self.router = APIRouter(prefix=self._admin_prefix(public_url))
        self._register_routes()

    @staticmethod
    def _admin_prefix(public_url: str) -> str:
        """Return the router prefix for ``public_url``.

        Trailing slashes are dropped before ``/api/admin`` is appended, so
        ``"/"``, ``"/app/"`` and ``"/app"`` all produce a well-formed prefix
        (``"/api/admin"``, ``"/app/api/admin"``, ``"/app/api/admin"``).
        """
        base = (public_url or "").rstrip("/")
        return f"{base}/api/admin"

    def _require_admin(self, request: "Request") -> bool:
        """Return True when the request carries a valid admin token.

        NOTE: when no admin token is configured every request is accepted;
        this fail-open behaviour is kept for backward compatibility.
        """
        if not self.admin_token:
            return True
        token = request.headers.get("X-Admin-Token")
        if token is None:
            return False
        # Constant-time comparison so response timing does not leak how much
        # of the token matched. Compare bytes: compare_digest rejects
        # non-ASCII str arguments with TypeError.
        return hmac.compare_digest(
            token.encode("utf-8"), self.admin_token.encode("utf-8")
        )

    def _register_routes(self):
        """Register all admin routes on ``self.router``."""

        @self.router.get("/names", response_model=AdminNamesResponse)
        def list_names(request: Request):
            """List the protected names known to the auth manager."""
            if not self._require_admin(request):
                return Response(status_code=403)
            name_passwords_models = self.auth_manager.get_all_protected_names()
            return AdminNamesResponse(name_passwords=name_passwords_models)

        @self.router.post("/set_password", response_model=AdminActionResponse)
        def set_password(request: Request, payload: AdminSetPassword = Body(...)):
            """Set the password for a name and save the change."""
            if not self._require_admin(request):
                return Response(status_code=403)
            self.auth_manager.set_password(payload.name, payload.password)
            self.session_manager.save()  # Save changes
            return AdminActionResponse(status="ok", name=payload.name)

        @self.router.post("/clear_password", response_model=AdminActionResponse)
        def clear_password(request: Request, payload: AdminClearPassword = Body(...)):
            """Clear the password for a name; reports ``not_found`` otherwise."""
            if not self._require_admin(request):
                return Response(status_code=403)
            if self.auth_manager.clear_password(payload.name):
                self.session_manager.save()  # Save changes
                return AdminActionResponse(status="ok", name=payload.name)
            return AdminActionResponse(status="not_found", name=payload.name)

        @self.router.post("/cleanup_sessions", response_model=AdminActionResponse)
        def cleanup_sessions(request: Request):
            """Trigger a manual session cleanup and report how many were removed."""
            if not self._require_admin(request):
                return Response(status_code=403)
            try:
                removed_count = self.session_manager.cleanup_old_sessions()
                return AdminActionResponse(
                    status="ok",
                    name=f"Removed {removed_count} sessions",
                )
            except Exception as e:
                # Endpoint boundary: log and report the failure in the payload.
                logger.error(f"Error during manual session cleanup: {e}")
                return AdminActionResponse(status="error", name=f"Error: {str(e)}")

        @self.router.get("/session_metrics", response_model=AdminMetricsResponse)
        def session_metrics(request: Request):
            """Return session/lobby counts and the cleanup configuration."""
            if not self._require_admin(request):
                return Response(status_code=403)
            try:
                return self._get_cleanup_metrics()
            except Exception as e:
                logger.error(f"Error getting session metrics: {e}")
                return Response(status_code=500)

        @self.router.get("/validate_sessions", response_model=AdminValidationResponse)
        def validate_sessions(request: Request):
            """Run the session and auth integrity checks and return the issues."""
            if not self._require_admin(request):
                return Response(status_code=403)
            try:
                session_issues = self.session_manager.validate_session_integrity()
                auth_issues = self.auth_manager.validate_integrity()
                all_issues = session_issues + auth_issues
                return AdminValidationResponse(
                    status="ok",
                    issues=all_issues,
                    issue_count=len(all_issues),
                )
            except Exception as e:
                logger.error(f"Error validating sessions: {e}")
                return AdminValidationResponse(status="error", error=str(e))

        @self.router.post("/cleanup_lobbies", response_model=AdminActionResponse)
        def cleanup_lobbies(request: Request):
            """Remove empty lobbies and report how many were removed."""
            if not self._require_admin(request):
                return Response(status_code=403)
            try:
                removed_count = self.lobby_manager.cleanup_empty_lobbies()
                return AdminActionResponse(
                    status="ok",
                    name=f"Removed {removed_count} empty lobbies",
                )
            except Exception as e:
                logger.error(f"Error during lobby cleanup: {e}")
                return AdminActionResponse(status="error", name=f"Error: {str(e)}")

    def _get_cleanup_metrics(self) -> "AdminMetricsResponse":
        """Build the session cleanup metrics response.

        Counts total, connected (``ws`` set), named and displaced sessions,
        then the disconnected sessions that exceed the anonymous or displaced
        timeouts from ``SessionConfig``. ``cleanup_candidates`` counts each
        session at most once, even when it matches both categories.
        """
        all_sessions = self.session_manager.get_all_sessions()
        total_sessions = len(all_sessions)
        active_sessions = sum(1 for s in all_sessions if s.ws is not None)
        named_sessions = sum(1 for s in all_sessions if s.name)
        displaced_sessions = sum(1 for s in all_sessions if s.displaced_at is not None)

        # Count sessions that would be cleaned up
        current_time = time.time()
        cleanup_candidates = 0
        old_anonymous = 0
        old_displaced = 0
        for session in all_sessions:
            if session.ws:
                # Still connected: never a cleanup candidate.
                continue
            is_old_anonymous = (
                not session.name
                and current_time - session.created_at
                > SessionConfig.ANONYMOUS_SESSION_TIMEOUT
            )
            is_old_displaced = (
                session.displaced_at is not None
                and current_time - session.last_used
                > SessionConfig.DISPLACED_SESSION_TIMEOUT
            )
            if is_old_anonymous:
                old_anonymous += 1
            if is_old_displaced:
                old_displaced += 1
            # A session can only be removed once, so count it once here even
            # if it qualifies under both rules.
            if is_old_anonymous or is_old_displaced:
                cleanup_candidates += 1

        config = AdminMetricsConfig(
            anonymous_timeout=SessionConfig.ANONYMOUS_SESSION_TIMEOUT,
            displaced_timeout=SessionConfig.DISPLACED_SESSION_TIMEOUT,
            cleanup_interval=SessionConfig.CLEANUP_INTERVAL,
            max_cleanup_per_cycle=SessionConfig.MAX_SESSIONS_PER_CLEANUP,
        )
        return AdminMetricsResponse(
            total_sessions=total_sessions,
            active_sessions=active_sessions,
            named_sessions=named_sessions,
            displaced_sessions=displaced_sessions,
            old_anonymous_sessions=old_anonymous,
            old_displaced_sessions=old_displaced,
            total_lobbies=self.lobby_manager.get_lobby_count(),
            cleanup_candidates=cleanup_candidates,
            config=config,
        )