diff --git a/client/src/api-evolution-checker.ts b/client/src/api-evolution-checker.ts index be9ec29..fb387bc 100644 --- a/client/src/api-evolution-checker.ts +++ b/client/src/api-evolution-checker.ts @@ -73,7 +73,7 @@ export class AdvancedApiEvolutionChecker { async loadSchemaFromJson(): Promise { try { // In a real implementation, you might fetch this from a URL or import it - const response = await fetch('/openapi-schema.json'); + const response = await fetch("/ai-voicebot/openapi-schema.json"); if (response.ok) { return await response.json(); } diff --git a/server/main.py b/server/main.py index fee58f5..caed2ae 100644 --- a/server/main.py +++ b/server/main.py @@ -18,6 +18,8 @@ import hashlib import binascii import sys import asyncio +import threading +import time from contextlib import asynccontextmanager from fastapi.staticfiles import StaticFiles @@ -43,6 +45,9 @@ from shared.models import ( AdminActionResponse, AdminSetPassword, AdminClearPassword, + AdminValidationResponse, + AdminMetricsResponse, + AdminMetricsConfig, JoinStatusModel, ChatMessageModel, ChatMessagesResponse, @@ -60,6 +65,28 @@ from shared.models import ( ) +class SessionConfig: + """Configuration class for session management""" + + ANONYMOUS_SESSION_TIMEOUT = int( + os.getenv("ANONYMOUS_SESSION_TIMEOUT", "60") + ) # 1 minute + DISPLACED_SESSION_TIMEOUT = int( + os.getenv("DISPLACED_SESSION_TIMEOUT", "10800") + ) # 3 hours + CLEANUP_INTERVAL = int(os.getenv("CLEANUP_INTERVAL", "300")) # 5 minutes + MAX_SESSIONS_PER_CLEANUP = int( + os.getenv("MAX_SESSIONS_PER_CLEANUP", "100") + ) # Circuit breaker + MAX_CHAT_MESSAGES_PER_LOBBY = int(os.getenv("MAX_CHAT_MESSAGES_PER_LOBBY", "100")) + SESSION_VALIDATION_INTERVAL = int( + os.getenv("SESSION_VALIDATION_INTERVAL", "1800") + ) # 30 minutes + + +# Thread lock for session operations +session_lock = threading.RLock() + # Mapping of reserved names to password records (lowercased name -> {salt:..., hash:...}) name_passwords: dict[str, dict[str, str]] = 
{} @@ -89,50 +116,104 @@ public_url = os.getenv("PUBLIC_URL", "/") if not public_url.endswith("/"): public_url += "/" -# Global variable to control the cleanup task +# Global variables to control background tasks cleanup_task_running = False cleanup_task = None +validation_task_running = False +validation_task = None async def periodic_cleanup(): """Background task to periodically clean up old sessions""" global cleanup_task_running + cleanup_errors = 0 + max_consecutive_errors = 5 + while cleanup_task_running: try: - Session.cleanup_old_sessions() - # Run cleanup every 5 minutes - await asyncio.sleep(300) + removed_count = Session.cleanup_old_sessions() + if removed_count > 0: + logger.info(f"Periodic cleanup removed {removed_count} old sessions") + cleanup_errors = 0 # Reset error counter on success + + # Run cleanup at configured interval + await asyncio.sleep(SessionConfig.CLEANUP_INTERVAL) except Exception as e: - logger.error(f"Error in session cleanup task: {e}") - await asyncio.sleep(60) # Wait 1 minute before retrying on error + cleanup_errors += 1 + logger.error( + f"Error in session cleanup task (attempt {cleanup_errors}): {e}" + ) + + if cleanup_errors >= max_consecutive_errors: + logger.error( + f"Too many consecutive cleanup errors ({cleanup_errors}), stopping cleanup task" + ) + break + + # Exponential backoff on errors + await asyncio.sleep(min(60 * cleanup_errors, 300)) + + +async def periodic_validation(): + """Background task to periodically validate session integrity""" + global validation_task_running + + while validation_task_running: + try: + issues = Session.validate_session_integrity() + if issues: + logger.warning(f"Session integrity issues found: {len(issues)} issues") + for issue in issues[:10]: # Log first 10 issues + logger.warning(f"Integrity issue: {issue}") + + await asyncio.sleep(SessionConfig.SESSION_VALIDATION_INTERVAL) + except Exception as e: + logger.error(f"Error in session validation task: {e}") + await asyncio.sleep(300) # 
Wait 5 minutes before retrying on error @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for startup and shutdown events""" - global cleanup_task_running, cleanup_task + global cleanup_task_running, cleanup_task, validation_task_running, validation_task # Startup + logger.info("Starting background tasks...") cleanup_task_running = True + validation_task_running = True cleanup_task = asyncio.create_task(periodic_cleanup()) - logger.info("Session cleanup task started") + validation_task = asyncio.create_task(periodic_validation()) + logger.info("Session cleanup and validation tasks started") yield # Shutdown + logger.info("Shutting down background tasks...") cleanup_task_running = False - if cleanup_task: - cleanup_task.cancel() - try: - await cleanup_task - except asyncio.CancelledError: - pass - logger.info("Session cleanup task stopped") + validation_task_running = False + + # Cancel tasks + for task in [cleanup_task, validation_task]: + if task: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # Clean up all sessions gracefully + await Session.cleanup_all_sessions() + logger.info("All background tasks stopped and sessions cleaned up") app = FastAPI(lifespan=lifespan) logger.info(f"Starting server with public URL: {public_url}") +logger.info( + f"Session config - Anonymous timeout: {SessionConfig.ANONYMOUS_SESSION_TIMEOUT}s, " + f"Displaced timeout: {SessionConfig.DISPLACED_SESSION_TIMEOUT}s, " + f"Cleanup interval: {SessionConfig.CLEANUP_INTERVAL}s" +) # Optional admin token to protect admin endpoints ADMIN_TOKEN = os.getenv("ADMIN_TOKEN", None) @@ -149,7 +230,11 @@ def _require_admin(request: Request) -> bool: def admin_list_names(request: Request): if not _require_admin(request): return Response(status_code=403) - return {"name_passwords": name_passwords} + # Convert dict format to Pydantic models + name_passwords_models = { + name: NamePasswordRecord(**record) for name, record in 
name_passwords.items() + } + return AdminNamesResponse(name_passwords=name_passwords_models) @app.post(public_url + "api/admin/set_password", response_model=AdminActionResponse) @@ -160,7 +245,7 @@ def admin_set_password(request: Request, payload: AdminSetPassword = Body(...)): salt, hash_hex = _hash_password(payload.password) name_passwords[lname] = {"salt": salt, "hash": hash_hex} Session.save() - return {"status": "ok", "name": payload.name} + return AdminActionResponse(status="ok", name=payload.name) @app.post(public_url + "api/admin/clear_password", response_model=AdminActionResponse) @@ -171,8 +256,8 @@ def admin_clear_password(request: Request, payload: AdminClearPassword = Body(.. if lname in name_passwords: del name_passwords[lname] Session.save() - return {"status": "ok", "name": payload.name} - return {"status": "not_found", "name": payload.name} + return AdminActionResponse(status="ok", name=payload.name) + return AdminActionResponse(status="not_found", name=payload.name) @app.post(public_url + "api/admin/cleanup_sessions", response_model=AdminActionResponse) @@ -181,10 +266,39 @@ def admin_cleanup_sessions(request: Request): return Response(status_code=403) try: removed_count = Session.cleanup_old_sessions() - return {"status": "ok", "name": f"Removed {removed_count} sessions"} + return AdminActionResponse( + status="ok", name=f"Removed {removed_count} sessions" + ) except Exception as e: logger.error(f"Error during manual session cleanup: {e}") - return {"status": "not_found", "name": f"Error: {str(e)}"} + return AdminActionResponse(status="error", name=f"Error: {str(e)}") + + +@app.get(public_url + "api/admin/session_metrics", response_model=AdminMetricsResponse) +def admin_session_metrics(request: Request): + if not _require_admin(request): + return Response(status_code=403) + try: + return Session.get_cleanup_metrics() + except Exception as e: + logger.error(f"Error getting session metrics: {e}") + return Response(status_code=500) + + +@app.get( + 
public_url + "api/admin/validate_sessions", response_model=AdminValidationResponse +) +def admin_validate_sessions(request: Request): + if not _require_admin(request): + return Response(status_code=403) + try: + issues = Session.validate_session_integrity() + return AdminValidationResponse( + status="ok", issues=issues, issue_count=len(issues) + ) + except Exception as e: + logger.error(f"Error validating sessions: {e}") + return AdminValidationResponse(status="error", error=str(e)) lobbies: dict[str, Lobby] = {} @@ -198,43 +312,33 @@ class Lobby: self.sessions: dict[str, Session] = {} # All lobby members self.private = private self.chat_messages: list[ChatMessageModel] = [] # Store chat messages + self.lock = threading.RLock() # Thread safety for lobby operations def getName(self) -> str: return f"{self.short}:{self.name}" async def update_state(self, requesting_session: Session | None = None): - users: list[ParticipantModel] = [ - ParticipantModel( - name=s.name, - live=True if s.ws else False, - session_id=s.id, - protected=True - if s.name and s.name.lower() in name_passwords - else False, - ) - for s in self.sessions.values() - if s.name - ] + with self.lock: + users: list[ParticipantModel] = [ + ParticipantModel( + name=s.name, + live=True if s.ws else False, + session_id=s.id, + protected=True + if s.name and s.name.lower() in name_passwords + else False, + ) + for s in self.sessions.values() + if s.name + ] + if requesting_session: logger.info( f"{requesting_session.getName()} -> lobby_state({self.getName()})" ) if requesting_session.ws: - await requesting_session.ws.send_json( - { - "type": "lobby_state", - "data": {"participants": [user.model_dump() for user in users]}, - } - ) - else: - logger.warning( - f"{requesting_session.getName()} - No WebSocket connection." 
- ) - else: - for s in self.sessions.values(): - logger.info(f"{s.getName()} -> lobby_state({self.getName()})") - if s.ws: - await s.ws.send_json( + try: + await requesting_session.ws.send_json( { "type": "lobby_state", "data": { @@ -242,48 +346,90 @@ class Lobby: }, } ) + except Exception as e: + logger.warning( + f"Failed to send lobby state to {requesting_session.getName()}: {e}" + ) + else: + logger.warning( + f"{requesting_session.getName()} - No WebSocket connection." + ) + else: + # Send to all sessions in lobby + failed_sessions: list[Session] = [] + for s in self.sessions.values(): + logger.info(f"{s.getName()} -> lobby_state({self.getName()})") + if s.ws: + try: + await s.ws.send_json( + { + "type": "lobby_state", + "data": { + "participants": [ + user.model_dump() for user in users + ] + }, + } + ) + except Exception as e: + logger.warning( + f"Failed to send lobby state to {s.getName()}: {e}" + ) + failed_sessions.append(s) + + # Clean up failed sessions + for failed_session in failed_sessions: + failed_session.ws = None def getSession(self, id: str) -> Session | None: - return self.sessions.get(id, None) + with self.lock: + return self.sessions.get(id, None) async def addSession(self, session: Session) -> None: - if session.id in self.sessions: - logger.warning(f"{session.getName()} - Already in lobby {self.getName()}.") - return None - self.sessions[session.id] = session + with self.lock: + if session.id in self.sessions: + logger.warning( + f"{session.getName()} - Already in lobby {self.getName()}." 
+ ) + return None + self.sessions[session.id] = session await self.update_state() async def removeSession(self, session: Session) -> None: - if session.id not in self.sessions: - logger.warning(f"{session.getName()} - Not in lobby {self.getName()}.") - return None - del self.sessions[session.id] + with self.lock: + if session.id not in self.sessions: + logger.warning(f"{session.getName()} - Not in lobby {self.getName()}.") + return None + del self.sessions[session.id] await self.update_state() def add_chat_message(self, session: Session, message: str) -> ChatMessageModel: """Add a chat message to the lobby and return the message data""" - import time - - chat_message = ChatMessageModel( - id=secrets.token_hex(8), - message=message, - sender_name=session.name or session.short, - sender_session_id=session.id, - timestamp=time.time(), - lobby_id=self.id, - ) - self.chat_messages.append(chat_message) - # Keep only the latest 100 messages per lobby - if len(self.chat_messages) > 100: - self.chat_messages = self.chat_messages[-100:] + with self.lock: + chat_message = ChatMessageModel( + id=secrets.token_hex(8), + message=message, + sender_name=session.name or session.short, + sender_session_id=session.id, + timestamp=time.time(), + lobby_id=self.id, + ) + self.chat_messages.append(chat_message) + # Keep only the latest messages per lobby + if len(self.chat_messages) > SessionConfig.MAX_CHAT_MESSAGES_PER_LOBBY: + self.chat_messages = self.chat_messages[ + -SessionConfig.MAX_CHAT_MESSAGES_PER_LOBBY : + ] return chat_message def get_chat_messages(self, limit: int = 50) -> list[ChatMessageModel]: """Get the most recent chat messages from the lobby""" - return self.chat_messages[-limit:] if self.chat_messages else [] + with self.lock: + return self.chat_messages[-limit:] if self.chat_messages else [] async def broadcast_chat_message(self, chat_message: ChatMessageModel) -> None: """Broadcast a chat message to all connected sessions in the lobby""" + failed_sessions: 
list[Session] = [] for session in self.sessions.values(): if session.ws: try: @@ -294,17 +440,23 @@ class Lobby: logger.warning( f"Failed to send chat message to {session.getName()}: {e}" ) + failed_sessions.append(session) + + # Clean up failed sessions + for failed_session in failed_sessions: + failed_session.ws = None class Session: _instances: list[Session] = [] _save_file = "sessions.json" _loaded = False + lock = threading.RLock() # Thread safety for class-level operations def __init__(self, id: str): - import time logger.info(f"Instantiating new session {id}") - self._instances.append(self) + with Session.lock: + self._instances.append(self) self.id = id self.short = id[:8] self.name = "" @@ -316,50 +468,76 @@ class Session: self.created_at = time.time() self.last_used = time.time() self.displaced_at: float | None = None # When name was taken over + self.session_lock = threading.RLock() # Instance-level lock self.save() @classmethod def save(cls): - sessions_list: list[SessionSaved] = [] - for s in cls._instances: - lobbies_list: list[LobbySaved] = [ - LobbySaved(id=lobby.id, name=lobby.name, private=lobby.private) - for lobby in s.lobbies - ] - sessions_list.append( - SessionSaved( - id=s.id, - name=s.name or "", - lobbies=lobbies_list, - created_at=s.created_at, - last_used=s.last_used, - displaced_at=s.displaced_at, + try: + with cls.lock: + sessions_list: list[SessionSaved] = [] + for s in cls._instances: + with s.session_lock: + lobbies_list: list[LobbySaved] = [ + LobbySaved( + id=lobby.id, name=lobby.name, private=lobby.private + ) + for lobby in s.lobbies + ] + sessions_list.append( + SessionSaved( + id=s.id, + name=s.name or "", + lobbies=lobbies_list, + created_at=s.created_at, + last_used=s.last_used, + displaced_at=s.displaced_at, + ) + ) + + # Prepare name password store for persistence (salt+hash). Only structured records are supported. 
+            saved_pw: dict[str, NamePasswordRecord] = { +                name: NamePasswordRecord(**record) +                for name, record in name_passwords.items() +            } + +            payload_model = SessionsPayload( +                sessions=sessions_list, name_passwords=saved_pw ) - ) - # Prepare name password store for persistence (salt+hash). Only structured records are supported. - saved_pw: dict[str, NamePasswordRecord] = { - name: NamePasswordRecord(**record) - for name, record in name_passwords.items() - } + payload = payload_model.model_dump() - payload_model = SessionsPayload(sessions=sessions_list, name_passwords=saved_pw) - payload = payload_model.model_dump() + # Atomic write using temp file + temp_file = cls._save_file + ".tmp" + with open(temp_file, "w") as f: + json.dump(payload, f, indent=2) - with open(cls._save_file, "w") as f: - json.dump(payload, f, indent=2) + # Atomic rename + os.rename(temp_file, cls._save_file) - logger.info( - f"Saved {len(sessions_list)} sessions and {len(saved_pw)} name passwords to {cls._save_file}" - ) + logger.info( + f"Saved {len(sessions_list)} sessions and {len(saved_pw)} name passwords to {cls._save_file}" + ) + except Exception as e: + logger.error(f"Failed to save sessions: {e}") + # Clean up temp file if it exists + try: + if os.path.exists(cls._save_file + ".tmp"): + os.remove(cls._save_file + ".tmp") + except OSError: + pass @classmethod def load(cls): - import time if not os.path.exists(cls._save_file): logger.info(f"No session save file found: {cls._save_file}") return - with open(cls._save_file, "r") as f: - raw = json.load(f) + + try: + with open(cls._save_file, "r") as f: + raw = json.load(f) + except Exception as e: + logger.error(f"Failed to read session save file: {e}") + return try: payload = SessionsPayload.model_validate(raw) @@ -374,61 +552,49 @@ class Session: name_passwords[name] = {"salt": rec.salt, "hash": rec.hash} current_time = time.time() - one_minute = 60.0 - three_hours = 3 * 60 * 60.0 sessions_loaded = 0 sessions_expired = 0 - for s_saved in 
payload.sessions: - # Check if this session should be expired during loading - created_at = getattr(s_saved, "created_at", time.time()) - last_used = getattr(s_saved, "last_used", time.time()) - displaced_at = getattr(s_saved, "displaced_at", None) - name = s_saved.name or "" + with cls.lock: + for s_saved in payload.sessions: + # Check if this session should be expired during loading + created_at = getattr(s_saved, "created_at", time.time()) + last_used = getattr(s_saved, "last_used", time.time()) + displaced_at = getattr(s_saved, "displaced_at", None) + name = s_saved.name or "" - # Apply same removal criteria as cleanup_old_sessions - should_expire = False - - # Rule 1: Sessions with no name that are older than 1 minute (no connection assumed for disk sessions) - if not name and current_time - created_at > one_minute: - should_expire = True - logger.info( - f"Expiring session {s_saved.id[:8]} during load - no name, older than 1 minute" + # Apply same removal criteria as cleanup_old_sessions + should_expire = cls._should_remove_session_static( + name, None, created_at, last_used, displaced_at, current_time ) - # Rule 2: Displaced sessions unused for 3+ hours (no connection assumed for disk sessions) - elif displaced_at is not None and current_time - last_used > three_hours: - should_expire = True - logger.info( - f"Expiring session {s_saved.id[:8]}:{name} during load - displaced and unused for 3+ hours" - ) + if should_expire: + sessions_expired += 1 + logger.info(f"Expiring session {s_saved.id[:8]}:{name} during load") + continue # Skip loading this expired session - if should_expire: - sessions_expired += 1 - continue # Skip loading this expired session - - session = Session(s_saved.id) - session.name = name - # Load timestamps, with defaults for backward compatibility - session.created_at = created_at - session.last_used = last_used - session.displaced_at = displaced_at - for lobby_saved in s_saved.lobbies: - session.lobbies.append( - Lobby( - 
name=lobby_saved.name, - id=lobby_saved.id, - private=lobby_saved.private, + session = Session(s_saved.id) + session.name = name + # Load timestamps, with defaults for backward compatibility + session.created_at = created_at + session.last_used = last_used + session.displaced_at = displaced_at + for lobby_saved in s_saved.lobbies: + session.lobbies.append( + Lobby( + name=lobby_saved.name, + id=lobby_saved.id, + private=lobby_saved.private, + ) ) + logger.info( + f"Loaded session {session.getName()} with {len(session.lobbies)} lobbies" ) - logger.info( - f"Loaded session {session.getName()} with {len(session.lobbies)} lobbies" - ) - for lobby in session.lobbies: - lobbies[lobby.id] = Lobby( - name=lobby.name, id=lobby.id - ) # Ensure lobby exists - sessions_loaded += 1 + for lobby in session.lobbies: + lobbies[lobby.id] = Lobby( + name=lobby.name, id=lobby.id, private=lobby.private + ) # Ensure lobby exists + sessions_loaded += 1 logger.info( f"Loaded {sessions_loaded} sessions and {len(name_passwords)} name passwords from {cls._save_file}" @@ -444,18 +610,22 @@ class Session: cls.load() logger.info(f"Loaded {len(cls._instances)} sessions from disk...") cls._loaded = True - for s in cls._instances: - if s.id == id: - return s + + with cls.lock: + for s in cls._instances: + if s.id == id: + return s return None @classmethod def isUniqueName(cls, name: str) -> bool: if not name: return False - for s in cls._instances: - if s.name.lower() == name.lower(): - return False + with cls.lock: + for s in cls._instances: + with s.session_lock: + if s.name.lower() == name.lower(): + return False return True @classmethod @@ -463,110 +633,348 @@ class Session: if not name: return None lname = name.lower() - for s in cls._instances: - if s.name and s.name.lower() == lname: - return s + with cls.lock: + for s in cls._instances: + with s.session_lock: + if s.name and s.name.lower() == lname: + return s return None def getName(self) -> str: - return f"{self.short}:{self.name if 
self.name else unset_label}" + with self.session_lock: + return f"{self.short}:{self.name if self.name else unset_label}" def setName(self, name: str): - self.name = name - self.update_last_used() + with self.session_lock: + self.name = name + self.update_last_used() self.save() def update_last_used(self): """Update the last_used timestamp""" - import time - - self.last_used = time.time() + with self.session_lock: + self.last_used = time.time() def mark_displaced(self): """Mark this session as having its name taken over""" - import time + with self.session_lock: + self.displaced_at = time.time() - self.displaced_at = time.time() + @staticmethod + def _should_remove_session_static( + name: str, + ws: WebSocket | None, + created_at: float, + last_used: float, + displaced_at: float | None, + current_time: float, + ) -> bool: + """Static method to determine if a session should be removed""" + # Rule 1: Delete sessions with no active connection and no name that are older than threshold + if ( + not ws + and not name + and current_time - created_at > SessionConfig.ANONYMOUS_SESSION_TIMEOUT + ): + return True + + # Rule 2: Delete inactive sessions that had their nick taken over and haven't been used recently + if ( + not ws + and displaced_at is not None + and current_time - last_used > SessionConfig.DISPLACED_SESSION_TIMEOUT + ): + return True + + return False + + def _should_remove(self, current_time: float) -> bool: + """Check if this session should be removed""" + with self.session_lock: + return self._should_remove_session_static( + self.name, + self.ws, + self.created_at, + self.last_used, + self.displaced_at, + current_time, + ) + + @classmethod + def _remove_session_safely(cls, session: Session, empty_lobbies: set[str]) -> None: + """Safely remove a session and track affected lobbies""" + try: + with session.session_lock: + # Remove from lobbies first + for lobby in session.lobbies[ + : + ]: # Copy list to avoid modification during iteration + try: + with 
lobby.lock: + if session.id in lobby.sessions: + del lobby.sessions[session.id] + if len(lobby.sessions) == 0: + empty_lobbies.add(lobby.id) + + if lobby.id in session.lobby_peers: + del session.lobby_peers[lobby.id] + except Exception as e: + logger.warning( + f"Error removing session {session.getName()} from lobby {lobby.getName()}: {e}" + ) + + # Close WebSocket if open + if session.ws: + try: + asyncio.create_task(session.ws.close()) + except Exception as e: + logger.warning( + f"Error closing WebSocket for {session.getName()}: {e}" + ) + session.ws = None + + # Remove from instances list + with cls.lock: + if session in cls._instances: + cls._instances.remove(session) + + except Exception as e: + logger.error( + f"Error during safe session removal for {session.getName()}: {e}" + ) + + @classmethod + def _cleanup_empty_lobbies(cls, empty_lobbies: set[str]) -> int: + """Clean up empty lobbies from global lobbies dict""" + removed_count = 0 + for lobby_id in empty_lobbies: + if lobby_id in lobbies: + lobby_name = lobbies[lobby_id].getName() + del lobbies[lobby_id] + logger.info(f"Removed empty lobby {lobby_name}") + removed_count += 1 + return removed_count @classmethod def cleanup_old_sessions(cls) -> int: - """Clean up old sessions based on the specified criteria""" - import time - + """Clean up old sessions based on the specified criteria with improved safety""" current_time = time.time() - one_minute = 60.0 - three_hours = 3 * 60 * 60.0 sessions_removed = 0 - # Make a copy of the list to avoid modifying it while iterating - sessions_to_remove: list[Session] = [] + try: + # Circuit breaker - don't remove too many sessions at once + sessions_to_remove: list[Session] = [] + empty_lobbies: set[str] = set() - for session in cls._instances[:]: - # Rule 1: Delete sessions with no active connection and no name that are older than 1 minute - if ( - not session.ws - and not session.name - and current_time - session.created_at > one_minute - ): - logger.info( - 
f"Removing session {session.getName()} - no connection, no name, older than 1 minute" - ) - sessions_to_remove.append(session) - continue + with cls.lock: + # Identify sessions to remove (up to max limit) + for session in cls._instances[:]: + if ( + len(sessions_to_remove) + >= SessionConfig.MAX_SESSIONS_PER_CLEANUP + ): + logger.warning( + f"Hit session cleanup limit ({SessionConfig.MAX_SESSIONS_PER_CLEANUP}), " + f"stopping cleanup. Remaining sessions will be cleaned up in next cycle." + ) + break - # Rule 2: Delete inactive sessions that had their nick taken over and haven't been used in 3 hours - if ( - not session.ws - and session.displaced_at is not None - and current_time - session.last_used > three_hours - ): - logger.info( - f"Removing session {session.getName()} - displaced and unused for 3+ hours" - ) - sessions_to_remove.append(session) - continue + if session._should_remove(current_time): + sessions_to_remove.append(session) + logger.info( + f"Marking session {session.getName()} for removal - " + f"criteria: no_ws={session.ws is None}, no_name={not session.name}, " + f"age={current_time - session.created_at:.0f}s, " + f"displaced={session.displaced_at is not None}, " + f"unused={current_time - session.last_used:.0f}s" + ) - # Remove the sessions - for session in sessions_to_remove: - # Remove from lobbies first - for lobby in session.lobbies[ - : - ]: # Copy list to avoid modification during iteration - try: - # Use async cleanup if needed, but for cleanup we'll just remove from data structures - if session.id in lobby.sessions: - del lobby.sessions[session.id] - if lobby.id in session.lobby_peers: - del session.lobby_peers[lobby.id] - except Exception as e: - logger.warning( - f"Error removing session {session.getName()} from lobby {lobby.getName()}: {e}" - ) - - # Remove from instances list - if session in cls._instances: - cls._instances.remove(session) + # Remove the identified sessions + for session in sessions_to_remove: + 
cls._remove_session_safely(session, empty_lobbies) sessions_removed += 1 - # Clean up empty lobbies from global lobbies dict - empty_lobbies: list[str] = [] - for lobby_id, lobby in lobbies.items(): - if len(lobby.sessions) == 0: - empty_lobbies.append(lobby_id) + # Clean up empty lobbies + empty_lobbies_removed = cls._cleanup_empty_lobbies(empty_lobbies) - for lobby_id in empty_lobbies: - del lobbies[lobby_id] - logger.info(f"Removed empty lobby {lobby_id}") + # Save state if we made changes + if sessions_removed > 0: + cls.save() + logger.info( + f"Session cleanup completed: removed {sessions_removed} sessions, " + f"{empty_lobbies_removed} empty lobbies" + ) - if sessions_removed > 0: - cls.save() - logger.info(f"Session cleanup: removed {sessions_removed} old sessions") - - if empty_lobbies: - logger.info(f"Session cleanup: removed {len(empty_lobbies)} empty lobbies") + except Exception as e: + logger.error(f"Error during session cleanup: {e}") + # Don't re-raise - cleanup should be resilient return sessions_removed + @classmethod + def get_cleanup_metrics(cls) -> AdminMetricsResponse: + """Return cleanup metrics for monitoring""" + current_time = time.time() + + with cls.lock: + total_sessions = len(cls._instances) + active_sessions = 0 + named_sessions = 0 + displaced_sessions = 0 + old_anonymous = 0 + old_displaced = 0 + + for s in cls._instances: + with s.session_lock: + if s.ws: + active_sessions += 1 + if s.name: + named_sessions += 1 + if s.displaced_at is not None: + displaced_sessions += 1 + if ( + not s.ws + and current_time - s.last_used + > SessionConfig.DISPLACED_SESSION_TIMEOUT + ): + old_displaced += 1 + if ( + not s.ws + and not s.name + and current_time - s.created_at + > SessionConfig.ANONYMOUS_SESSION_TIMEOUT + ): + old_anonymous += 1 + + config = AdminMetricsConfig( + anonymous_timeout=SessionConfig.ANONYMOUS_SESSION_TIMEOUT, + displaced_timeout=SessionConfig.DISPLACED_SESSION_TIMEOUT, + cleanup_interval=SessionConfig.CLEANUP_INTERVAL, + 
max_cleanup_per_cycle=SessionConfig.MAX_SESSIONS_PER_CLEANUP, + ) + + return AdminMetricsResponse( + total_sessions=total_sessions, + active_sessions=active_sessions, + named_sessions=named_sessions, + displaced_sessions=displaced_sessions, + old_anonymous_sessions=old_anonymous, + old_displaced_sessions=old_displaced, + total_lobbies=len(lobbies), + cleanup_candidates=old_anonymous + old_displaced, + config=config, + ) + + @classmethod + def validate_session_integrity(cls) -> list[str]: + """Validate session data integrity""" + issues: list[str] = [] + + try: + with cls.lock: + for session in cls._instances: + with session.session_lock: + # Check for orphaned lobby references + for lobby in session.lobbies: + if lobby.id not in lobbies: + issues.append( + f"Session {session.id[:8]}:{session.name} references missing lobby {lobby.id}" + ) + + # Check for inconsistent peer relationships + for lobby_id, peer_ids in session.lobby_peers.items(): + lobby = lobbies.get(lobby_id) + if lobby: + with lobby.lock: + if session.id not in lobby.sessions: + issues.append( + f"Session {session.id[:8]}:{session.name} has peers in lobby {lobby_id} but not in lobby.sessions" + ) + + # Check if peer sessions actually exist + for peer_id in peer_ids: + if peer_id not in lobby.sessions: + issues.append( + f"Session {session.id[:8]}:{session.name} references non-existent peer {peer_id} in lobby {lobby_id}" + ) + else: + issues.append( + f"Session {session.id[:8]}:{session.name} has peer list for non-existent lobby {lobby_id}" + ) + + # Check lobbies for consistency + for lobby_id, lobby in lobbies.items(): + with lobby.lock: + for session_id in lobby.sessions: + found_session = None + for s in cls._instances: + if s.id == session_id: + found_session = s + break + + if not found_session: + issues.append( + f"Lobby {lobby_id} references non-existent session {session_id}" + ) + else: + with found_session.session_lock: + if lobby not in found_session.lobbies: + issues.append( + f"Lobby 
{lobby_id} contains session {session_id} but session doesn't reference lobby" + ) + + except Exception as e: + logger.error(f"Error during session validation: {e}") + issues.append(f"Validation error: {str(e)}") + + return issues + + @classmethod + async def cleanup_all_sessions(cls): + """Clean up all sessions during shutdown""" + logger.info("Starting graceful session cleanup...") + + try: + with cls.lock: + sessions_to_cleanup = cls._instances[:] + + for session in sessions_to_cleanup: + try: + with session.session_lock: + # Close WebSocket connections + if session.ws: + try: + await session.ws.close() + except Exception as e: + logger.warning( + f"Error closing WebSocket for {session.getName()}: {e}" + ) + session.ws = None + + # Remove from lobbies + for lobby in session.lobbies[:]: + try: + await session.part(lobby) + except Exception as e: + logger.warning( + f"Error removing {session.getName()} from lobby: {e}" + ) + + except Exception as e: + logger.error(f"Error cleaning up session {session.getName()}: {e}") + + # Clear all data structures + with cls.lock: + cls._instances.clear() + lobbies.clear() + + logger.info( + f"Graceful session cleanup completed for {len(sessions_to_cleanup)} sessions" + ) + + except Exception as e: + logger.error(f"Error during graceful session cleanup: {e}") + async def join(self, lobby: Lobby): if not self.ws: logger.error( @@ -574,89 +982,129 @@ class Session: ) return - if lobby.id in self.lobby_peers or self.id in lobby.sessions: - logger.info(f"{self.getName()} - Already joined to {lobby.getName()}.") - data = JoinStatusModel( - status="Joined", message=f"Already joined to lobby {lobby.getName()}" - ) - await self.ws.send_json({"type": "join_status", "data": data.model_dump()}) - return + with self.session_lock: + if lobby.id in self.lobby_peers or self.id in lobby.sessions: + logger.info(f"{self.getName()} - Already joined to {lobby.getName()}.") + data = JoinStatusModel( + status="Joined", + message=f"Already joined to 
lobby {lobby.getName()}", + ) + try: + await self.ws.send_json( + {"type": "join_status", "data": data.model_dump()} + ) + except Exception as e: + logger.warning( + f"Failed to send join status to {self.getName()}: {e}" + ) + return # Initialize the peer list for this lobby - self.lobbies.append(lobby) - self.lobby_peers[lobby.id] = [] + with self.session_lock: + self.lobbies.append(lobby) + self.lobby_peers[lobby.id] = [] - for peer_id in lobby.sessions: - if peer_id == self.id: - raise Exception( + with lobby.lock: + peer_sessions = list(lobby.sessions.values()) + + for peer_session in peer_sessions: + if peer_session.id == self.id: + logger.error( "Should not happen: self in lobby.sessions while not in lobby." ) + continue - peer_session = lobby.getSession(peer_id) - if not peer_session or not peer_session.ws: + if not peer_session.ws: logger.warning( - f"{self.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}. Removing." + f"{self.getName()} - Live peer session {peer_session.id} not found in lobby {lobby.getName()}. Removing." 
) - del lobby.sessions[peer_id] + with lobby.lock: + if peer_session.id in lobby.sessions: + del lobby.sessions[peer_session.id] continue # Add the peer to session's RTC peer list - self.lobby_peers[lobby.id].append(peer_id) + with self.session_lock: + self.lobby_peers[lobby.id].append(peer_session.id) # Add this user as an RTC peer to each existing peer - peer_session.lobby_peers[lobby.id].append(self.id) + with peer_session.session_lock: + if lobby.id not in peer_session.lobby_peers: + peer_session.lobby_peers[lobby.id] = [] + peer_session.lobby_peers[lobby.id].append(self.id) logger.info( - f"{self.getName()} -> {peer_session.getName()}:addPeer({self.getName(), lobby.getName()}, should_create_offer=False)" - ) - await peer_session.ws.send_json( - { - "type": "addPeer", - "data": { - "peer_id": self.id, - "peer_name": self.name, - "should_create_offer": False, - }, - } + f"{self.getName()} -> {peer_session.getName()}:addPeer({self.getName()}, {lobby.getName()}, should_create_offer=False)" ) + try: + await peer_session.ws.send_json( + { + "type": "addPeer", + "data": { + "peer_id": self.id, + "peer_name": self.name, + "should_create_offer": False, + }, + } + ) + except Exception as e: + logger.warning( + f"Failed to send addPeer to {peer_session.getName()}: {e}" + ) # Add each other peer to the caller logger.info( - f"{self.getName()} -> {self.getName()}:addPeer({peer_session.getName(), lobby.getName()}, should_create_offer=True)" - ) - await self.ws.send_json( - { - "type": "addPeer", - "data": { - "peer_id": peer_session.id, - "peer_name": peer_session.name, - "should_create_offer": True, - }, - } + f"{self.getName()} -> {self.getName()}:addPeer({peer_session.getName()}, {lobby.getName()}, should_create_offer=True)" ) + try: + await self.ws.send_json( + { + "type": "addPeer", + "data": { + "peer_id": peer_session.id, + "peer_name": peer_session.name, + "should_create_offer": True, + }, + } + ) + except Exception as e: + logger.warning(f"Failed to send addPeer to 
{self.getName()}: {e}") # Add this user as an RTC peer await lobby.addSession(self) Session.save() - await self.ws.send_json({"type": "join_status", "data": {"status": "Joined"}}) + try: + await self.ws.send_json( + {"type": "join_status", "data": {"status": "Joined"}} + ) + except Exception as e: + logger.warning(f"Failed to send join confirmation to {self.getName()}: {e}") async def part(self, lobby: Lobby): - if lobby.id not in self.lobby_peers or self.id not in lobby.sessions: - logger.info( - f"{self.getName()} - Attempt to part non-joined lobby {lobby.getName()}." - ) - if self.ws: - await self.ws.send_json( - {"type": "error", "error": "Attempt to part non-joined lobby"} + with self.session_lock: + if lobby.id not in self.lobby_peers or self.id not in lobby.sessions: + logger.info( + f"{self.getName()} - Attempt to part non-joined lobby {lobby.getName()}." ) - return + if self.ws: + try: + await self.ws.send_json( + { + "type": "error", + "error": "Attempt to part non-joined lobby", + } + ) + except Exception: + pass + return - logger.info(f"{self.getName()} <- part({lobby.getName()}) - Lobby part.") + logger.info(f"{self.getName()} <- part({lobby.getName()}) - Lobby part.") - lobby_peers = self.lobby_peers[lobby.id] - del self.lobby_peers[lobby.id] - self.lobbies.remove(lobby) + lobby_peers = self.lobby_peers[lobby.id][:] # Copy the list + del self.lobby_peers[lobby.id] + if lobby in self.lobbies: + self.lobbies.remove(lobby) # Remove this peer from all other RTC peers, and remove each peer from this peer for peer_session_id in lobby_peers: @@ -666,36 +1114,57 @@ class Session: f"{self.getName()} <- part({lobby.getName()}) - Peer session {peer_session_id} not found. Skipping." 
) continue - if not peer_session.ws: + + if peer_session.ws: + logger.info( + f"{peer_session.getName()} <- remove_peer({self.getName()})" + ) + try: + await peer_session.ws.send_json( + { + "type": "removePeer", + "data": {"peer_name": self.name, "peer_id": self.id}, + } + ) + except Exception as e: + logger.warning( + f"Failed to send removePeer to {peer_session.getName()}: {e}" + ) + else: logger.warning( f"{self.getName()} <- part({lobby.getName()}) - No WebSocket connection for {peer_session.getName()}. Skipping." ) - continue - logger.info(f"{peer_session.getName()} <- remove_peer({self.getName()})") - await peer_session.ws.send_json( - { - "type": "removePeer", - "data": {"peer_name": self.name, "peer_id": self.id}, - } - ) - if not self.ws: + # Remove from peer's lobby_peers + with peer_session.session_lock: + if ( + lobby.id in peer_session.lobby_peers + and self.id in peer_session.lobby_peers[lobby.id] + ): + peer_session.lobby_peers[lobby.id].remove(self.id) + + if self.ws: + logger.info( + f"{self.getName()} <- remove_peer({peer_session.getName()})" + ) + try: + await self.ws.send_json( + { + "type": "removePeer", + "data": { + "peer_name": peer_session.name, + "peer_id": peer_session.id, + }, + } + ) + except Exception as e: + logger.warning( + f"Failed to send removePeer to {self.getName()}: {e}" + ) + else: logger.error( f"{self.getName()} <- part({lobby.getName()}) - No WebSocket connection." 
) - continue - - logger.info(f"{self.getName()} <- remove_peer({peer_session.getName()})") - - await self.ws.send_json( - { - "type": "removePeer", - "data": { - "peer_name": peer_session.name, - "peer_id": peer_session.id, - }, - } - ) await lobby.removeSession(self) Session.save() @@ -730,9 +1199,7 @@ def getLobbyByName(lobby_name: str) -> Lobby | None: @app.get(f"{public_url}api/health", response_model=HealthResponse) def health(): logger.info("Health check endpoint called.") - return { - "status": "ok", - } + return HealthResponse(status="ok") # A session (cookie) is bound to a single user (name). @@ -764,25 +1231,25 @@ async def session( session.update_last_used() # Update activity on session resumption logger.info(f"{session.getName()}: Existing session resumed.") # Part all lobbies for this session that have no active websocket - for lobby_id in list(session.lobby_peers.keys()): - lobby = None + with session.session_lock: + lobbies_to_part = session.lobbies[:] + for lobby in lobbies_to_part: try: - lobby = getLobby(lobby_id) + await session.part(lobby) except Exception as e: logger.error( - f"{session.getName()} - Error getting lobby {lobby_id}: {e}" + f"{session.getName()} - Error parting lobby {lobby.getName()}: {e}" ) - continue - await session.part(lobby) - return SessionResponse( - id=session_id, - name=session.name if session.name else "", - lobbies=[ - LobbyModel(id=lobby.id, name=lobby.name, private=lobby.private) - for lobby in session.lobbies - ], - ) + with session.session_lock: + return SessionResponse( + id=session_id, + name=session.name if session.name else "", + lobbies=[ + LobbyModel(id=lobby.id, name=lobby.name, private=lobby.private) + for lobby in session.lobbies + ], + ) @app.get(public_url + "api/lobby", response_model=LobbiesResponse) @@ -804,7 +1271,11 @@ async def lobby_create( create_request: LobbyCreateRequest = Body(...), ) -> Response | LobbyCreateResponse: if create_request.type != "lobby_create": - return {"error": "Invalid 
request type"} + return Response( + content=json.dumps({"error": "Invalid request type"}), + status_code=400, + media_type="application/json", + ) data = create_request.data session = getSession(session_id) @@ -867,7 +1338,6 @@ async def register_bot_provider( request: BotProviderRegisterRequest, ) -> BotProviderRegisterResponse: """Register a new bot provider""" - import time import uuid provider_id = str(uuid.uuid4()) @@ -903,8 +1373,6 @@ async def list_available_bots() -> BotListResponse: # Update last_seen timestamps and fetch bots from each provider for provider_id, provider in bot_providers.items(): try: - import time - provider.last_seen = time.time() # Make HTTP request to provider's /bots endpoint @@ -1062,31 +1530,54 @@ async def lobby_join( session.ws = websocket session.update_last_used() # Update activity timestamp - if session.id in lobby.sessions: - logger.info( - f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining." - ) - await session.part(lobby) - await lobby.removeSession(session) - for peer_id in lobby.sessions: - peer_session = lobby.getSession(peer_id) - if not peer_session or not peer_session.ws: - logger.warning( - f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}. Removing." + # Check if session is already in lobby and clean up if needed + with lobby.lock: + if session.id in lobby.sessions: + logger.info( + f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining." 
) - del lobby.sessions[peer_id] + try: + await session.part(lobby) + await lobby.removeSession(session) + except Exception as e: + logger.warning(f"Error cleaning up stale session: {e}") + + # Notify existing peers about new user + failed_peers: list[str] = [] + with lobby.lock: + peer_sessions = list(lobby.sessions.values()) + + for peer_session in peer_sessions: + if not peer_session.ws: + logger.warning( + f"{session.getName()} - Live peer session {peer_session.id} not found in lobby {lobby.getName()}. Marking for removal." + ) + failed_peers.append(peer_session.id) continue + logger.info(f"{session.getName()} -> user_joined({peer_session.getName()})") - await peer_session.ws.send_json( - { - "type": "user_joined", - "data": { - "session_id": session.id, - "name": session.name, - }, - } - ) + try: + await peer_session.ws.send_json( + { + "type": "user_joined", + "data": { + "session_id": session.id, + "name": session.name, + }, + } + ) + except Exception as e: + logger.warning( + f"Failed to notify {peer_session.getName()} of user join: {e}" + ) + failed_peers.append(peer_session.id) + + # Clean up failed peers + with lobby.lock: + for failed_peer_id in failed_peers: + if failed_peer_id in lobby.sessions: + del lobby.sessions[failed_peer_id] try: while True: @@ -1216,7 +1707,9 @@ async def lobby_join( "Failed to notify displaced session websocket" ) # Update all lobbies the displaced session was in - for d_lobby in list(displaced.lobbies): + with displaced.session_lock: + displaced_lobbies = displaced.lobbies[:] + for d_lobby in displaced_lobbies: try: await d_lobby.update_state() except Exception: @@ -1313,18 +1806,20 @@ async def lobby_join( ) continue - if ( - lobby.id not in session.lobby_peers - or session.id not in lobby.sessions - ): - logger.error( - f"{session.short}:{session.name} <- relayICECandidate - Not an RTC peer ({session.id})" - ) - await websocket.send_json( - {"type": "error", "error": "Not joined to lobby"} - ) - continue - session_peers = 
session.lobby_peers[lobby.id] + with session.session_lock: + if ( + lobby.id not in session.lobby_peers + or session.id not in lobby.sessions + ): + logger.error( + f"{session.short}:{session.name} <- relayICECandidate - Not an RTC peer ({session.id})" + ) + await websocket.send_json( + {"type": "error", "error": "Not joined to lobby"} + ) + continue + session_peers = session.lobby_peers[lobby.id] + peer_id = data.get("peer_id") if peer_id not in session_peers: logger.error( @@ -1354,11 +1849,14 @@ async def lobby_join( logger.warning( f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}." ) - break + continue logger.info( f"{session.getName()} -> iceCandidate({peer_session.getName()})" ) - await peer_session.ws.send_json(message) + try: + await peer_session.ws.send_json(message) + except Exception as e: + logger.warning(f"Failed to relay ICE candidate: {e}") case "relaySessionDescription": logger.info(f"{session.getName()} <- relaySessionDescription") @@ -1374,19 +1872,21 @@ async def lobby_join( ) continue - if ( - lobby.id not in session.lobby_peers - or session.id not in lobby.sessions - ): - logger.error( - f"{session.short}:{session.name} <- relaySessionDescription - Not an RTC peer ({session.id})" - ) - await websocket.send_json( - {"type": "error", "error": "Not joined to lobby"} - ) - continue + with session.session_lock: + if ( + lobby.id not in session.lobby_peers + or session.id not in lobby.sessions + ): + logger.error( + f"{session.short}:{session.name} <- relaySessionDescription - Not an RTC peer ({session.id})" + ) + await websocket.send_json( + {"type": "error", "error": "Not joined to lobby"} + ) + continue + + lobby_peers = session.lobby_peers[lobby.id] - lobby_peers = session.lobby_peers[lobby.id] peer_id = data.get("peer_id") if peer_id not in lobby_peers: logger.error( @@ -1400,7 +1900,6 @@ async def lobby_join( ) continue - peer_id = data.get("peer_id", None) if not peer_id: logger.error( 
f"{session.getName()} - relaySessionDescription missing peer_id" @@ -1417,7 +1916,7 @@ async def lobby_join( logger.warning( f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}." ) - break + continue session_description = data.get("session_description") message = { @@ -1432,7 +1931,10 @@ async def lobby_join( logger.info( f"{session.getName()} -> sessionDescription({peer_session.getName()})" ) - await peer_session.ws.send_json(message) + try: + await peer_session.ws.send_json(message) + except Exception as e: + logger.warning(f"Failed to relay session description: {e}") case _: await websocket.send_json( @@ -1447,15 +1949,30 @@ async def lobby_join( # Cleanup: remove session from lobby and sessions dict session.ws = None if session.id in lobby.sessions: - await session.part(lobby) + try: + await session.part(lobby) + except Exception as e: + logger.warning(f"Error during websocket disconnect cleanup: {e}") - await lobby.update_state() + try: + await lobby.update_state() + except Exception as e: + logger.warning(f"Error updating lobby state after disconnect: {e}") # Clean up empty lobbies - if not lobby.sessions: - if lobby.id in lobbies: - del lobbies[lobby.id] - logger.info(f"Cleaned up empty lobby {lobby.getName()}") + with lobby.lock: + if not lobby.sessions: + if lobby.id in lobbies: + del lobbies[lobby.id] + logger.info(f"Cleaned up empty lobby {lobby.getName()}") + except Exception as e: + logger.error( + f"Unexpected error in websocket handler for {session.getName()}: {e}" + ) + try: + await websocket.close() + except: + pass # Serve static files or proxy to frontend development server @@ -1513,11 +2030,9 @@ else: # WebSocket proxy for /ws (for React DevTools, etc.) 
import websockets - import asyncio - from starlette.websockets import WebSocket as StarletteWebSocket @app.websocket("/ws") - async def websocket_proxy(websocket: StarletteWebSocket): + async def websocket_proxy(websocket: WebSocket): logger.info("REACT: WebSocket proxy connection established.") # Get scheme from websocket.url (should be 'ws' or 'wss') scheme = websocket.url.scheme if hasattr(websocket, "url") else "ws" @@ -1549,4 +2064,4 @@ else: logger.info("REACT: WebSocket proxy connection closed.") except Exception as e: logger.error(f"REACT: WebSocket proxy error: {e}") - await websocket.close() + await websocket.close() \ No newline at end of file diff --git a/shared/models.py b/shared/models.py index c535a2a..5effd3d 100644 --- a/shared/models.py +++ b/shared/models.py @@ -56,10 +56,43 @@ class AdminNamesResponse(BaseModel): class AdminActionResponse(BaseModel): """Response for admin actions""" - status: Literal["ok", "not_found"] + + status: Literal["ok", "not_found", "error"] name: str +class AdminValidationResponse(BaseModel): + """Response for admin session validation""" + + status: Literal["ok", "error"] + issues: List[str] = [] + issue_count: int = 0 + error: Optional[str] = None + + +class AdminMetricsConfig(BaseModel): + """Config data for metrics response""" + + anonymous_timeout: int + displaced_timeout: int + cleanup_interval: int + max_cleanup_per_cycle: int + + +class AdminMetricsResponse(BaseModel): + """Response for admin session metrics""" + + total_sessions: int + active_sessions: int + named_sessions: int + displaced_sessions: int + old_anonymous_sessions: int + old_displaced_sessions: int + total_lobbies: int + cleanup_candidates: int + config: AdminMetricsConfig + + class AdminSetPassword(BaseModel): """Request model for setting admin password""" name: str @@ -147,6 +180,12 @@ class UpdateNameModel(BaseModel): protected: Optional[bool] = False +class WebSocketErrorModel(BaseModel): + """WebSocket error message""" + + error: str + + class 
WebSocketMessageModel(BaseModel): """Base model for all WebSocket messages""" @@ -164,6 +203,7 @@ class WebSocketMessageModel(BaseModel): | LobbyCreateResponse | ChatMessageModel | ChatMessagesListModel + | WebSocketErrorModel | Dict[str, str] ) # Generic dict for simple messages