Prior to refactor MediaControl

This commit is contained in:
James Ketr 2025-08-27 17:47:01 -07:00
parent 6588672a3c
commit 19c5e03ab2
7 changed files with 1014 additions and 575 deletions

2
.gitignore vendored
View File

@ -1,3 +1,5 @@
server/sessions.json
# Node
node_modules/
build/

153
README.md
View File

@ -123,3 +123,156 @@ The response is then passed through the text-to-speech processor, with the outpu
Contributions and feature requests are welcome!
# Message sequence for WebRTC application
This application provides session management, lobby management, and WebRTC signaling.
## Phase 1: Initial Connection & Session Management
```
Frontend Backend
| |
|----- HTTP Request ------>| (Initial page load)
| | Check session cookie
| | If no cookie -> create new session
| | If cookie exists -> validate session
|<---- HTTP Response ------| Set/update session cookie
| |
|----- WebSocket Conn ---->| Upgrade to WebSocket
| | Associate WebSocket with session
|<---- session_established-| { sessionId }
```
## Phase 2: Lobby Management
### Creating a Lobby:
```
Frontend A Backend
| |
|----- create_lobby ------>| { lobbyName, settings }
| | Create lobby instance
| | Add user to lobby
|<---- lobby_created ------| { lobbyId, lobbyInfo }
```
### Joining a Lobby:
```
Frontend B Backend Frontend A
|----- ws:join_lobby ----->| { lobbyId } |
| | Add user to lobby |
|<---- ws:lobby_joined ----| { lobbyInfo } |
|<---- ws:lobby_state -----| { participants: [...] } |
| | |
| |--- ws: user_joined ----->| { newUser }
```
## Phase 3: WebRTC Signaling Initiation
When all required participants are in the lobby, the backend initiates WebRTC negotiation:
```
Frontend A Backend Frontend B
| | |
| | Check if conditions |
| | are met for WebRTC |
|<--- start_webrtc_nego ---| { participants } |
| |--- start_webrtc_nego --->| { participants }
| | |
| Create RTCPeerConnection | | Create RTCPeerConnection
| Set up local media | | Set up local media
| | |
|<-- negotiation_needed ---| |--- negotiation_needed --->|
```
## Phase 4: WebRTC Offer/Answer Exchange
```
Frontend A (Initiator) Backend Frontend B (Receiver)
| | |
| createOffer() | |
| setLocalDescription() | |
| | |
|----- webrtc_offer ------>| { offer, targetUser } |
| |------ webrtc_offer ----->|
| | | setRemoteDescription()
| | | createAnswer()
| | | setLocalDescription()
| | |
| |<----- webrtc_answer -----| { answer, targetUser }
|<----- webrtc_answer -----| |
| setRemoteDescription() | |
```
## Phase 5: ICE Candidate Exchange
```
Frontend A Backend Frontend B
| | |
| ICE gathering starts | | ICE gathering starts
| | |
|------ ice_candidate ---->| { candidate, target } |
| |----- ice_candidate ----->| addIceCandidate()
| | |
| |<---- ice_candidate ------| { candidate, target }
|<----- ice_candidate -----| | addIceCandidate()
| | |
| (Repeat for all ICE candidates collected) |
```
## Phase 6: Connection Establishment & State Management
```
Frontend A Backend Frontend B
| | |
| onconnectionstatechange | | onconnectionstatechange
| | |
|--- webrtc_state_change ->| { state: "connecting" } |
| |-- webrtc_state_change -->| { state: "connecting" }
| | |
| P2P Connection Established (WebRTC direct) |
|<===================== Direct Media Flow ===========>|
| | |
|-- webrtc_state_change -->| { state: "connected" } |
| |-- webrtc_state_change -->| { state: "connected" }
| | |
|<---- connection_ready ---| |
| |----- connection_ready -->|
```
## Key Message Types
### Session Management:
- `session_established` - Confirms session creation/restoration
- `session_expired` - Session timeout notification
### Lobby Management:
- `create_lobby` / `lobby_created`
- `join_lobby` / `lobby_joined`
- `leave_lobby` / `user_left`
- `lobby_state` - Current lobby participants and settings
- `lobby_destroyed` - Lobby cleanup
### WebRTC Signaling:
- `start_webrtc_negotiation` - Triggers WebRTC setup
- `webrtc_offer` - SDP offer
- `webrtc_answer` - SDP answer
- `ice_candidate` - ICE candidate exchange
- `webrtc_state_change` - Connection state updates
- `connection_ready` - P2P connection established
### Error Handling:
- `error` - Generic error message
- `lobby_full` - Lobby at capacity
- `webrtc_failed` - WebRTC negotiation failure
- `session_invalid` - Session validation failed
## Implementation Considerations:
1. **Session Persistence**: Store session data in Redis/database for horizontal scaling
2. **Lobby State**: Maintain lobby state in memory with periodic persistence
3. **WebSocket Management**: Handle reconnections and cleanup properly
4. **WebRTC Timeout**: Implement timeouts for offer/answer and ICE gathering
5. **Error Recovery**: Graceful fallbacks when WebRTC negotiation fails
6. **Security**: Validate session cookies and sanitize all incoming messages
The backend acts as the signaling server, routing WebRTC negotiation messages between peers while managing application state. Once the P2P connection is established, media flows directly between clients, but the WebSocket connection remains for application-level messaging.

View File

@ -13,35 +13,42 @@ console.log(`AI Voice Chat Build: ${process.env.REACT_APP_AI_VOICECHAT_BUILD}`);
type LobbyProps = {
session: Session;
setSession: React.Dispatch<React.SetStateAction<Session | null>>;
setError: React.Dispatch<React.SetStateAction<string | null>>;
};
const Lobby: React.FC<LobbyProps> = (props: LobbyProps) => {
const { session } = props;
type Lobby = {
id: string;
name: string;
private: boolean;
};
const LobbyView: React.FC<LobbyProps> = (props: LobbyProps) => {
const { session, setSession, setError } = props;
const { lobbyName = "default" } = useParams<{ lobbyName: string }>();
const [lobbyId, setLobbyId] = useState<string | null>(null);
const [lobby, setLobby] = useState<Lobby | null>(null);
const [editName, setEditName] = useState<string>("");
const [error, setError] = useState<string | null>(null);
const [socketUrl, setSocketUrl] = useState<string | null>(null);
const socket = useWebSocket(socketUrl, {
onOpen: () => console.log("WebSocket connection opened."),
onClose: () => console.log("WebSocket connection closed."),
onError: (event) => console.error("WebSocket error observed:", event),
onMessage: (event) => console.log("WebSocket message received:"),
shouldReconnect: (closeEvent) => true, // Will attempt to reconnect on all close events.
onOpen: () => console.log("app - WebSocket connection opened."),
onClose: () => console.log("app - WebSocket connection closed."),
onError: (event) => console.error("app - WebSocket error observed:", event),
// onMessage: (event) => console.log("WebSocket message received:"),
// shouldReconnect: (closeEvent) => true, // Will attempt to reconnect on all close events.
reconnectInterval: 3000,
share: true,
});
const { sendJsonMessage, lastJsonMessage, readyState } = socket;
useEffect(() => {
if (lobbyId && session) {
setSocketUrl(`${ws_base}/${lobbyId}/${session.id}`);
if (lobby && session) {
setSocketUrl(`${ws_base}/${lobby.id}/${session.id}`);
}
}, [lobbyId, session]);
}, [lobby, session]);
useEffect(() => {
if (!lastJsonMessage) {
if (!lastJsonMessage || !session) {
return;
}
const data: any = lastJsonMessage;
@ -49,7 +56,7 @@ const Lobby: React.FC<LobbyProps> = (props: LobbyProps) => {
case "update":
if ("name" in data) {
console.log(`Lobby - name set to ${data.name}`);
session.name = data.name;
setSession((s) => (s ? { ...s, name: data.name } : null));
}
break;
case "error":
@ -59,24 +66,31 @@ const Lobby: React.FC<LobbyProps> = (props: LobbyProps) => {
default:
break;
}
}, [lastJsonMessage]);
}, [lastJsonMessage, session, setError, setSession]);
useEffect(() => {
console.log("WebSocket connection status: ", readyState);
console.log("app - WebSocket connection status: ", readyState);
}, [readyState]);
useEffect(() => {
if (!session || !lobbyName) {
return;
}
const getLobbyId = async (lobbyName: string, session: Session) => {
const res = await fetch(`${base}/api/lobby/${lobbyName}/${session.id}`, {
method: "GET",
const getLobby = async (lobbyName: string, session: Session) => {
const res = await fetch(`${base}/api/lobby/${session.id}`, {
method: "POST",
cache: "no-cache",
credentials: "same-origin",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
type: "lobby_create",
data: {
name: lobbyName,
private: false,
},
}),
});
if (res.status >= 400) {
@ -91,17 +105,23 @@ const Lobby: React.FC<LobbyProps> = (props: LobbyProps) => {
setError(data.error);
return;
}
setLobbyId(data.lobby);
if (data.type !== "lobby_created") {
console.error(`Lobby - Unexpected response type: ${data.type}`);
setError(`Unexpected response from server`);
return;
}
const lobby: Lobby = data.data;
console.log(`Lobby - Joined lobby`, lobby);
setLobby(lobby);
};
getLobbyId(lobbyName, session);
}, [session, lobbyName, setLobbyId]);
getLobby(lobbyName, session);
}, [session, lobbyName, setLobby, setError]);
const setName = (name: string) => {
sendJsonMessage({
type: "set_name",
name: name,
data: { name },
});
};
@ -160,29 +180,19 @@ const Lobby: React.FC<LobbyProps> = (props: LobbyProps) => {
{session.name && (
<>
{session.lobbies.map((lobby: string) => (
{/* {session.lobbies.map((lobby: string) => (
<Box key={lobby}>
<Button
variant="contained"
href={`${base}/${lobby}`}
disabled={lobby === lobbyName}
sx={{ mr: 1, mb: 1 }}
>
<Button variant="contained" disabled={lobby === lobbyName} sx={{ mr: 1, mb: 1 }}>
{lobby === lobbyName ? `In Lobby: ${lobby}` : `Join Lobby: ${lobby}`}
</Button>
</Box>
))}
))} */}
{session && socketUrl && <UserList socketUrl={socketUrl} session={session} />}
</>
)}
</>
)}
{error && (
<Paper className="Error" sx={{ p: 2, m: 2, width: "fit-content", backgroundColor: "#ffdddd" }}>
<Typography color="red">{error}</Typography>
</Paper>
)}
</Paper>
);
};
@ -191,6 +201,12 @@ const App = () => {
const [session, setSession] = useState<Session | null>(null);
const [error, setError] = useState<string | null>(null);
useEffect(() => {
if (error) {
setTimeout(() => setError(null), 5000);
}
}, [error]);
useEffect(() => {
if (!session) {
return;
@ -236,8 +252,8 @@ const App = () => {
{session && (
<Router>
<Routes>
<Route element={<Lobby session={session} />} path={`${base}/:lobbyName`} />
<Route element={<Lobby session={session} />} path={`${base}`} />
<Route element={<LobbyView {...{ setError, session, setSession }} />} path={`${base}/:lobbyName`} />
<Route element={<LobbyView {...{ setError, session, setSession }} />} path={`${base}`} />
</Routes>
</Router>
)}

View File

@ -1 +0,0 @@
type LobbyMessage = {}

View File

@ -8,7 +8,7 @@ import Mic from "@mui/icons-material/Mic";
import VideocamOff from "@mui/icons-material/VideocamOff";
import Videocam from "@mui/icons-material/Videocam";
import Box from "@mui/material/Box";
import useWebSocket from "react-use-websocket";
import useWebSocket, { ReadyState } from "react-use-websocket";
import { Session } from "./GlobalContext";
const debug = true;
@ -75,7 +75,8 @@ const createAnimatedVideoTrack = ({ width = 320, height = 240 } = {}): MediaStre
// Start animation - CRITICAL: Request animation frame for better performance
function animate() {
drawFrame();
requestAnimationFrame(animate);
//requestAnimationFrame(animate);
setTimeout(animate, 1000 / 10); // 10 FPS
}
animate();
@ -124,8 +125,6 @@ const createSilentAudioTrack = (): MediaStreamTrack => {
interface Peer {
session_id: string;
peerName: string;
has_audio: boolean;
has_video: boolean;
attributes: Record<string, any>;
muted: boolean;
video_on: boolean /* Set by client */;
@ -136,17 +135,9 @@ interface Peer {
}
export type { Peer };
interface TrackContext {
media: MediaStream | null;
has_audio: boolean;
has_video: boolean;
}
interface AddPeerConfig {
peer_id: string;
peer_name: string;
has_audio: boolean;
has_video: boolean;
should_create_offer?: boolean;
}
@ -216,12 +207,18 @@ type MediaAgentProps = {
setPeers: (peers: Record<string, Peer>) => void;
};
type JoinStatus = {
status: "Not joined" | "Joining" | "Joined" | "Error";
message?: string;
};
const MediaAgent = (props: MediaAgentProps) => {
const { peers, setPeers, socketUrl, session } = props;
// track: null = no local media, TrackContext = local media
const [context, setContext] = useState<TrackContext | null>(null);
const [joinStatus, setJoinStatus] = useState<JoinStatus>({ status: "Not joined" });
// track: null = no local media, MediaStream = local media
const [media, setMedia] = useState<MediaStream | null>(null);
const { sendJsonMessage, lastJsonMessage } = useWebSocket(socketUrl, {
const { sendJsonMessage, lastJsonMessage, readyState } = useWebSocket(socketUrl, {
share: true,
onError: (err) => {
console.error(err);
@ -238,7 +235,7 @@ const MediaAgent = (props: MediaAgentProps) => {
continue;
}
if (peers[peer_id].connection) {
peers[peer_id].connection.close();
peers[peer_id].connection?.close();
peers[peer_id].connection = undefined;
}
}
@ -249,54 +246,48 @@ const MediaAgent = (props: MediaAgentProps) => {
}
if (debug) console.log(`media-agent - close`, peers);
setPeers(Object.assign({}, peers));
setPeers({ ...peers });
},
});
const onTrack = useCallback(
(event: RTCTrackEvent) => {
const connection = event.target as RTCPeerConnection;
console.log("media-agent - ontrack", event);
for (let peer in peers) {
if (peers[peer].connection === connection) {
console.log(`media-agent - ontrack - remote ${peer} track assigned.`);
Object.assign(peers[peer].attributes, {
srcObject: event.streams[0] || event.track,
});
setPeers(Object.assign({}, peers));
}
}
},
[peers, setPeers]
);
const refOnTrack = useRef(onTrack);
const addPeer = useCallback(
(config: AddPeerConfig) => {
console.log("media-agent - addPeer - ", { config, peers });
if (config.peer_id in peers) {
if (!media) {
console.log("media-agent - addPeer - No local media yet, deferring");
return;
}
const peer_id = config.peer_id;
if (peer_id in peers) {
if (!peers[config.peer_id].dead) {
console.log(`media-agent - addPeer - ${config.peer_id} already in peers`);
return;
}
}
const peer: Peer = {
session_id: config.peer_id,
session_id: peer_id,
peerName: config.peer_name,
has_audio: config.has_audio,
has_video: config.has_video,
attributes: {},
muted: false,
video_on: true,
local: false,
dead: false,
};
if (config.peer_id in peers) {
if (peer_id in peers) {
peer.muted = peers[config.peer_id].muted;
peer.video_on = peers[config.peer_id].video_on;
console.log(`media-agent - addPeer - reviving dead peer ${peer.peerName}`);
} else {
peer.muted = false;
peer.video_on = true;
peers[peer_id] = peer;
console.log(`media-agent - addPeer - starting new peer ${peer.peerName}`);
}
peers[config.peer_id] = peer;
console.log(`media-agent - addPeer - remote`, peers);
setPeers({ ...peers });
const connection = new RTCPeerConnection({
iceServers: [
{
@ -306,7 +297,9 @@ const MediaAgent = (props: MediaAgentProps) => {
},
],
});
peer.connection = connection;
connection.addEventListener("connectionstatechange", (event) => {
console.log(`media-agent - connectionstatechange - `, connection.connectionState, event);
if (connection.connectionState === "failed") {
@ -316,43 +309,80 @@ const MediaAgent = (props: MediaAgentProps) => {
setPeers({ ...peers });
}
});
connection.addEventListener("negotiationneeded", (event) => {
console.log(`media-agent - negotiationneeded - `, connection.connectionState, event);
});
connection.addEventListener("icecandidateerror", (event: RTCPeerConnectionIceErrorEvent) => {
if (event.errorCode === 701) {
connection.addEventListener("icecandidateerror", (event: Event) => {
const evt = event as RTCPeerConnectionIceErrorEvent;
if (evt.errorCode === 701) {
if (connection.iceGatheringState === "gathering") {
console.log(`media-agent - Unable to reach host: ${event.url}`);
console.error(`media-agent - Unable to reach host negotiating for ${peer.peerName}:`, evt);
} else {
console.error(
`media-agent - icecandidateerror - `,
event.errorCode,
(event as any).hostcandidate,
event.url,
event.errorText
);
console.error(`media-agent - icecandidateerror negotiating for ${peer.peerName}:`, evt);
}
} else {
console.error(`media-agent - icecandidateerror for ${peer.peerName}:`, evt);
}
console.log("media-agent - icecandidateerror attempting addPeer again in 3 seconds");
peer.dead = true;
});
connection.onicecandidate = (event: RTCPeerConnectionIceEvent) => {
if (!event.candidate) {
console.log(`media-agent - icecanditate - gathering is complete: ${connection.connectionState}`);
return;
}
/* If a srflx candidate was found, notify that the STUN server works! */
if (event.candidate.type === "srflx") {
console.log("media-agent - The STUN server is reachable!");
console.log(`media-agent - Your Public IP Address is: ${event.candidate.address}`);
}
/* If a relay candidate was found, notify that the TURN server works! */
if (event.candidate.type === "relay") {
console.log("media-agent - The TURN server is reachable !");
}
console.log(`media-agent - onicecandidate - `, event.candidate);
sendJsonMessage({
type: "relayICECandidate",
config: {
peer_id: config.peer_id,
data: {
peer_id,
candidate: event.candidate,
},
});
};
connection.ontrack = (e: RTCTrackEvent) => {
console.log("media-agent - ontrack event received", e);
console.log("Stream:", e.streams[0]);
console.log("Track:", e.track);
refOnTrack.current(e);
connection.ontrack = (event: RTCTrackEvent) => {
console.log("media-agent - ontrack event received", event);
// Always normalize to a MediaStream
let stream: MediaStream;
if (event.streams && event.streams[0]) {
stream = event.streams[0];
} else {
stream = new MediaStream();
stream.addTrack(event.track);
}
console.log("media-agent - ontrack - Stream:", stream);
console.log("media-agent - ontrack - Track:", event.track);
for (let peerId in peers) {
if (peers[peerId].connection === connection) {
console.log(`media-agent - ontrack - remote ${peerId} track assigned.`);
peers[peerId].attributes = {
...peers[peerId].attributes,
srcObject: stream, // ✅ always a MediaStream
};
setPeers({ ...peers });
}
}
};
connection.oniceconnectionstatechange = (event) => {
console.log(`media-agent - iceconnectionstatechange - `, connection.iceConnectionState, event);
if (connection.iceConnectionState === "failed") {
@ -363,25 +393,23 @@ const MediaAgent = (props: MediaAgentProps) => {
}
};
// Only add local tracks if present
if (context && context.media) {
console.log("Adding local tracks to new peer connection");
context.media.getTracks().forEach((t) => {
console.log("Adding track:", t.kind, t.enabled);
connection.addTrack(t, context.media!);
});
} else {
console.log("No local tracks available when creating peer");
}
console.log("Adding local tracks to new peer connection");
media.getTracks().forEach((t) => {
console.log("Adding track:", t.kind, t.enabled);
connection.addTrack(t, media!);
});
if (config.should_create_offer) {
console.log(`media-agent - Creating RTC offer to ${peer.peerName}`);
connection
.createOffer()
.then((local_description) => {
console.log(`media-agent - Local offer description is: `, local_description);
return connection.setLocalDescription(local_description).then(() => {
sendJsonMessage({
type: "relaySessionDescription",
config: {
peer_id: config.peer_id,
data: {
peer_id,
session_description: local_description,
},
});
@ -392,90 +420,116 @@ const MediaAgent = (props: MediaAgentProps) => {
});
}
},
[peers, setPeers, context, sendJsonMessage]
[peers, setPeers, media, sendJsonMessage]
);
const sessionDescription = useCallback(
({ peer_id, session_description }: SessionDescriptionData) => {
async (props: SessionDescriptionData) => {
const { peer_id, session_description } = props;
const peer = peers[peer_id];
if (!peer || !peer.connection) {
console.error(`media-agent - sessionDescription - No peer for ${peer.peerName}`);
return;
}
console.log(`media-agent - sessionDescription - `, { peer_id, session_description, peer });
if (!peer?.connection) return;
const pc = peer.connection;
const desc = new RTCSessionDescription(session_description);
peer.connection
.setRemoteDescription(desc)
.then(() => {
console.log("Remote description set successfully");
// --- Decide if we're allowed to apply it, before setRemoteDescription ---
if (desc.type === "answer") {
// Only valid when we're the offerer waiting for an answer
if (pc.signalingState !== "have-local-offer") {
console.warn(`media-agent - sessionDescription - Ignoring remote answer; signalingState=${pc.signalingState}`);
return;
}
} else if (desc.type === "offer") {
// Offers are only valid when we're ready to accept them.
// If we also have a local offer, it's glare; rollback first.
if (pc.signalingState === "have-local-offer") {
console.log(
"media-agent - sessionDescription - Glare detected: performing rollback before applying remote offer"
);
await pc.setLocalDescription({ type: "rollback" } as any);
} else if (pc.signalingState !== "stable") {
console.warn(`media-agent - sessionDescription - Ignoring offer; signalingState=${pc.signalingState}`);
return;
}
}
// --- Now it is safe to apply the remote description ---
try {
await pc.setRemoteDescription(desc);
console.log("media-agent - sessionDescription - setRemoteDescription succeeded");
} catch (err) {
console.error("media-agent - sessionDescription - Failed to set remote description:", err);
return;
}
// Process queued ICE candidates after remote description is set
if (peer.queuedCandidates && peer.queuedCandidates.length > 0) {
console.log(`Processing ${peer.queuedCandidates.length} queued candidates`);
const candidatePromises = peer.queuedCandidates.map((candidate) =>
peer.connection!.addIceCandidate(new RTCIceCandidate(candidate))
);
// Process queued candidates after a successful remote description
if (peer.queuedCandidates?.length) {
console.log(`media-agent - sessionDescription - Processing ${peer.queuedCandidates.length} queued candidates`);
try {
await Promise.all(peer.queuedCandidates.map((c) => pc.addIceCandidate(new RTCIceCandidate(c))));
peer.queuedCandidates = [];
console.log("media-agent - sessionDescription - All queued candidates processed");
} catch (err) {
console.error("media-agent - sessionDescription - Error processing queued candidates:", err);
}
}
Promise.all(candidatePromises)
.then(() => {
console.log("All queued candidates processed");
peer.queuedCandidates = [];
})
.catch((err) => console.error("Error processing queued candidates:", err));
}
// Answer only if we just accepted an offer and we're in have-remote-offer
if (desc.type === "offer" && pc.signalingState === "have-remote-offer") {
console.log("media-agent - sessionDescription - Creating answer for received offer");
try {
const answer = await pc.createAnswer();
await pc.setLocalDescription(answer);
sendJsonMessage({
type: "relaySessionDescription",
data: { peer_id, session_description: answer },
});
console.log("media-agent - sessionDescription - Answer setLocalDescription succeeded; sent answer");
} catch (err) {
console.error("media-agent - sessionDescription - Failed to create/send answer:", err);
}
}
// Handle offer/answer logic...
if (session_description.type === "offer") {
console.log("Creating answer for received offer");
return peer.connection!.createAnswer();
}
return null;
})
.then((answer) => {
if (answer && session_description.type === "offer") {
return peer.connection!.setLocalDescription(answer).then(() => {
sendJsonMessage({
type: "relaySessionDescription",
config: {
peer_id: peer_id,
session_description: answer,
},
});
console.log("Answer sent successfully");
});
}
})
.catch((error) => {
console.error("Failed to set remote description:", error);
});
if (desc.type === "answer") {
console.log("media-agent - sessionDescription - Remote answered our offer");
}
},
[peers, sendJsonMessage]
);
const removePeer = useCallback(
({ peer_id }: RemovePeerData) => {
console.log(`media-agent - removePeer - Signaling server said to remove peer ${peer_id}`);
(props: RemovePeerData) => {
const { peer_id } = props;
const peer = peers[peer_id];
if (!peer) {
console.error(`media-agent - removePeer - No peer for ${peer_id}`, peers);
return;
}
console.log(`media-agent - removePeer - Signaling server said to remove peer ${peer.peerName}`);
if (peer_id in peers) {
/* To maintain mute/videoOn states, we don't remove the peer but
* instead mark it as dead */
peers[peer_id].dead = true;
if (peers[peer_id].connection) {
peers[peer_id].connection.close();
peers[peer_id].connection?.close();
peers[peer_id].connection = undefined;
}
}
/* To maintain mute/videoOn states, we don't remove the peer but
* instead mark it as dead */
peers[peer_id].dead = true;
if (debug) console.log(`media-agent - removePeer`, peers);
setPeers(Object.assign({}, peers));
console.log(`media-agent - removePeer`, peers);
setPeers({ ...peers });
},
[peers, setPeers]
);
const iceCandidate = useCallback(
({ peer_id, candidate }: IceCandidateData) => {
(props: IceCandidateData) => {
const { peer_id, candidate } = props;
const peer = peers[peer_id];
console.log(`media-agent - iceCandidate - `, { peer_id, candidate, peer });
if (!peer?.connection) {
console.error(`No peer or connection for ${peer_id}`);
console.error(`media-agent - iceCandidate - No peer for ${peer_id}`, peers);
return;
}
@ -483,26 +537,30 @@ const MediaAgent = (props: MediaAgentProps) => {
if (peer.connection.remoteDescription) {
peer.connection
.addIceCandidate(new RTCIceCandidate(candidate))
.then(() => console.log(`Added ICE candidate for ${peer_id}`))
.catch((err) => console.error("Failed to add ICE candidate:", err));
.then(() => console.log(`media-agent - iceCandidate - Successfully added Ice Candidate for ${peer.peerName}`))
.catch((err) => console.error("media-agent - iceCandidate - Failed to add ICE candidate:", err));
} else {
// Queue the candidate for later processing
if (!peer.queuedCandidates) peer.queuedCandidates = [];
peer.queuedCandidates.push(candidate);
console.log(`Queued ICE candidate for ${peer_id} (no remote description yet)`);
console.log(
`media-agent - iceCandidate - Queued ICE candidate for ${peer.peerName} (no remote description yet)`
);
}
},
[peers]
);
useEffect(() => {
if (!lastJsonMessage) {
if (!lastJsonMessage || !session) {
return;
}
const data: any = lastJsonMessage;
if (!session) {
return;
}
switch (data.type) {
case "join_status":
setJoinStatus({ status: data.status, message: data.message });
break;
case "addPeer":
addPeer(data.data);
break;
@ -521,24 +579,23 @@ const MediaAgent = (props: MediaAgentProps) => {
}, [lastJsonMessage, addPeer, removePeer, iceCandidate, sessionDescription, peers, session]);
useEffect(() => {
refOnTrack.current = onTrack;
});
useEffect(() => {
console.log(`media-control - Context changed`, context);
console.log(`media-control - Context changed`, media, joinStatus);
const join = () => {
if (joinStatus.status === "Joined" || joinStatus.status === "Joining" || joinStatus.status === "Error") {
console.log(
`media-control - Join status: ${joinStatus.status} - ${joinStatus.message || "No message"}, skipping`
);
return;
}
setJoinStatus({ status: "Joining" });
sendJsonMessage({
type: "join",
data: {
has_audio: context && context.has_audio ? context.has_audio : false,
has_video: context && context.has_video ? context.has_video : false,
},
});
};
if (context) {
console.log(`media-control - issuing join request: `, context);
if (media && joinStatus.status === "Not joined") {
console.log(`media-control - issuing join request: `, media);
for (let peer in peers) {
if (peers[peer].local && peers[peer].dead) {
// Mark as alive
@ -548,39 +605,38 @@ const MediaAgent = (props: MediaAgentProps) => {
}
join();
}
}, [context, peers, setPeers, sendJsonMessage]);
}, [media, peers, setPeers, sendJsonMessage, joinStatus]);
useEffect(() => {
if (!context || !context.media) return;
if (!media) return;
console.log("Track changed, updating all peer connections");
console.log("media-agent - Track changed, updating all peer connections");
// Add tracks to all existing peer connections
for (let peer_id in peers) {
const peer = peers[peer_id];
if (peer.connection && !peer.local && !peer.dead) {
console.log(`Adding tracks to existing peer ${peer.peerName}`);
if (!context || !context.media) return;
context.media.getTracks().forEach((t) => {
console.log(`media-agent - Adding tracks to existing peer ${peer.peerName}`);
media?.getTracks().forEach((t) => {
// Check if track is already added
const senders = peer.connection!.getSenders();
const trackAlreadyAdded = senders.some((sender) => sender.track === t);
if (!trackAlreadyAdded) {
console.log(`Adding ${t.kind} track to ${peer.peerName}`);
peer.connection!.addTrack(t, context.media!);
console.log(`media-agent - Adding ${t.kind} track to ${peer.peerName}`);
peer.connection!.addTrack(t, media!);
}
});
}
}
}, [context, peers]);
}, [media, peers]);
useEffect(() => {
if (!session) {
return;
}
let update = false;
if (context && !(session.id in peers)) {
if (media && !(session.id in peers)) {
update = true;
peers[session.id] = {
peerName: session.name || "Unknown",
@ -588,11 +644,9 @@ const MediaAgent = (props: MediaAgentProps) => {
local: true,
muted: true,
video_on: false,
has_video: context.has_video,
has_audio: context.has_audio,
attributes: {
local: true,
srcObject: context.media,
srcObject: media,
},
dead: false,
};
@ -611,43 +665,43 @@ const MediaAgent = (props: MediaAgentProps) => {
if (debug) console.log(`media-agent - Setting global peers`, peers);
setPeers(Object.assign({}, peers));
}
}, [peers, setPeers, context, session]);
}, [peers, session, setPeers, media]);
const setup_local_media = async (): Promise<TrackContext> => {
const setup_local_media = async (): Promise<MediaStream> => {
console.log(`media-agent - Requesting access to local audio / video inputs`);
const context: TrackContext = { media: null, has_audio: true, has_video: true };
const attempt = { get_audio: true, get_video: true };
let media = null;
// Try to get user media with fallback logic
while (context.has_audio || context.has_video) {
console.log(context);
while (attempt.get_audio || attempt.get_video) {
try {
const constraints: any = {};
if (context.has_audio) {
if (attempt.get_audio) {
constraints.audio = true;
}
if (context.has_video) {
if (attempt.get_video) {
constraints.video = true;
}
console.log(
`media-agent - Attempting to get user media: audio=${context.has_audio}, video=${context.has_video}`
`media-agent - Attempting to get user media: audio=${attempt.get_audio}, video=${attempt.get_video}`
);
context.media = await navigator.mediaDevices.getUserMedia(constraints);
/* Success -- on failure, an exception is thrown */
return context;
media = await navigator.mediaDevices.getUserMedia(constraints);
break;
} catch (error) {
if (context.has_video && context.has_audio) {
if (attempt.get_video && attempt.get_audio) {
console.log(`media-agent - Disabling video and trying just audio`);
context.has_video = false;
context.has_audio = true;
} else if (context.has_audio && !context.has_video) {
attempt.get_video = false;
attempt.get_audio = true;
} else if (attempt.get_audio && !attempt.get_video) {
console.log(`media-agent - Disabling audio and trying just video`);
context.has_video = true;
context.has_audio = false;
attempt.get_video = true;
attempt.get_audio = false;
} else {
console.log(`media-agent - No media available`);
context.has_video = false;
context.has_audio = false;
attempt.get_video = false;
attempt.get_audio = false;
}
}
}
@ -657,9 +711,9 @@ const MediaAgent = (props: MediaAgentProps) => {
let hasRealAudio = false;
let hasRealVideo = false;
if (context.media) {
const audioTracks = context.media.getAudioTracks();
const videoTracks = context.media.getVideoTracks();
if (media) {
const audioTracks = media.getAudioTracks();
const videoTracks = media.getVideoTracks();
if (audioTracks.length > 0) {
tracks.push(audioTracks[0]);
@ -691,11 +745,7 @@ const MediaAgent = (props: MediaAgentProps) => {
}
// Create final media stream
context.media = new MediaStream(tracks);
// Update context flags to reflect what we actually have
context.has_audio = true; //hasRealAudio;
context.has_video = true; //hasRealVideo;
media = new MediaStream(tracks);
const mediaType =
hasRealAudio && hasRealVideo
@ -708,27 +758,25 @@ const MediaAgent = (props: MediaAgentProps) => {
console.log(`media-agent - Final media setup: ${mediaType}`);
return context;
return media;
};
useEffect(() => {
if (!session || !session.name) {
if (media !== null || readyState !== ReadyState.OPEN) {
return;
}
if (context === null) {
setup_local_media()
.then((context) => {
sendJsonMessage({ type: "media_status", ...context, media: undefined });
setContext(context);
})
.catch((error) => {
console.error("media-agent - Failed to get local media:", error);
sendJsonMessage({ type: "media_status", has_audio: false, has_video: false, media: undefined });
setContext(null);
});
}
}, [context, session, sendJsonMessage]);
console.log(`media-agent - Setting up local media (socket readyState=${readyState})`);
setup_local_media()
.then((media) => {
setMedia(media);
})
.catch((error) => {
console.error("media-agent - Failed to get local media:", error);
setMedia(null);
});
}, [readyState, setMedia, media]);
return <></>;
};
@ -740,8 +788,8 @@ interface MediaControlProps {
}
const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className }) => {
const [media, setMedia] = useState<Peer | undefined>(undefined);
const [muted, setMuted] = useState<boolean | undefined>(undefined);
const [videoOn, setVideoOn] = useState<boolean | undefined>(undefined);
const [muted, setMuted] = useState<boolean>(false);
const [videoOn, setVideoOn] = useState<boolean>(true);
const [target, setTarget] = useState<Element | undefined>();
const [isValid, setIsValid] = useState<boolean>(false);
const [frame, setFrame] = useState<{ translate: [number, number] }>({
@ -752,8 +800,13 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
console.log(`media-control - peer changed`, peer);
if (peer && peer.peerName) {
const el = document.querySelector(`.MediaControl[data-peer="${peer.session_id}"]`);
console.log(`media-control - setting target for ${peer.peerName}`, el);
setTarget(el ?? undefined);
if (el) {
console.log(`media-control - setting target for ${peer.peerName}`);
setTarget(el);
} else {
console.warn(`media-control - no target for ${peer.peerName}`);
setTarget(undefined);
}
}
}, [setTarget, peer]);
@ -801,7 +854,7 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
console.log(`media-control - media changed`, media);
if (media.attributes.srcObject) {
(media.attributes.srcObject.getAudioTracks() as MediaStreamTrack[]).forEach((track: MediaStreamTrack) => {
track.enabled = media.has_audio && !muted;
track.enabled = !muted;
});
}
}, [muted, media, peer]);
@ -813,10 +866,11 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
if (media.attributes.srcObject) {
console.log(`media-control - video enable - ${peer.peerName}:${videoOn}`);
(media.attributes.srcObject.getVideoTracks() as MediaStreamTrack[]).forEach((track: MediaStreamTrack) => {
track.enabled = Boolean(media.has_video) && Boolean(videoOn);
track.enabled = videoOn;
});
}
});
}, [videoOn, media, peer]);
useEffect(() => {
if (!media || !peer || media.dead || !media.attributes || !media.attributes.srcObject) {
setIsValid(false);
@ -827,8 +881,8 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
setIsValid(true);
}, [media, peer, setIsValid]);
const colorAudio = isValid && media?.has_audio ? "primary" : "disabled",
colorVideo = isValid && media?.has_video ? "primary" : "disabled";
const colorAudio = isValid ? "primary" : "disabled",
colorVideo = isValid ? "primary" : "disabled";
if (!peer) {
console.log(`media-control - no peer`);
@ -867,7 +921,6 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
{isValid && (
<>
<Moveable
sx={{ border: "3px solid blue" }}
pinchable={true}
draggable={true}
// Moveable expects HTMLElement or SVGElement, not just Element

View File

@ -11,7 +11,7 @@ type User = {
name: string;
session_id: string;
live: boolean;
is_self: boolean /* Client side variable */;
local: boolean /* Client side variable */;
};
type UserListProps = {
@ -60,17 +60,21 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
if (!session) {
return;
}
const data = JSON.parse(event.data);
switch (data.type) {
case "users":
console.log(`users - lobby update`, data.users);
const u: User[] = data.users;
u.forEach((user) => {
user.is_self = user.session_id === session.id;
const message = JSON.parse(event.data);
const data: any = message.data;
switch (message.type) {
case "lobby_state":
type LobbyStateData = {
participants: User[];
};
const lobby_state = data as LobbyStateData;
console.log(`users - lobby_state`, lobby_state.participants);
lobby_state.participants.forEach((user) => {
user.local = user.session_id === session.id;
});
u.sort(sortUsers);
setVideoClass(u.length <= 2 ? "Medium" : "Small");
setUsers(u);
lobby_state.participants.sort(sortUsers);
setVideoClass(lobby_state.participants.length <= 2 ? "Medium" : "Small");
setUsers(lobby_state.participants);
break;
default:
break;
@ -87,33 +91,28 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
});
}, [users, sendJsonMessage]);
const userElements: JSX.Element[] = [];
users?.forEach((user: User) => {
console.log(`User: ${user.name}, Is Self: ${user.is_self}, hasPeer: ${peers[user.session_id] ? "Yes" : "No"}`);
userElements.push(
<Box
key={user.name}
sx={{ display: "flex", flexDirection: "column", alignItems: "center", border: "3px solid magenta" }}
className={`UserEntry ${user.is_self ? "UserSelf" : ""}`}
>
<div>
<div className="Name">{user.name ? user.name : user.session_id}</div>
{user.name && !user.live && <div className="NoNetwork"></div>}
</div>
{user.name && user.live && peers[user.session_id] ? (
<MediaControl className={videoClass} peer={peers[user.session_id]} isSelf={user.is_self} />
) : (
<video className="Video"></video>
)}
</Box>
);
});
return (
<Paper className={`UserList ${videoClass}`}>
<MediaAgent {...{ session, socketUrl, peers, setPeers }} />
<List className="UserSelector">{userElements}</List>
<List className="UserSelector">
{users?.map((user) => (
<Box
key={user.name}
sx={{ display: "flex", flexDirection: "column", alignItems: "center", border: "3px solid magenta" }}
className={`UserEntry ${user.local ? "UserSelf" : ""}`}
>
<div>
<div className="Name">{user.name ? user.name : user.session_id}</div>
{user.name && !user.live && <div className="NoNetwork"></div>}
</div>
{user.name && user.live && peers[user.session_id] ? (
<MediaControl className={videoClass} peer={peers[user.session_id]} isSelf={user.local} />
) : (
<video className="Video"></video>
)}
</Box>
))}
</List>
</Paper>
);
};

View File

@ -1,4 +1,7 @@
from __future__ import annotations
from typing import Any, Literal, Optional, TypedDict
from fastapi import (
Body,
Cookie,
FastAPI,
Path,
@ -11,6 +14,9 @@ from fastapi.staticfiles import StaticFiles
import secrets
import os
import httpx
import json
from pydantic import BaseModel
from logger import logger
@ -23,15 +29,282 @@ app = FastAPI()
logger.info(f"Starting server with public URL: {public_url}")
class LobbyResponse(TypedDict):
id: str
name: str
private: bool
class Session:
def __init__(self, id):
_instances: list[Session] = []
_save_file = "sessions.json"
_loaded = False
def __init__(self, id: str):
logger.info(f"Instantiating new session {id}")
self._instances.append(self)
self.id = id
self.short = id[:8]
self.name = ""
self.lobbies: dict[str, Lobby] = {}
self.lobbies: list[Lobby] = [] # List of lobby IDs this session is in
self.lobby_peers: dict[
str, list[str]
] = {} # lobby ID -> list of peer session IDs
self.ws: WebSocket | None = None
self.has_audio = False
self.has_video = False
self.save()
@classmethod
def save(cls):
data: list[dict[str, str | list[LobbyResponse]]] = [
{
"id": s.id,
"name": s.name,
"lobbies": [
{"id": lobby.id, "name": lobby.name, "private": lobby.private}
for lobby in s.lobbies
],
}
for s in cls._instances
]
with open(cls._save_file, "w") as f:
json.dump(data, f, indent=2)
logger.info(f"Saved {len(data)} sessions to {cls._save_file}")
@classmethod
def load(cls):
if not os.path.exists(cls._save_file):
logger.info(f"No session save file found: {cls._save_file}")
return
with open(cls._save_file, "r") as f:
data = json.load(f)
for sdata in data:
session = Session(sdata["id"])
session.name = sdata["name"]
for lobby in sdata.get("lobbies", []):
session.lobbies.append(
Lobby(
name=lobby.get("name"),
id=lobby.get("id"),
private=lobby.get("private", False),
)
)
logger.info(
f"Loaded session {session.getName()} with {len(session.lobbies)} lobbies"
)
for lobby in session.lobbies:
lobbies[lobby.id] = Lobby(
name=lobby.name, id=lobby.id
) # Ensure lobby exists
logger.info(f"Loaded {len(data)} sessions from {cls._save_file}")
@classmethod
def getSession(cls, id: str) -> Session | None:
if not cls._loaded:
cls.load()
logger.info(f"Loaded {len(cls._instances)} sessions from disk...")
cls._loaded = True
for s in cls._instances:
if s.id == id:
return s
return None
@classmethod
def isUniqueName(cls, name: str) -> bool:
if not name:
return False
for s in cls._instances:
if s.name.lower() == name.lower():
return False
return True
def getName(self) -> str:
return f"{self.short}:{self.name if self.name else unset_label}"
def setName(self, name: str):
self.name = name
self.save()
async def join(self, lobby: Lobby):
if not self.ws:
logger.error(
f"{self.getName()} - No WebSocket connection. Lobby not available."
)
return
if lobby.id in self.lobby_peers or self.id in lobby.sessions:
logger.info(f"{self.getName()} - Already joined to {lobby.getName()}.")
await self.ws.send_json(
{
"type": "join_status",
"status": "Error",
"message": f"Already joined to lobby {lobby.getName()}",
}
)
return
# Initialize the peer list for this lobby
self.lobbies.append(lobby)
self.lobby_peers[lobby.id] = []
for peer_id in lobby.sessions:
if peer_id == self.id:
raise Exception(
"Should not happen: self in lobby.sessions while not in lobby."
)
peer_session = lobby.getSession(peer_id)
if not peer_session or not peer_session.ws:
logger.warning(
f"{self.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}. Removing."
)
del lobby.sessions[peer_id]
continue
# Add the peer to session's RTC peer list
self.lobby_peers[lobby.id].append(peer_id)
# Add this user as an RTC peer to each existing peer
peer_session.lobby_peers[lobby.id].append(self.id)
logger.info(
f"{self.getName()} -> addPeer({peer_session.getName(), lobby.getName()}, should_create_offer=False)"
)
await peer_session.ws.send_json(
{
"type": "addPeer",
"data": {
"peer_id": self.id,
"peer_name": self.name,
"should_create_offer": False,
},
}
)
# Add each other peer to the caller
logger.info(
f"{self.getName()} -> addPeer({peer_session.getName(), lobby.getName()}, should_create_offer=True)"
)
await self.ws.send_json(
{
"type": "addPeer",
"data": {
"peer_id": peer_session.id,
"peer_name": peer_session.name,
"should_create_offer": True,
},
}
)
# Add this user as an RTC peer
await lobby.addSession(self)
Session.save()
await self.ws.send_json({"type": "join_status", "status": "Joined"})
async def part(self, lobby: Lobby):
if lobby.id not in self.lobby_peers or self.id not in lobby.sessions:
logger.info(
f"{self.getName()} - Attempt to part non-joined lobby {lobby.getName()}."
)
if self.ws:
await self.ws.send_json(
{"type": "error", "error": "Attempt to part non-joined lobby"}
)
return
logger.info(f"{self.getName()} <- part({lobby.getName()}) - Lobby part.")
lobby_peers = self.lobby_peers[lobby.id]
del self.lobby_peers[lobby.id]
self.lobbies.remove(lobby)
# Remove this peer from all other RTC peers, and remove each peer from this peer
for peer_session_id in lobby_peers:
peer_session = getSession(peer_session_id)
if not peer_session:
logger.warning(
f"{self.getName()} <- part({lobby.getName()}) - Peer session {peer_session_id} not found. Skipping."
)
continue
if not peer_session.ws:
logger.warning(
f"{self.getName()} <- part({lobby.getName()}) - No WebSocket connection for {peer_session.getName()}. Skipping."
)
continue
logger.info(f"{peer_session.getName()} <- remove_peer({self.getName()})")
await peer_session.ws.send_json(
{"type": "remove_peer", "data": {"peer_id": self.id}}
)
if not self.ws:
logger.error(
f"{self.getName()} <- part({lobby.getName()}) - No WebSocket connection."
)
continue
logger.info(f"{self.getName()} <- remove_peer({peer_session.getName()})")
await self.ws.send_json(
{"type": "remove_peer", "data": {"peer_id": peer_session.id}}
)
await lobby.removeSession(self)
Session.save()
class Lobby:
def __init__(self, name: str, id: str | None = None, private: bool = False):
self.id = secrets.token_hex(16) if id is None else id
self.short = self.id[:8]
self.name = name
self.sessions: dict[str, Session] = {} # All lobby members
self.private = private
def getName(self) -> str:
return f"{self.short}:{self.name}"
async def update_state(self, requesting_session: Session | None = None):
users: list[dict[str, str | bool]] = [
{"name": s.name, "live": True if s.ws else False, "session_id": s.id}
for s in self.sessions.values()
if s.name
]
if requesting_session:
logger.info(
f"{requesting_session.getName()} -> lobby_state({self.getName()})"
)
if requesting_session.ws:
await requesting_session.ws.send_json(
{"type": "lobby_state", "data": {"participants": users}}
)
else:
logger.warning(
f"{requesting_session.getName()} - No WebSocket connection."
)
else:
for s in self.sessions.values():
logger.info(f"{s.getName()} -> lobby_state({self.getName()})")
if s.ws:
await s.ws.send_json(
{"type": "lobby_state", "data": {"participants": users}}
)
def getSession(self, id: str) -> Session | None:
return self.sessions.get(id, None)
async def addSession(self, session: Session) -> None:
if session.id in self.sessions:
logger.warning(f"{session.getName()} - Already in lobby {self.getName()}.")
return None
self.sessions[session.id] = session
await self.update_state()
async def removeSession(self, session: Session) -> None:
if session.id not in self.sessions:
logger.warning(f"{session.getName()} - Not in lobby {self.getName()}.")
return None
del self.sessions[session.id]
await self.update_state()
def getName(session: Session | None) -> str | None:
@ -40,39 +313,22 @@ def getName(session: Session | None) -> str | None:
return None
class Lobby:
def __init__(self, name: str):
self.id = secrets.token_hex(16)
self.short = self.id[:8]
self.name = name
self.sessions: dict[str, Session] = {} # All lobby members
self.peers: dict[str, Session] = {} # RTC joined peers only
def addSession(self, session: Session):
if session.id not in self.sessions:
self.sessions[session.id] = session
def removeSession(self, session: Session):
if session.id in self.sessions:
del self.sessions[session.id]
def getSession(self, id) -> Session | None:
return self.sessions.get(id, None)
lobbies: dict[str, Lobby] = {}
sessions: dict[str, Session] = {}
def getSession(session_id) -> Session | None:
return sessions.get(session_id, None)
def getSession(session_id: str) -> Session | None:
return Session.getSession(session_id)
def getLobby(lobby_id) -> Lobby | None:
return lobbies.get(lobby_id, None)
def getLobby(lobby_id: str) -> Lobby:
lobby = lobbies.get(lobby_id, None)
if not lobby:
logger.error(f"Lobby not found: {lobby_id}")
raise Exception(f"Lobby not found: {lobby_id}")
return lobby
def getLobbyByName(lobby_name) -> Lobby | None:
def getLobbyByName(lobby_name: str) -> Lobby | None:
for lobby in lobbies.values():
if lobby.name == lobby_name:
return lobby
@ -83,7 +339,9 @@ def getLobbyByName(lobby_name) -> Lobby | None:
@app.get(f"{public_url}api/health")
def health():
logger.info("Health check endpoint called.")
return {"status": "ok", "sessions": len(sessions), "lobbies": len(lobbies)}
return {
"status": "ok",
}
# A session (cookie) is bound to a single user (name).
@ -92,17 +350,13 @@ def health():
# updates for all lobbies.
@app.get(f"{public_url}api/session")
async def session(
request: Request, response: Response, session_id: str = Cookie(default=None)
):
request: Request, response: Response, session_id: str | None = Cookie(default=None)
) -> dict[str, str | list[LobbyResponse]]:
if session_id is None:
session_id = secrets.token_hex(16)
response.set_cookie(key="session_id", value=session_id)
# Validate that session_id is a hex string of length 32
elif (
not isinstance(session_id, str)
or len(session_id) != 32
or not all(c in "0123456789abcdef" for c in session_id)
):
elif len(session_id) != 32 or not all(c in "0123456789abcdef" for c in session_id):
return {"error": "Invalid session_id"}
print(f"[{session_id[:8]}]: Browser hand-shake achieved.")
@ -110,45 +364,85 @@ async def session(
session = getSession(session_id)
if not session:
session = Session(session_id)
sessions[session_id] = session
logger.info(f"{getSessionName(session)}: New session created.")
logger.info(f"{session.getName()}: New session created.")
else:
logger.info(f"{getSessionName(session)}: Existing session resumed.")
logger.info(f"{session.getName()}: Existing session resumed.")
# Part all lobbies for this session that have no active websocket
for lobby_id in list(session.lobby_peers.keys()):
lobby = None
try:
lobby = getLobby(lobby_id)
except Exception as e:
logger.error(
f"{session.getName()} - Error getting lobby {lobby_id}: {e}"
)
continue
await session.part(lobby)
return {
"id": session_id,
"name": session.name if session.name else None,
"lobbies": [lobby.name for lobby in sessions[session_id].lobbies.values()],
"name": session.name if session.name else "",
"lobbies": [
{"id": lobby.id, "name": lobby.name, "private": lobby.private}
for lobby in session.lobbies
],
}
@app.get(public_url + "api/lobby/{lobby_name}/{session_id}")
async def lobby(
@app.get(public_url + "api/lobby")
async def get_lobbies(request: Request, response: Response):
return {
"lobbies": [
{"id": lobby.id, "name": lobby.name}
for lobby in lobbies.values()
if not lobby.private
]
}
class LobbyCreateData(BaseModel):
name: str
private: Optional[bool] = False
class LobbyCreateRequest(BaseModel):
type: Literal["lobby_create"]
data: LobbyCreateData
@app.post(public_url + "api/lobby/{session_id}")
async def lobby_create(
request: Request,
response: Response,
lobby_name: str | None = Path(...),
session_id: str | None = Path(...),
):
if lobby_name is None:
return {"error": "Missing lobby_name"}
if session_id is None:
return {"error": "Missing session_id"}
session_id: str = Path(...),
create_request: LobbyCreateRequest = Body(...),
) -> dict[str, str | dict[str, str | bool | int]]:
if create_request.type != "lobby_create":
return {"error": "Invalid request type"}
data = create_request.data
logger.info(f"lobby_create: {data.name} (private={data.private})")
session = getSession(session_id)
if not session:
return {"error": f"Session not found ({session_id})"}
lobby = getLobbyByName(lobby_name)
lobby = getLobbyByName(data.name)
if not lobby:
lobby = Lobby(lobby_name)
lobbies[lobby.id] = lobby
logger.info(
f"{getSessionName(session)} <- lobby_create({lobby.short}:{lobby.name})"
lobby = Lobby(
data.name,
private=data.private if data.private is not None else False,
)
lobbies[lobby.id] = lobby
logger.info(f"{session.getName()} <- lobby_create({lobby.short}:{lobby.name})")
lobby.addSession(sessions[session_id])
sessions[session_id].lobbies[lobby.id] = lobby
return {"lobby": lobby.id}
return {
"type": "lobby_created",
"data": {
"id": lobby.id,
"name": lobby.name,
"private": lobby.private,
},
}
all_label = "[ all ]"
@ -157,159 +451,9 @@ todo_label = "[ todo ]"
unset_label = "[ ---- ]"
# Join the media session in a lobby
async def join(
lobby: Lobby,
session: Session,
has_video: bool,
has_audio: bool,
):
if not session.name:
logger.error(
f"{session.short}:[UNSET] <- join - No name set yet. Media not available."
)
return
if not session.ws:
logger.error(
f"{getSessionName(session)} - No WebSocket connection. Media not available."
)
return
logger.info(f"{getSessionName(session)} <- join({getLobbyName(lobby)})")
# if session.id in lobby.peers:
# logger.info(f"{getSessionName(session)} - Already joined to Media.")
# return
# Notify all existing RTC peers
for peer_session in lobby.peers.values():
if peer_session.id == session.id:
continue
if not peer_session.ws:
logger.warning(
f"{getSessionName(peer_session)} - No WebSocket connection. Skipping."
)
continue
logger.info(
f"{getSessionName(peer_session)} -> addPeer({getSessionName(session), getLobbyName(lobby)}, video={has_video}, audio={has_audio}, should_create_offer=False)"
)
await peer_session.ws.send_json(
{
"type": "addPeer",
"data": {
"peer_id": session.id,
"peer_name": session.name,
"should_create_offer": False,
"has_audio": has_audio,
"has_video": has_video,
},
}
)
# Add each other peer to the caller
if session.ws:
logger.info(
f"{getSessionName(session)} -> addPeer({getSessionName(peer_session), getLobbyName(lobby)}, video={peer_session.has_video}, audio={peer_session.has_audio}, should_create_offer=True)"
)
await session.ws.send_json(
{
"type": "addPeer",
"data": {
"peer_id": peer_session.id,
"peer_name": peer_session.name,
"should_create_offer": True,
"has_audio": peer_session.has_audio,
"has_video": peer_session.has_video,
},
}
)
# Add this user as an RTC peer
lobby.peers[session.id] = session
await update_users(lobby)
async def part(
lobby: Lobby,
session: Session,
):
if session.id not in lobby.peers:
logger.info(
f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Does not exist in RTC peers."
)
return
logger.info(
f"{getSessionName(session)}: <- part({getLobbyName(lobby)}) - Media part."
)
del lobby.peers[session.id]
# Remove this peer from all other RTC peers, and remove each peer from this peer
for peer_session in lobby.peers.values():
if not peer_session.ws:
logger.warning(
f"{getSessionName(peer_session)} <- part({getLobbyName(lobby)}) - No WebSocket connection. Skipping."
)
continue
logger.info(
f"{getSessionName(peer_session)} <- remove_peer({getSessionName(session)})"
)
await peer_session.ws.send_json(
{"type": "remove_peer", "data": {"peer_id": session.id}}
)
if session.ws:
logger.info(
f"{getSessionName(session)} <- remove_peer({getSessionName(peer_session)})"
)
await session.ws.send_json(
{"type": "remove_peer", "data": {"peer_id": peer_session.id}}
)
else:
logger.error(
f"{getSessionName(session)} <- part({getLobbyName(lobby)}) - No WebSocket connection."
)
async def update_users(lobby: Lobby, requesting_session: Session | None = None):
users = [
{"name": s.name, "live": True if s.ws else False, "session_id": s.id}
for s in lobby.sessions.values()
if s.name
]
if requesting_session:
logger.info(
f"{requesting_session.short}:{requesting_session.name} -> list_users({lobby.name})"
)
if requesting_session.ws:
await requesting_session.ws.send_json({"type": "users", "users": users})
else:
logger.warning(
f"{requesting_session.short}:{requesting_session.name} - No WebSocket connection."
)
else:
for s in lobby.sessions.values():
logger.info(
f"{s.short}:{s.name if s.name else unset_label} -> list_users({lobby.name})"
)
if s.ws:
await s.ws.send_json({"type": "users", "users": users})
def getSessionName(session: Session) -> str:
return f"{session.short}:{session.name if session.name else unset_label}"
def getLobbyName(lobby: Lobby) -> str:
return f"{lobby.short}:{lobby.name}"
# Register websocket endpoint directly on app with full public_url path
@app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}")
async def websocket_lobby(
async def lobby_join(
websocket: WebSocket,
lobby_id: str | None = Path(...),
session_id: str | None = Path(...),
@ -335,86 +479,122 @@ async def websocket_lobby(
)
await websocket.close()
return
lobby = getLobby(lobby_id)
if not lobby:
logger.error(f"Invalid lobby ID {lobby_id}")
await websocket.send_json(
{"type": "error", "error": f"Invalid lobby ID {lobby_id}"}
)
lobby = None
try:
lobby = getLobby(lobby_id)
except Exception as e:
await websocket.send_json({"type": "error", "error": str(e)})
await websocket.close()
return
logger.info(
f"{getSessionName(session)} <- lobby_connect({lobby.short}:{lobby.name})"
)
logger.info(f"{session.getName()} <- lobby_joined({lobby.getName()})")
session.ws = websocket
if session.id in lobby.sessions:
logger.info(
f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining."
)
await session.part(lobby)
await lobby.removeSession(session)
# This user session just went from Dead to Live, so update everyone's user list
await update_users(lobby)
for peer_id in lobby.sessions:
peer_session = lobby.getSession(peer_id)
if not peer_session or not peer_session.ws:
logger.warning(
f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}. Removing."
)
del lobby.sessions[peer_id]
continue
logger.info(f"{session.getName()} -> user_joined({peer_session.getName()})")
await peer_session.ws.send_json(
{
"type": "user_joined",
"data": {
"session_id": session.id,
"name": session.name,
},
}
)
try:
while True:
data = await websocket.receive_json()
# logger.info(f"{getSessionName(session)} <- RAW Rx: {data}")
match data.get("type"):
message = await websocket.receive_json()
type = message.get("type", None)
data: dict[str, Any] | None = message.get("data", None)
if not type:
logger.error(f"{session.getName()} - Invalid request: {message}")
await websocket.send_json({"type": "error", "error": "Invalid request"})
continue
# logger.info(f"{session.getName()} <- RAW Rx: {data}")
match type:
case "set_name":
if not data:
logger.error(f"{session.getName()} - set_name missing data")
await websocket.send_json(
{"type": "error", "error": "set_name missing data"}
)
continue
name = data.get("name")
logger.info(f"{session.getName()} <- set_name({name})")
if not name:
logger.error(f"{session.getName()} - Name required")
await websocket.send_json(
{"type": "error", "error": "Name required"}
)
continue
# Check for duplicate name
if any(s.name.lower() == name.lower() for s in sessions.values()):
if not Session.isUniqueName(name):
logger.warning(f"{session.getName()} - Name already taken")
await websocket.send_json(
{"type": "error", "error": "Name already taken"}
)
continue
session.name = name
logger.info(
f"{getSessionName(session)} <- set_name({session.name})"
)
session.setName(name)
logger.info(f"{session.getName()}: -> update('name', {name})")
await websocket.send_json({"type": "update", "name": name})
await update_users(lobby)
# For any clients in any lobby with this session, update their user lists
await lobby.update_state()
case "list_users":
await update_users(lobby, session)
case "media_status":
has_audio = data.get("has_audio", False)
has_video = data.get("has_video", False)
logger.info(
f"{getSessionName(session)}: <- media-status(audio: {has_audio}, video: {has_video})"
)
session.has_audio = has_audio
session.has_video = has_video
await lobby.update_state(session)
case "join":
logger.info(f"{getSessionName(session)} <- join {data}")
has_audio = data.get("has_audio", False)
has_video = data.get("has_video", False)
await join(lobby, session, has_video, has_audio)
logger.info(f"{session.getName()} <- join({lobby.getName()})")
await session.join(lobby=lobby)
case "part":
await part(lobby, session)
logger.info(f"{session.getName()} <- part {lobby.getName()}")
await session.part(lobby=lobby)
case "relayICECandidate":
logger.info(f"{getSessionName(session)} <- relayICECandidate")
if session.id not in lobby.peers:
logger.info(f"{session.getName()} <- relayICECandidate")
if not data:
logger.error(
f"{session.getName()} - relayICECandidate missing data"
)
await websocket.send_json(
{"type": "error", "error": "relayICECandidate missing data"}
)
continue
if (
lobby.id not in session.lobby_peers
or session.id not in lobby.sessions
):
logger.error(
f"{session.short}:{session.name} <- relayICECandidate - Not an RTC peer ({session.id})"
)
await websocket.send_json(
{"type": "error", "error": "Not joined to media session"}
{"type": "error", "error": "Not joined to lobby"}
)
continue
peer_id = data.get("config", {}).get("peer_id")
if peer_id not in lobby.peers:
session_peers = session.lobby_peers[lobby.id]
peer_id = data.get("peer_id")
if peer_id not in session_peers:
logger.error(
f"{getSessionName(session)} <- relayICECandidate - Not an RTC peer({peer_id})"
f"{session.getName()} <- relayICECandidate - Not an RTC peer({peer_id}) in {session_peers}"
)
await websocket.send_json(
{
@ -424,42 +604,84 @@ async def websocket_lobby(
)
continue
candidate = data.get("config", {}).get("candidate")
candidate = data.get("candidate")
message = {
message: dict[str, Any] = {
"type": "iceCandidate",
"data": {"peer_id": session.id, "candidate": candidate},
}
if peer_id in lobby.peers:
ws = lobby.peers[peer_id].ws
if not ws:
logger.warning(
f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping."
)
break
logger.info(
f"{getSessionName(session)} -> iceCandidate({getSessionName(lobby.peers[peer_id])})"
peer_session = lobby.getSession(peer_id)
if not peer_session or not peer_session.ws:
logger.warning(
f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}."
)
await ws.send_json(message)
break
logger.info(
f"{session.getName()} -> iceCandidate({peer_session.getName()})"
)
await peer_session.ws.send_json(message)
case "relaySessionDescription":
logger.info(f"{getSessionName(session)} <- relaySessionDescription")
if session.id not in lobby.peers:
logger.info(f"{session.getName()} <- relaySessionDescription")
if not data:
logger.error(
f"{session.short}:{session.name} - relaySessionDescription - Not an RTC peer"
f"{session.getName()} - relaySessionDescription missing data"
)
await websocket.send_json(
{
"type": "error",
"error": "relaySessionDescription missing data",
}
)
continue
if (
lobby.id not in session.lobby_peers
or session.id not in lobby.sessions
):
logger.error(
f"{session.short}:{session.name} <- relaySessionDescription - Not an RTC peer ({session.id})"
)
await websocket.send_json(
{"type": "error", "error": "Not joined to lobby"}
)
continue
lobby_peers = session.lobby_peers[lobby.id]
peer_id = data.get("peer_id")
if peer_id not in lobby_peers:
logger.error(
f"{session.getName()} <- relaySessionDescription - Not an RTC peer({peer_id}) in {lobby_peers}"
)
await websocket.send_json(
{
"type": "error",
"error": f"Target peer {peer_id} not found",
}
)
continue
peer_id = data.get("peer_id", None)
if not peer_id:
logger.error(
f"{session.getName()} - relaySessionDescription missing peer_id"
)
await websocket.send_json(
{
"type": "error",
"error": "relaySessionDescription missing peer_id",
}
)
continue
peer_session = lobby.getSession(peer_id)
if not peer_session or not peer_session.ws:
logger.warning(
f"{session.getName()} - Live peer session {peer_id} not found in lobby {lobby.getName()}."
)
break
peer_id = data.get("config", {}).get("peer_id")
peer = lobby.peers.get(peer_id, None)
if not peer:
logger.error(
f"{getSessionName(session)} <- relaySessionDescription - Not an RTC peer({peer_id})"
)
break
session_description = data.get("config", {}).get(
"session_description"
)
session_description = data.get("session_description")
message = {
"type": "sessionDescription",
"data": {
@ -467,39 +689,34 @@ async def websocket_lobby(
"session_description": session_description,
},
}
if not peer.ws:
logger.warning(
f"{lobby.peers[peer_id].short}:{lobby.peers[peer_id].name} - No WebSocket connection. Skipping."
)
break
logger.info(
f"{getSessionName(session)} -> sessionDescription({getSessionName(lobby.peers[peer_id])})"
f"{session.getName()} -> sessionDescription({peer_session.getName()})"
)
await peer.ws.send_json(message)
await peer_session.ws.send_json(message)
case _:
await websocket.send_json(
{
"type": "error",
"error": f"Unknown request type: {data.get('type')}",
"error": f"Unknown request type: {type}",
}
)
except WebSocketDisconnect:
logger.info(f"{getSessionName(session)} <- WebSocket disconnected for user.")
logger.info(f"{session.getName()} <- WebSocket disconnected for user.")
# Cleanup: remove session from lobby and sessions dict
session.ws = None
if session.id in lobby.peers:
await part(lobby, session)
if session.id in lobby.sessions:
await session.part(lobby)
await update_users(lobby)
await lobby.update_state()
# Clean up empty lobbies
if not lobby.sessions:
if lobby.id in lobbies:
del lobbies[lobby.id]
logger.info(f"Cleaned up empty lobby {lobby.short}")
logger.info(f"Cleaned up empty lobby {lobby.getName()}")
# Serve static files or proxy to frontend development server