from fastapi import (
    Cookie,
    FastAPI,
    Path,
    WebSocket,
    WebSocketDisconnect,
    Request,
    Response,
)
from fastapi.staticfiles import StaticFiles
import secrets
import os
import httpx
from logger import logger

# Every route in this module is registered beneath this prefix, so it is
# normalised to both start and end with "/" (route paths must begin with "/").
public_url = os.getenv("PUBLIC_URL", "/")
if not public_url.startswith("/"):
    public_url = "/" + public_url
if not public_url.endswith("/"):
    public_url += "/"

app = FastAPI()

logger.info(f"Starting server with public URL: {public_url}")


class Session:
    """State tracked for one session ID."""

    def __init__(self, id):
        self.id = id
        self.short = id[:8]  # First 8 characters of the ID, used in log lines
        self.name = ""  # Empty until a name has been set
        self.lobbies: dict[str, Lobby] = {}  # Lobbies of this session, by lobby ID
        self.ws: WebSocket | None = None  # Current WebSocket, if any
        self.has_audio = False
        self.has_video = False


def getName(session: Session | None) -> str | None:
    """Return the session's name, or None if there is no session or no name."""
    if session and session.name:
        return session.name
    return None


class Lobby:
    """A named lobby with a random 32-hex-character ID."""

    def __init__(self, name: str):
        self.id = secrets.token_hex(16)
        self.short = self.id[:8]  # First 8 characters of the ID, used in log lines
        self.name = name
        self.sessions: dict[str, Session] = {}  # All lobby members
        self.peers: dict[str, Session] = {}  # RTC joined peers only

    def addSession(self, session: Session):
        """Register ``session`` as a member; an existing entry is left untouched."""
        self.sessions.setdefault(session.id, session)

    def removeSession(self, session: Session):
        """Remove ``session`` from the members; a no-op if it is not a member."""
        self.sessions.pop(session.id, None)

    def getSession(self, id) -> Session | None:
        """Return the member session with the given ID, or None."""
        return self.sessions.get(id)


# In-memory registries, keyed by lobby ID and session ID respectively.
lobbies: dict[str, Lobby] = {}
sessions: dict[str, Session] = {}


def getSession(session_id) -> Session | None:
    """Return the known session with this ID, or None."""
    return sessions.get(session_id)


def getLobby(lobby_id) -> Lobby | None:
    """Return the known lobby with this ID, or None."""
    return lobbies.get(lobby_id)


def getLobbyByName(lobby_name) -> Lobby | None:
    """Return the first lobby whose name equals ``lobby_name`` exactly, or None."""
    return next(
        (lobby for lobby in lobbies.values() if lobby.name == lobby_name), None
    )


# API endpoints
@app.get(f"{public_url}api/health")
def health():
    """Report liveness together with the current session and lobby counts."""
    logger.info("Health check endpoint called.")
    return {"status": "ok", "sessions": len(sessions), "lobbies": len(lobbies)}


# A session (cookie) is bound to a single user (name).
# A user can be in multiple lobbies, but a session is unique to a single user.
# A user can change their name, but the session ID remains the same and the name # updates for all lobbies. @app.get(f"{public_url}api/session") async def session( request: Request, response: Response, session_id: str = Cookie(default=None) ): if session_id is None: session_id = secrets.token_hex(16) response.set_cookie(key="session_id", value=session_id) # Validate that session_id is a hex string of length 32 elif ( not isinstance(session_id, str) or len(session_id) != 32 or not all(c in "0123456789abcdef" for c in session_id) ): return {"error": "Invalid session_id"} print(f"[{session_id[:8]}]: Browser hand-shake achieved.") session = getSession(session_id) if not session: session = Session(session_id) sessions[session_id] = session logger.info(f"{getSessionName(session)}: New session created.") else: logger.info(f"{getSessionName(session)}: Existing session resumed.") return { "id": session_id, "name": session.name if session.name else None, "lobbies": [lobby.name for lobby in sessions[session_id].lobbies.values()], } @app.get(public_url + "api/lobby/{lobby_name}/{session_id}") async def lobby( request: Request, response: Response, lobby_name: str | None = Path(...), session_id: str | None = Path(...), ): if lobby_name is None: return {"error": "Missing lobby_name"} if session_id is None: return {"error": "Missing session_id"} session = getSession(session_id) if not session: return {"error": f"Session not found ({session_id})"} lobby = getLobbyByName(lobby_name) if not lobby: lobby = Lobby(lobby_name) lobbies[lobby.id] = lobby logger.info( f"{getSessionName(session)} <- lobby_create({lobby.short}:{lobby.name})" ) lobby.addSession(sessions[session_id]) sessions[session_id].lobbies[lobby.id] = lobby return {"lobby": lobby.id} all_label = "[ all ]" info_label = "[ info ]" todo_label = "[ todo ]" unset_label = "[ ---- ]" # Join the media session in a lobby async def join( lobby: Lobby, session: Session, has_video: bool, has_audio: bool, ): if not session.name: 
logger.error( f"{session.short}:[UNSET] <- join - No name set yet. Media not available." ) return if not session.ws: logger.error( f"{getSessionName(session)} - No WebSocket connection. Media not available." ) return logger.info(f"{getSessionName(session)} <- join({getLobbyName(lobby)})") # if session.id in lobby.peers: # logger.info(f"{getSessionName(session)} - Already joined to Media.") # return # Notify all existing RTC peers for peer_session in lobby.peers.values(): if peer_session.id == session.id: continue if not peer_session.ws: logger.warning( f"{getSessionName(peer_session)} - No WebSocket connection. Skipping." ) continue logger.info( f"{getSessionName(peer_session)} -> addPeer({getSessionName(session), getLobbyName(lobby)}, video={has_video}, audio={has_audio}, should_create_offer=False)" ) await peer_session.ws.send_json( { "type": "addPeer", "data": { "peer_id": session.id, "peer_name": session.name, "should_create_offer": False, "has_audio": has_audio, "has_video": has_video, }, } ) # Add each other peer to the caller if session.ws: logger.info( f"{getSessionName(session)} -> addPeer({getSessionName(peer_session), getLobbyName(lobby)}, video={peer_session.has_video}, audio={peer_session.has_audio}, should_create_offer=True)" ) await session.ws.send_json( { "type": "addPeer", "data": { "peer_id": peer_session.id, "peer_name": peer_session.name, "should_create_offer": True, "has_audio": peer_session.has_audio, "has_video": peer_session.has_video, }, } ) # Add this user as an RTC peer lobby.peers[session.id] = session await update_users(lobby) async def part( lobby: Lobby, session: Session, ): if session.id not in lobby.peers: logger.info( f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Does not exist in RTC peers." ) return logger.info( f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Media part." 
) del lobby.peers[session.id] # Remove this peer from all other RTC peers, and remove each peer from this peer for peer_session in lobby.peers.values(): if not peer_session.ws: logger.warning( f"{getSessionName(peer_session)} <- part({getLobbyName(lobby)}) - No WebSocket connection. Skipping." ) continue logger.info( f"{getSessionName(peer_session)} <- remove_peer({getSessionName(session)})" ) await peer_session.ws.send_json( {"type": "remove_peer", "data": {"peer_id": session.id}} ) if session.ws: logger.info( f"{getSessionName(session)} <- remove_peer({getSessionName(peer_session)})" ) await session.ws.send_json( {"type": "remove_peer", "data": {"peer_id": peer_session.id}} ) else: logger.error( f"{getSessionName(session)} <- part({getLobbyName(lobby)}) - No WebSocket connection." ) async def update_users(lobby: Lobby, requesting_session: Session | None = None): users = [ {"name": s.name, "live": True if s.ws else False, "session_id": s.id} for s in lobby.sessions.values() if s.name ] if requesting_session: logger.info( f"{requesting_session.short}:{requesting_session.name} -> list_users({lobby.name})" ) if requesting_session.ws: await requesting_session.ws.send_json({"type": "users", "users": users}) else: logger.warning( f"{requesting_session.short}:{requesting_session.name} - No WebSocket connection." 
) else: for s in lobby.sessions.values(): logger.info( f"{s.short}:{s.name if s.name else unset_label} -> list_users({lobby.name})" ) if s.ws: await s.ws.send_json({"type": "users", "users": users}) def getSessionName(session: Session) -> str: return f"{session.short}:{session.name if session.name else unset_label}" def getLobbyName(lobby: Lobby) -> str: return f"{lobby.short}:{lobby.name}" # Register websocket endpoint directly on app with full public_url path @app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}") async def websocket_lobby( websocket: WebSocket, lobby_id: str | None = Path(...), session_id: str | None = Path(...), ): await websocket.accept() if lobby_id is None: await websocket.send_json( {"type": "error", "error": "Invalid or missing lobby"} ) await websocket.close() return if session_id is None: await websocket.send_json( {"type": "error", "error": "Invalid or missing session"} ) await websocket.close() return session = getSession(session_id) if not session: # logger.error(f"Invalid session ID {session_id}") await websocket.send_json( {"type": "error", "error": f"Invalid session ID {session_id}"} ) await websocket.close() return lobby = getLobby(lobby_id) if not lobby: logger.error(f"Invalid lobby ID {lobby_id}") await websocket.send_json( {"type": "error", "error": f"Invalid lobby ID {lobby_id}"} ) await websocket.close() return logger.info( f"{getSessionName(session)} <- lobby_connect({lobby.short}:{lobby.name})" ) session.ws = websocket # This user session just went from Dead to Live, so update everyone's user list await update_users(lobby) try: while True: data = await websocket.receive_json() # logger.info(f"{getSessionName(session)} <- RAW Rx: {data}") match data.get("type"): case "set_name": name = data.get("name") if not name: await websocket.send_json( {"type": "error", "error": "Name required"} ) continue # Check for duplicate name if any(s.name.lower() == name.lower() for s in sessions.values()): await 
websocket.send_json( {"type": "error", "error": "Name already taken"} ) continue session.name = name logger.info( f"{getSessionName(session)} <- set_name({session.name})" ) await websocket.send_json({"type": "update", "name": name}) await update_users(lobby) case "list_users": await update_users(lobby, session) case "media_status": has_audio = data.get("has_audio", False) has_video = data.get("has_video", False) logger.info( f"{getSessionName(session)}: <- media-status(audio: {has_audio}, video: {has_video})" ) session.has_audio = has_audio session.has_video = has_video case "join": logger.info(f"{getSessionName(session)} <- join {data}") has_audio = data.get("has_audio", False) has_video = data.get("has_video", False) await join(lobby, session, has_video, has_audio) case "part": await part(lobby, session) case "relayICECandidate": logger.info(f"{getSessionName(session)} <- relayICECandidate") if session.id not in lobby.peers: logger.error( f"{session.short}:{session.name} <- relayICECandidate - Not an RTC peer ({session.id})" ) await websocket.send_json( {"type": "error", "error": "Not joined to media session"} ) continue peer_id = data.get("config", {}).get("peer_id") if peer_id not in lobby.peers: logger.error( f"{getSessionName(session)} <- relayICECandidate - Not an RTC peer({peer_id})" ) await websocket.send_json( { "type": "error", "error": f"Target peer {peer_id} not found", } ) continue candidate = data.get("config", {}).get("candidate") message = { "type": "iceCandidate", "data": {"peer_id": session.id, "candidate": candidate}, } if peer_id in lobby.peers: ws = lobby.peers[peer_id].ws if not ws: logger.warning( f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping." 
) break logger.info( f"{getSessionName(session)} -> iceCandidate({getSessionName(lobby.peers[peer_id])})" ) await ws.send_json(message) case "relaySessionDescription": logger.info(f"{getSessionName(session)} <- relaySessionDescription") if session.id not in lobby.peers: logger.error( f"{session.short}:{session.name} - relaySessionDescription - Not an RTC peer" ) break peer_id = data.get("config", {}).get("peer_id") peer = lobby.peers.get(peer_id, None) if not peer: logger.error( f"{getSessionName(session)} <- relaySessionDescription - Not an RTC peer({peer_id})" ) break session_description = data.get("config", {}).get( "session_description" ) message = { "type": "sessionDescription", "data": { "peer_id": session.id, "session_description": session_description, }, } if not peer.ws: logger.warning( f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping." ) break logger.info( f"{getSessionName(session)} -> sessionDescription({getSessionName(lobby.peers[peer_id])})" ) await peer.ws.send_json(message) case _: await websocket.send_json( { "type": "error", "error": f"Unknown request type: {data.get('type')}", } ) except WebSocketDisconnect: logger.info(f"{getSessionName(session)} <- WebSocket disconnected for user.") # Cleanup: remove session from lobby and sessions dict session.ws = None if session.id in lobby.peers: await part(lobby, session) await update_users(lobby) # Clean up empty lobbies if not lobby.sessions: if lobby.id in lobbies: del lobbies[lobby.id] logger.info(f"Cleaned up empty lobby {lobby.short}") # Serve static files or proxy to frontend development server PRODUCTION = os.getenv("PRODUCTION", "false").lower() == "true" client_build_path = os.path.join(os.path.dirname(__file__), "/client/build") if PRODUCTION: logger.info(f"Serving static files from: {client_build_path} at {public_url}") app.mount( public_url, StaticFiles(directory=client_build_path, html=True), name="static" ) else: logger.info(f"Proxying static 
files to http://static-frontend:3000 at {public_url}") import ssl @app.api_route( f"{public_url}{{path:path}}", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH"], ) async def proxy_static(request: Request, path: str): # Do not proxy API or websocket paths if path.startswith("api/") or path.startswith("ws/"): return Response(status_code=404) url = f"{request.url.scheme}://static-frontend:3000/{public_url.strip('/')}/{path}" if not path: url = f"{request.url.scheme}://static-frontend:3000/{public_url.strip('/')}" headers = dict(request.headers) try: # Accept self-signed certs in dev async with httpx.AsyncClient(verify=False) as client: proxy_req = client.build_request( request.method, url, headers=headers, content=await request.body() ) proxy_resp = await client.send(proxy_req, stream=True) content = await proxy_resp.aread() # Remove problematic headers for browser decoding filtered_headers = { k: v for k, v in proxy_resp.headers.items() if k.lower() not in ["content-encoding", "transfer-encoding", "content-length"] } return Response( content=content, status_code=proxy_resp.status_code, headers=filtered_headers, ) except Exception as e: logger.error(f"Proxy error for {url}: {e}") return Response("Proxy error", status_code=502) # WebSocket proxy for /ws (for React DevTools, etc.) 
import websockets import asyncio from starlette.websockets import WebSocket as StarletteWebSocket @app.websocket("/ws") async def websocket_proxy(websocket: StarletteWebSocket): logger.info("REACT: WebSocket proxy connection established.") # Get scheme from websocket.url (should be 'ws' or 'wss') scheme = websocket.url.scheme if hasattr(websocket, "url") else "ws" target_url = f"{scheme}://static-frontend:3000/ws" await websocket.accept() try: # Accept self-signed certs in dev for WSS ssl_ctx = ssl.create_default_context() ssl_ctx.check_hostname = False ssl_ctx.verify_mode = ssl.CERT_NONE async with websockets.connect(target_url, ssl=ssl_ctx) as target_ws: async def client_to_server(): while True: msg = await websocket.receive_text() await target_ws.send(msg) async def server_to_client(): while True: msg = await target_ws.recv() if isinstance(msg, str): await websocket.send_text(msg) else: await websocket.send_bytes(msg) try: await asyncio.gather(client_to_server(), server_to_client()) except (WebSocketDisconnect, websockets.ConnectionClosed): logger.info("REACT: WebSocket proxy connection closed.") except Exception as e: logger.error(f"REACT: WebSocket proxy error: {e}") await websocket.close()