2025-08-26 17:11:42 -07:00

563 lines
19 KiB
Python

from fastapi import (
Cookie,
FastAPI,
Path,
WebSocket,
WebSocketDisconnect,
Request,
Response,
)
from fastapi.staticfiles import StaticFiles
import secrets
import os
import httpx
from logger import logger
# Base path under which every route is registered; always ends with "/".
public_url = os.getenv("PUBLIC_URL", "/")
public_url = public_url if public_url.endswith("/") else f"{public_url}/"

app = FastAPI()
logger.info(f"Starting server with public URL: {public_url}")
class Session:
    """Per-browser session state, identified by the session id string."""

    def __init__(self, id):
        # Identity: the full id plus its first 8 characters (used in log lines).
        self.id = id
        self.short = id[:8]
        # Display name; stays empty until one is assigned.
        self.name = ""
        # Media capabilities last recorded for this session.
        self.has_audio = False
        self.has_video = False
        # Currently attached WebSocket, or None while not connected.
        self.ws: WebSocket | None = None
        # Lobbies this session has been added to.
        self.lobbies: dict[str, Lobby] = {}
def getName(session: Session | None) -> str | None:
    """Return the session's name, or None if there is no session or no name."""
    if not session:
        return None
    return session.name or None
class Lobby:
    """A named lobby holding its member sessions and its RTC peers."""

    def __init__(self, name: str):
        self.name = name
        # Random 32-hex-character id; `short` is its 8-character prefix.
        self.id = secrets.token_hex(16)
        self.short = self.id[:8]
        self.sessions: dict[str, Session] = {}  # All lobby members
        self.peers: dict[str, Session] = {}  # RTC joined peers only

    def addSession(self, session: Session):
        """Register `session` as a member; an existing entry is left untouched."""
        self.sessions.setdefault(session.id, session)

    def removeSession(self, session: Session):
        """Drop `session` from the members; a no-op if it is not a member."""
        self.sessions.pop(session.id, None)

    def getSession(self, id) -> Session | None:
        """Return the member session with this id, or None if absent."""
        return self.sessions.get(id)
# Process-wide, in-memory registry of lobbies, keyed by lobby id.
lobbies: dict[str, Lobby] = {}
# Process-wide, in-memory registry of sessions, keyed by session id.
sessions: dict[str, Session] = {}
def getSession(session_id) -> Session | None:
    """Look up a session in the global registry; None when the id is unknown."""
    return sessions.get(session_id)
def getLobby(lobby_id) -> Lobby | None:
    """Look up a lobby in the global registry; None when the id is unknown."""
    return lobbies.get(lobby_id)
def getLobbyByName(lobby_name) -> Lobby | None:
    """Return the first registered lobby whose name equals `lobby_name`, else None."""
    matches = (candidate for candidate in lobbies.values() if candidate.name == lobby_name)
    return next(matches, None)
# API endpoints
@app.get(f"{public_url}api/health")
def health():
    """Report service status together with the current session and lobby counts."""
    logger.info("Health check endpoint called.")
    session_count = len(sessions)
    lobby_count = len(lobbies)
    return {"status": "ok", "sessions": session_count, "lobbies": lobby_count}
# A session (cookie) is bound to a single user (name).
# A user can be in multiple lobbies, but a session is unique to a single user.
# A user can change their name, but the session ID remains the same and the name
# updates for all lobbies.
@app.get(f"{public_url}api/session")
async def session(
    request: Request, response: Response, session_id: str = Cookie(default=None)
):
    """Create or resume the session identified by the ``session_id`` cookie.

    When no cookie is sent, a fresh 32-character hex id is generated and set
    as the ``session_id`` cookie on the response. A cookie that is not a
    32-character lowercase hex string is rejected with an error payload.

    Returns:
        ``{"id", "name", "lobbies"}`` for the session (``name`` is None until
        one is set; ``lobbies`` lists lobby names), or
        ``{"error": "Invalid session_id"}`` for a malformed cookie.
    """
    if session_id is None:
        session_id = secrets.token_hex(16)
        response.set_cookie(key="session_id", value=session_id)
    # Validate that session_id is a hex string of length 32
    elif (
        not isinstance(session_id, str)
        or len(session_id) != 32
        or not all(c in "0123456789abcdef" for c in session_id)
    ):
        return {"error": "Invalid session_id"}

    # Routed through the logger (not print) so it follows the same handlers
    # and formatting as every other message in this module.
    logger.info(f"[{session_id[:8]}]: Browser hand-shake achieved.")

    session = getSession(session_id)
    if not session:
        session = Session(session_id)
        sessions[session_id] = session
        logger.info(f"{getSessionName(session)}: New session created.")
    else:
        logger.info(f"{getSessionName(session)}: Existing session resumed.")

    return {
        "id": session_id,
        "name": session.name if session.name else None,
        "lobbies": [lobby.name for lobby in session.lobbies.values()],
    }
@app.get(public_url + "api/lobby/{lobby_name}/{session_id}")
async def lobby(
    request: Request,
    response: Response,
    lobby_name: str | None = Path(...),
    session_id: str | None = Path(...),
):
    """Add a session to the lobby with the given name, creating the lobby if needed.

    Returns ``{"lobby": <lobby id>}`` on success, or an ``{"error": ...}``
    payload when a path parameter is missing or the session id is unknown.
    """
    if lobby_name is None:
        return {"error": "Missing lobby_name"}
    if session_id is None:
        return {"error": "Missing session_id"}

    member = getSession(session_id)
    if not member:
        return {"error": f"Session not found ({session_id})"}

    room = getLobbyByName(lobby_name)
    if not room:
        # First request for this name: create the lobby and register it.
        room = Lobby(lobby_name)
        lobbies[room.id] = room
        logger.info(
            f"{getSessionName(member)} <- lobby_create({room.short}:{room.name})"
        )

    # Link both directions: lobby -> session and session -> lobby.
    room.addSession(member)
    member.lobbies[room.id] = room
    return {"lobby": room.id}
# Fixed-width bracketed tags. `unset_label` is substituted for a session name
# that has not been set yet when building log identifiers. The other three are
# not referenced in the visible part of this file — presumably log tags as
# well; TODO confirm against their users.
all_label = "[ all ]"
info_label = "[ info ]"
todo_label = "[ todo ]"
unset_label = "[ ---- ]"
# Join the media session in a lobby
async def join(
    lobby: Lobby,
    session: Session,
    has_video: bool,
    has_audio: bool,
):
    """Add `session` to the lobby's RTC peers and introduce it to existing peers.

    Each existing peer with a WebSocket receives an ``addPeer`` message for
    the joining session (``should_create_offer`` False), and the joining
    session receives an ``addPeer`` for that peer (``should_create_offer``
    True). Afterwards the session is stored in ``lobby.peers`` and the
    lobby's user list is re-broadcast.

    The call is a logged no-op when the session has no name, has no
    WebSocket, or is already an RTC peer.

    Args:
        lobby: Lobby whose media session is being joined.
        session: The joining session.
        has_video: Video flag advertised to the existing peers.
        has_audio: Audio flag advertised to the existing peers.
    """
    if not session.name:
        logger.error(
            f"{session.short}:[UNSET] <- join - No name set yet. Media not available."
        )
        return
    if not session.ws:
        logger.error(
            f"{session.short}:{session.name} - No WebSocket connection. Media not available."
        )
        return

    logger.info(f"{getSessionName(session)} <- join({getLobbyName(lobby)})")
    if session.id in lobby.peers:
        logger.info(f"{getSessionName(session)} - Already joined to Media.")
        return

    # Notify all existing RTC peers.
    # Iterate over a snapshot: every `await` below yields to the event loop,
    # where another connection may join or part and resize `lobby.peers`,
    # which would raise "dictionary changed size during iteration".
    for peer_session in list(lobby.peers.values()):
        if not peer_session.ws:
            logger.warning(
                f"{getSessionName(peer_session)} - No WebSocket connection. Skipping."
            )
            continue
        logger.info(
            f"{getSessionName(peer_session)} -> addPeer({getSessionName(session)}, {getLobbyName(lobby)})"
        )
        await peer_session.ws.send_json(
            {
                "type": "addPeer",
                "data": {
                    "peer_id": session.id,
                    "peer_name": session.name,
                    "should_create_offer": False,
                    "has_audio": has_audio,
                    "has_video": has_video,
                },
            }
        )
        # Add each other peer to the caller.
        # Re-checked on every pass because the socket may have been cleared
        # while this coroutine was suspended in the send above.
        if session.ws:
            logger.info(
                f"{getSessionName(session)} -> addPeer({getSessionName(peer_session)}, {getLobbyName(lobby)})"
            )
            await session.ws.send_json(
                {
                    "type": "addPeer",
                    "data": {
                        "peer_id": peer_session.id,
                        "peer_name": peer_session.name,
                        "should_create_offer": True,
                        "has_audio": peer_session.has_audio,
                        "has_video": peer_session.has_video,
                    },
                }
            )

    # Add this user as an RTC peer
    lobby.peers[session.id] = session
    await update_users(lobby)
async def part(
    lobby: Lobby,
    session: Session,
):
    """Remove `session` from the lobby's RTC peers and tell both sides.

    Every remaining peer with a WebSocket receives ``remove_peer`` for the
    departing session, and the departing session (if it still has a
    WebSocket) receives ``remove_peer`` for each such peer. Logged no-op
    when the session is not currently an RTC peer.

    Args:
        lobby: Lobby whose media session is being left.
        session: The departing session.
    """
    if session.id not in lobby.peers:
        logger.info(
            f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Does not exist in RTC peers."
        )
        return

    logger.info(
        f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Media part."
    )
    del lobby.peers[session.id]

    # Remove this peer from all other RTC peers, and remove each peer from this peer.
    # Iterate over a snapshot: the awaits below yield to the event loop, where
    # another connection may join or part and resize `lobby.peers`, which
    # would raise "dictionary changed size during iteration".
    for peer_session in list(lobby.peers.values()):
        if not peer_session.ws:
            logger.warning(
                f"{getSessionName(peer_session)} <- part({getLobbyName(lobby)}) - No WebSocket connection. Skipping."
            )
            continue
        logger.info(
            f"{getSessionName(peer_session)} <- remove_peer({getSessionName(session)})"
        )
        await peer_session.ws.send_json(
            {"type": "remove_peer", "data": {"peer_id": session.id}}
        )
        if session.ws:
            logger.info(
                f"{getSessionName(session)} <- remove_peer({getSessionName(peer_session)})"
            )
            await session.ws.send_json(
                {"type": "remove_peer", "data": {"peer_id": peer_session.id}}
            )
        else:
            logger.error(
                f"{getSessionName(session)} <- part({getLobbyName(lobby)}) - No WebSocket connection."
            )
async def update_users(lobby: Lobby, requesting_session: Session | None = None):
    """Send the lobby's user list to one session or to every lobby member.

    The list contains one ``{"name", "live", "session_id"}`` entry per lobby
    member that has a name; ``live`` is True when that member currently has
    a WebSocket attached.

    Args:
        lobby: Lobby whose members are listed.
        requesting_session: When given, only this session receives the list
            (a warning is logged if it has no WebSocket). When omitted, the
            list is sent to every lobby member that has a WebSocket.
    """
    users = [
        {"name": s.name, "live": bool(s.ws), "session_id": s.id}
        for s in lobby.sessions.values()
        if s.name
    ]
    message = {"type": "users", "users": users}

    if requesting_session:
        logger.info(
            f"{requesting_session.short}:{requesting_session.name} -> list_users({lobby.name})"
        )
        if requesting_session.ws:
            await requesting_session.ws.send_json(message)
        else:
            logger.warning(
                f"{requesting_session.short}:{requesting_session.name} - No WebSocket connection."
            )
        return

    # Broadcast over a snapshot: each send yields to the event loop, where a
    # session may be added to the lobby and resize `lobby.sessions`, which
    # would raise "dictionary changed size during iteration".
    for s in list(lobby.sessions.values()):
        logger.info(
            f"{s.short}:{s.name if s.name else unset_label} -> list_users({lobby.name})"
        )
        if s.ws:
            await s.ws.send_json(message)
def getSessionName(session: Session) -> str:
    """Build the ``<short id>:<name>`` log identifier, using the unset label when nameless."""
    display_name = session.name or unset_label
    return f"{session.short}:{display_name}"
def getLobbyName(lobby: Lobby) -> str:
    """Build the ``<short id>:<name>`` log identifier for a lobby."""
    return "{}:{}".format(lobby.short, lobby.name)
# Register websocket endpoint directly on app with full public_url path
@app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}")
async def websocket_lobby(
websocket: WebSocket,
lobby_id: str | None = Path(...),
session_id: str | None = Path(...),
):
await websocket.accept()
if lobby_id is None:
await websocket.send_json(
{"type": "error", "error": "Invalid or missing lobby"}
)
await websocket.close()
return
if session_id is None:
await websocket.send_json(
{"type": "error", "error": "Invalid or missing session"}
)
await websocket.close()
return
session = getSession(session_id)
if not session:
logger.error(f"Invalid session ID {session_id}")
await websocket.send_json(
{"type": "error", "error": f"Invalid session ID {session_id}"}
)
await websocket.close()
return
lobby = getLobby(lobby_id)
if not lobby:
logger.error(f"Invalid lobby ID {lobby_id}")
await websocket.send_json(
{"type": "error", "error": f"Invalid lobby ID {lobby_id}"}
)
await websocket.close()
return
logger.info(
f"{getSessionName(session)} <- lobby_connect({lobby.short}:{lobby.name})"
)
session.ws = websocket
# This user session just went from Dead to Live, so update everyone's user list
await update_users(lobby)
try:
while True:
data = await websocket.receive_json()
# logger.info(f"{getSessionName(session)} <- RAW Rx: {data}")
match data.get("type"):
case "set_name":
name = data.get("name")
if not name:
await websocket.send_json(
{"type": "error", "error": "Name required"}
)
continue
# Check for duplicate name
if any(s.name.lower() == name.lower() for s in sessions.values()):
await websocket.send_json(
{"type": "error", "error": "Name already taken"}
)
continue
session.name = name
logger.info(
f"{getSessionName(session)} <- set_name({session.name})"
)
await websocket.send_json({"type": "update", "name": name})
await update_users(lobby)
case "list_users":
await update_users(lobby, session)
case "media_status":
has_audio = data.get("audio", False)
has_video = data.get("video", False)
logger.info(
f"{getSessionName(session)}: <- media-status(audio: {has_audio}, video: {has_video})"
)
session.has_audio = has_audio
session.has_video = has_video
case "join":
has_audio = data.get("audio", False)
has_video = data.get("video", False)
await join(lobby, session, has_video, has_audio)
case "part":
await part(lobby, session)
case "relayICECandidate":
logger.info(f"{getSessionName(session)} <- relayICECandidate")
if session.id not in lobby.peers:
logger.error(
f"{session.short}:{session.name} <- relayICECandidate - Not an RTC peer"
)
return
peer_id = data.get("config", {}).get("peer_id")
candidate = data.get("config", {}).get("candidate")
message = {
"type": "iceCandidate",
"data": {"peer_id": session.id, "candidate": candidate},
}
if peer_id in lobby.peers:
ws = lobby.peers[peer_id].ws
if not ws:
logger.warning(
f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping."
)
return
await ws.send_json(message)
case "relaySessionDescription":
logger.info(f"{getSessionName(session)} <- relaySessionDescription")
if session.id not in lobby.peers:
logger.error(
f"{session.short}:{session.name} - relaySessionDescription - Not an RTC peer"
)
return
peer_id = data.get("config", {}).get("peer_id")
session_description = data.get("config", {}).get(
"session_description"
)
message = {
"type": "sessionDescription",
"data": {
"peer_id": session.id,
"session_description": session_description,
},
}
if peer_id in lobby.peers:
ws = lobby.peers[peer_id].ws
if not ws:
logger.warning(
f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping."
)
return
await ws.send_json(message)
case _:
await websocket.send_json(
{
"type": "error",
"error": f"Unknown request type: {data.get('type')}",
}
)
except WebSocketDisconnect:
logger.info(f"{getSessionName(session)} <- WebSocket disconnected for user.")
# Cleanup: remove session from lobby and sessions dict
session.ws = None
if lobby and session:
await part(lobby, session)
await update_users(lobby)
# if session_id in sessions:
# del sessions[session_id]
# Serve static files or proxy to frontend development server
PRODUCTION = os.getenv("PRODUCTION", "false").lower() == "true"
# NOTE(review): because the second argument is absolute, os.path.join
# discards the dirname and this evaluates to "/client/build". Left unchanged
# in case that absolute path is the intended location — confirm, and drop
# either the leading "/" or the dirname accordingly.
client_build_path = os.path.join(os.path.dirname(__file__), "/client/build")

if PRODUCTION:
    logger.info(f"Serving static files from: {client_build_path} at {public_url}")
    app.mount(
        public_url, StaticFiles(directory=client_build_path, html=True), name="static"
    )
else:
    logger.info(f"Proxying static files to http://static-frontend:3000 at {public_url}")

    import ssl

    @app.api_route(
        f"{public_url}{{path:path}}",
        methods=["GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD", "PATCH"],
    )
    async def proxy_static(request: Request, path: str):
        """Forward a non-API HTTP request to the frontend dev server.

        Paths starting with ``api/`` or ``ws/`` are answered with 404 rather
        than proxied. The upstream response body, status and headers are
        returned, minus the content-encoding, transfer-encoding and
        content-length headers. Any failure is reported as 502.
        """
        # Do not proxy API or websocket paths
        if path.startswith(("api/", "ws/")):
            return Response(status_code=404)

        upstream = f"{request.url.scheme}://static-frontend:3000"
        # Join only the non-empty parts so a public_url of "/" does not
        # produce a double slash ("//path") in the upstream URL.
        parts = [part for part in (public_url.strip("/"), path) if part]
        url = f"{upstream}/{'/'.join(parts)}"
        # Forward the query string; without it the dev server sees a
        # different request than the browser made.
        if request.url.query:
            url = f"{url}?{request.url.query}"

        headers = dict(request.headers)
        try:
            # Accept self-signed certs in dev
            async with httpx.AsyncClient(verify=False) as client:
                proxy_req = client.build_request(
                    request.method, url, headers=headers, content=await request.body()
                )
                proxy_resp = await client.send(proxy_req, stream=True)
                content = await proxy_resp.aread()
                # Remove problematic headers for browser decoding
                dropped = ("content-encoding", "transfer-encoding", "content-length")
                filtered_headers = {
                    k: v
                    for k, v in proxy_resp.headers.items()
                    if k.lower() not in dropped
                }
                return Response(
                    content=content,
                    status_code=proxy_resp.status_code,
                    headers=filtered_headers,
                )
        except Exception as e:
            logger.error(f"Proxy error for {url}: {e}")
            return Response("Proxy error", status_code=502)

    # WebSocket proxy for /ws (for React DevTools, etc.)
    import websockets
    import asyncio
    from starlette.websockets import WebSocket as StarletteWebSocket

    @app.websocket("/ws")
    async def websocket_proxy(websocket: StarletteWebSocket):
        """Relay WebSocket traffic between the browser and the frontend dev server."""
        logger.info("REACT: WebSocket proxy connection established.")
        # Get scheme from websocket.url (should be 'ws' or 'wss')
        scheme = websocket.url.scheme if hasattr(websocket, "url") else "ws"
        target_url = f"{scheme}://static-frontend:3000/ws"
        await websocket.accept()
        try:
            # Accept self-signed certs in dev for WSS. The context is only
            # passed for wss:// targets: websockets.connect raises ValueError
            # when given an ssl argument together with a ws:// URI.
            ssl_ctx = None
            if scheme == "wss":
                ssl_ctx = ssl.create_default_context()
                ssl_ctx.check_hostname = False
                ssl_ctx.verify_mode = ssl.CERT_NONE

            async with websockets.connect(target_url, ssl=ssl_ctx) as target_ws:

                async def client_to_server():
                    # Browser -> dev server (text frames).
                    while True:
                        msg = await websocket.receive_text()
                        await target_ws.send(msg)

                async def server_to_client():
                    # Dev server -> browser, preserving text vs. binary frames.
                    while True:
                        msg = await target_ws.recv()
                        if isinstance(msg, str):
                            await websocket.send_text(msg)
                        else:
                            await websocket.send_bytes(msg)

                try:
                    await asyncio.gather(client_to_server(), server_to_client())
                except (WebSocketDisconnect, websockets.ConnectionClosed):
                    logger.info("REACT: WebSocket proxy connection closed.")
        except Exception as e:
            logger.error(f"REACT: WebSocket proxy error: {e}")
            await websocket.close()