From afdf856f2b0ef5f540661db17e72e4bf4f77c3dd Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Mon, 8 Sep 2025 16:03:53 -0700 Subject: [PATCH] Maybe working? --- docker-compose.yml | 6 + server/websocket/connection.py | 20 ++- server/websocket/webrtc_signaling.py | 71 +++++------ voicebot/webrtc_signaling.py | 175 +++++++++------------------ 4 files changed, 115 insertions(+), 157 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 4d15b14..bc66e5c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,6 +17,8 @@ services: - ./server:/server:ro # So the frontend can read the OpenAPI spec - ./client:/client:rw - ./dev-keys:/keys:ro # So the frontend entrypoint can check for SSL files + - /etc/localtime:/etc/localtime:ro + - /etc/timezone:/etc/timezone:ro networks: - ai-voicebot-net @@ -52,6 +54,8 @@ services: - ./server/.venv:/server/.venv:rw - ./client/build:/client/build:ro - ./dev-keys:/keys:ro + - /etc/localtime:/etc/localtime:ro + - /etc/timezone:/etc/timezone:ro networks: - ai-voicebot-net @@ -75,6 +79,8 @@ services: - ./shared:/shared:ro - ./voicebot:/voicebot:rw - ./voicebot/.venv:/voicebot/.venv:rw + - /etc/localtime:/etc/localtime:ro + - /etc/timezone:/etc/timezone:ro # network_mode: host networks: - ai-voicebot-net diff --git a/server/websocket/connection.py b/server/websocket/connection.py index 3839c42..b0beb53 100644 --- a/server/websocket/connection.py +++ b/server/websocket/connection.py @@ -97,8 +97,12 @@ class WebSocketConnectionManager: session.ws = websocket session.update_last_used() - # Clean up stale session in lobby if needed + # Clean up stale session in lobby if needed. If a stale session + # was present, perform a full re-join so WebRTC peer setup runs + # (this ensures bots and other peers receive addPeer signaling). + stale_rejoin = False if session.id in lobby.sessions: + stale_rejoin = True logger.info(f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining.") try: # Leave the lobby to clean up peer connections @@ -120,9 +124,17 @@ class WebSocketConnectionManager: break except Exception as e: logger.warning(f"Error cleaning up stale session: {e}") - - # Notify existing peers about new user - await self._notify_peers_of_join(session, lobby) + + # If this was a stale rejoin, re-add the session to the lobby which will + # trigger WebRTC signaling (addPeer) with existing peers (including bots). + if stale_rejoin: + try: + await session.join_lobby(lobby) + except Exception as e: + logger.warning(f"Error re-joining stale session to lobby: {e}") + else: + # Notify existing peers about a fresh new connection (no automatic join yet) + await self._notify_peers_of_join(session, lobby) try: # Message processing loop diff --git a/server/websocket/webrtc_signaling.py b/server/websocket/webrtc_signaling.py index c8b9871..5874d36 100644 --- a/server/websocket/webrtc_signaling.py +++ b/server/websocket/webrtc_signaling.py @@ -216,9 +216,6 @@ class WebRTCSignalingHandlers: peer_session: The existing peer session in the lobby lobby: The lobby context """ - logger.info( - f"[TRACE] handle_add_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}" - ) # Only establish WebRTC connections if at least one has media if session.has_media or peer_session.has_media: # Add peer_session to session's peer list @@ -232,27 +229,23 @@ class WebRTCSignalingHandlers: if lobby.id not in peer_session.lobby_peers: peer_session.lobby_peers[lobby.id] = [] peer_session.lobby_peers[lobby.id].append(session.id) - logger.info( - f"[TRACE] {session.getName()} lobby_peers after add: {session.lobby_peers}" - ) - # Determine offer roles: bots should never create offers - # When a bot joins: existing humans create offers to the bot - # When a human joins: human creates offers to existing bots, bots don't create offers to human - - # Determine who should create the offer based on timing - # In our polite peer implementation, the newer peer typically creates the offer - existing_peer_should_offer = False - new_session_should_offer = True - - # Notify existing peer about new peer - logger.info( - f"{session.getName()} -> {peer_session.getName()}:addPeer(" - f"{session.getName()}, {lobby.getName()}, should_create_offer={existing_peer_should_offer}, " - f"has_media={session.has_media})" - ) + # Determine deterministic offerer to avoid glare/collision: + # the peer with the lexicographically smaller session ID will create the offer. try: + session_is_offer = session.id < peer_session.id + + # Notify existing peer about new peer + peer_should_create_offer = peer_session.id < session.id + logger.info( + f"{session.getName()} -> {peer_session.getName()}:addPeer(" + f"{session.getName()}, {lobby.getName()}, should_create_offer={peer_should_create_offer}, " + f"has_media={session.has_media})" + ) if peer_session.ws: + logger.debug( + f"Sending addPeer to {peer_session.getName()} -> peer_id={session.id}, peer_name={session.name}, has_media={session.has_media}, should_create_offer={peer_should_create_offer}" + ) await peer_session.ws.send_json( { "type": "addPeer", @@ -260,23 +253,26 @@ class WebRTCSignalingHandlers: "peer_id": session.id, "peer_name": session.name, "has_media": session.has_media, - "should_create_offer": existing_peer_should_offer, + "should_create_offer": peer_should_create_offer, }, } ) - except Exception as e: - logger.warning( - f"Failed to send addPeer to {peer_session.getName()}: {e}" - ) + else: + logger.warning( + f"Not sending addPeer to {peer_session.getName()} because ws is None" + ) - # Notify new session about existing peer - logger.info( - f"{session.getName()} -> {session.getName()}:addPeer(" - f"{peer_session.getName()}, {lobby.getName()}, should_create_offer={new_session_should_offer}, " - f"has_media={peer_session.has_media})" - ) - try: + # Notify new session about existing peer + session_should_create_offer = session_is_offer + logger.info( + f"{session.getName()} -> {session.getName()}:addPeer(" + f"{peer_session.getName()}, {lobby.getName()}, should_create_offer={session_should_create_offer}, " + f"has_media={peer_session.has_media})" + ) if session.ws: + logger.debug( + f"Sending addPeer to {session.getName()} -> peer_id={peer_session.id}, peer_name={peer_session.name}, has_media={peer_session.has_media}, should_create_offer={session_should_create_offer}" + ) await session.ws.send_json( { "type": "addPeer", @@ -284,12 +280,14 @@ class WebRTCSignalingHandlers: "peer_id": peer_session.id, "peer_name": peer_session.name, "has_media": peer_session.has_media, - "should_create_offer": new_session_should_offer, + "should_create_offer": session_should_create_offer, }, } ) + else: + logger.warning(f"Not sending addPeer to {session.getName()} because ws is None") except Exception as e: - logger.warning(f"Failed to send addPeer to {session.getName()}: {e}") + logger.warning(f"Failed to send addPeer messages for pair {session.getName()} <-> {peer_session.getName()}: {e}") else: logger.info( f"{session.getName()} - Skipping WebRTC connection with " @@ -312,9 +310,6 @@ class WebRTCSignalingHandlers: peer_session: The peer session to disconnect from lobby: The lobby context """ - logger.info( - f"[TRACE] handle_remove_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}" - ) # Notify peer about session removal if peer_session.ws: logger.info( diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index c16ddf0..b2c71bc 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -442,8 +442,8 @@ class WebRTCSignalingClient: async def _setup_local_media(self): """Create local media tracks""" - # Always clear out old tracks to avoid reusing stopped or stale tracks - self.local_tracks.clear() + # If a bot provided a create_tracks callable, use it to create tracks. + # Otherwise, use default synthetic tracks. try: if self.create_tracks: tracks = self.create_tracks(self.session_name) @@ -619,6 +619,7 @@ class WebRTCSignalingClient: logger.info( f"User joined: {validated.name} (session: {validated.session_id})" ) + logger.debug(f"user_joined payload: {validated}") elif msg_type == "lobby_state": try: validated = LobbyStateModel.model_validate(data) @@ -665,6 +666,34 @@ class WebRTCSignalingClient: logger.error(f"Error in chat message callback: {e}", exc_info=True) elif msg_type == "error": logger.error(f"Received error from signaling server: {data}") + elif msg_type == "addPeer": + # Log raw addPeer receipt for debugging + try: + validated = AddPeerModel.model_validate(data) + except ValidationError as e: + logger.error(f"Invalid addPeer payload: {e}", exc_info=True) + return + logger.info(f"Received addPeer for peer {validated.peer_name} (peer_id={validated.peer_id}) should_create_offer={validated.should_create_offer}") + logger.debug(f"addPeer payload: {validated}") + await self._handle_add_peer(validated) + elif msg_type == "sessionDescription": + try: + validated = SessionDescriptionModel.model_validate(data) + except ValidationError as e: + logger.error(f"Invalid sessionDescription payload: {e}", exc_info=True) + return + logger.info(f"Received sessionDescription from {validated.peer_name} (peer_id={validated.peer_id})") + logger.debug(f"sessionDescription payload keys: {list(data.keys())}") + await self._handle_session_description(validated) + elif msg_type == "iceCandidate": + try: + validated = IceCandidateModel.model_validate(data) + except ValidationError as e: + logger.error(f"Invalid iceCandidate payload: {e}", exc_info=True) + return + logger.info(f"Received iceCandidate from {validated.peer_name} (peer_id={validated.peer_id})") + logger.debug(f"iceCandidate payload: {validated}") + await self._handle_ice_candidate(validated) else: logger.info(f"Unhandled message type: {msg_type} with data: {data}") @@ -679,66 +708,26 @@ class WebRTCSignalingClient: logger.info( f"Adding peer: {peer_name} (should_create_offer: {should_create_offer})" ) - logger.info(f"_handle_add_peer: Current peer_connections count: {len(self.peer_connections)}") - logger.info(f"_handle_add_peer: Current active peers: {list(self.peer_connections.keys())}") logger.debug( f"_handle_add_peer: peer_id={peer_id}, peer_name={peer_name}, should_create_offer={should_create_offer}" ) - # Check if peer already exists and clean up stale connections + # Check if peer already exists if peer_id in self.peer_connections: pc = self.peer_connections[peer_id] - logger.info(f"_handle_add_peer: Found existing connection for {peer_name} (state: {pc.connectionState})") - logger.info("_handle_add_peer: Cleaning up existing connection to ensure fresh start") - - # Always clean up existing connection for fresh start - # This prevents stale connection issues during page refresh - try: - await pc.close() - except Exception as e: - logger.warning(f"Error closing existing connection for {peer_name}: {e}") - - # Clean up all associated state - self.is_negotiating.pop(peer_id, None) - self.making_offer.pop(peer_id, None) - self.initiated_offer.discard(peer_id) - self.pending_ice_candidates.pop(peer_id, None) - self.peers.pop(peer_id, None) # Also clean up peers dictionary - del self.peer_connections[peer_id] - - # Give the connection a moment to close, but don't block indefinitely - # If it doesn't close quickly, proceed anyway to avoid blocking other operations - try: - await asyncio.wait_for( - self._wait_for_connection_close(pc, peer_name), - timeout=1.0 + logger.debug( + f"_handle_add_peer: Existing connection state: {pc.connectionState}" + ) + if pc.connectionState in ["new", "connected", "connecting"]: + logger.info(f"Peer connection already exists for {peer_name}") + return + else: + # Clean up stale connection + logger.debug( + f"_handle_add_peer: Closing stale connection for {peer_name}" ) - except asyncio.TimeoutError: - logger.warning(f"Connection to {peer_name} did not close within timeout, proceeding anyway") - - # Also check for any existing connections with the same peer name but different ID - # This handles cases where the frontend gets a new session ID after refresh - connections_to_remove: list[str] = [] - for existing_peer_id, existing_peer in self.peers.items(): - if existing_peer.peer_name == peer_name and existing_peer_id != peer_id: - logger.info(f"_handle_add_peer: Found existing connection for peer name {peer_name} with different ID {existing_peer_id}, cleaning up") - connections_to_remove.append(existing_peer_id) - - for old_peer_id in connections_to_remove: - if old_peer_id in self.peer_connections: - pc = self.peer_connections[old_peer_id] - try: - await pc.close() - except Exception as e: - logger.warning(f"Error closing old connection for {peer_name}: {e}") - - # Clean up all associated state for old connection - self.is_negotiating.pop(old_peer_id, None) - self.making_offer.pop(old_peer_id, None) - self.initiated_offer.discard(old_peer_id) - self.pending_ice_candidates.pop(old_peer_id, None) - self.peers.pop(old_peer_id, None) - del self.peer_connections[old_peer_id] + await pc.close() + del self.peer_connections[peer_id] # Create new peer peer = Peer(session_id=peer_id, peer_name=peer_name, local=False) @@ -753,6 +742,8 @@ class WebRTCSignalingClient: username="ketra", credential="ketran", ), + # Add Google's public STUN server as fallback + RTCIceServer(urls="stun:stun.l.google.com:19302"), ], ) logger.debug( @@ -772,39 +763,12 @@ class WebRTCSignalingClient: # Add connection state change handler def on_connection_state_change() -> None: - logger.info(f"🔄 Connection state changed for {peer_name}: {pc.connectionState}") - - # Log additional details for debugging connection closure/failure - logger.info(f" ICE connection state: {pc.iceConnectionState}") - logger.info(f" Signaling state: {pc.signalingState}") - # If available, log the last ICE candidate pair and any error details - selected_pair = None - try: - if hasattr(pc, 'sctp') and pc.sctp: - selected_pair = getattr(pc.sctp.transport, 'iceTransport', None) - elif hasattr(pc, 'iceTransport'): - selected_pair = pc.iceTransport - except Exception as e: - logger.warning(f"Error accessing selected ICE pair: {e}") - if selected_pair: - logger.info(f" Selected ICE candidate pair: {getattr(selected_pair, 'selectedCandidatePair', None)}") - - # Handle failed connections - could be due to network issues during refresh - if pc.connectionState == "failed": - logger.warning(f"❌ Connection to {peer_name} failed - this might be due to page refresh") - elif pc.connectionState == "disconnected": - logger.info(f"⚠️ Connection to {peer_name} disconnected") - elif pc.connectionState == "connected": - logger.info(f"✅ Connection to {peer_name} established successfully") - elif pc.connectionState == "closed": - logger.info(f"🔒 Connection to {peer_name} closed") + logger.info(f"Connection state: {pc.connectionState}") pc.on("connectionstatechange")(on_connection_state_change) - logger.info(f"📝 Storing peer connection for {peer_name} (ID: {peer_id})") self.peer_connections[peer_id] = pc peer.connection = pc - logger.info(f"📊 Total peer connections now: {len(self.peer_connections)}") # Set up event handlers def on_track(track: MediaStreamTrack) -> None: @@ -1108,33 +1072,19 @@ class WebRTCSignalingClient: # Close peer connection if peer_id in self.peer_connections: pc = self.peer_connections[peer_id] - try: - await pc.close() - # Wait for connection to be properly closed - while pc.connectionState not in ["closed", "failed"]: - logger.debug(f"Waiting for connection to {peer_name} to close (current state: {pc.connectionState})") - # Brief yield to allow cleanup - await asyncio.sleep(0.01) - except Exception as e: - logger.warning(f"Error closing peer connection for {peer_name}: {e}") + await pc.close() del self.peer_connections[peer_id] - # Extra forced cleanup: remove all state for this peer_id - for state_dict in [self.is_negotiating, self.making_offer, self.pending_ice_candidates]: - if peer_id in state_dict: - logger.debug(f"Force removing {peer_id} from {state_dict}") - state_dict.pop(peer_id, None) + # Clean up state + self.is_negotiating.pop(peer_id, None) + self.making_offer.pop(peer_id, None) self.initiated_offer.discard(peer_id) + self.pending_ice_candidates.pop(peer_id, None) # Remove peer peer = self.peers.pop(peer_id, None) if peer and self.on_peer_removed: - try: - await self.on_peer_removed(peer) - except Exception as e: - logger.warning(f"Error in on_peer_removed callback for {peer_name}: {e}") - - logger.info(f"Completed removing peer: {peer_name} (all state reset)") + await self.on_peer_removed(peer) async def _handle_session_description(self, data: SessionDescriptionModel): """Handle sessionDescription message""" @@ -1163,12 +1113,12 @@ class WebRTCSignalingClient: ) we_initiated = peer_id in self.initiated_offer - # For bots, be more polite - accept remote offers even in collision scenarios - # Only ignore if we're in the middle of sending our own offer AND it's actually conflicting - ignore_offer = offer_collision and we_initiated and pc.signalingState != "stable" + # For bots, be more polite - always yield to human users in collision + # Bots should generally be the polite peer + ignore_offer = offer_collision and we_initiated if ignore_offer: - logger.info(f"Ignoring offer from {peer_name} due to active collision (bot being polite)") + logger.info(f"Ignoring offer from {peer_name} due to collision (bot being polite)") # Reset our offer state to allow the remote offer to proceed if peer_id in self.initiated_offer: self.initiated_offer.remove(peer_id) @@ -1180,11 +1130,7 @@ class WebRTCSignalingClient: await asyncio.sleep(1.0) # Wait 1 second if pc.connectionState not in ["connected", "closed", "failed"]: logger.info(f"Retrying connection setup for {peer_name} after collision") - # Reset state to allow the remote peer to retry - self.making_offer[peer_id] = False - self.is_negotiating[peer_id] = False - if peer_id in self.initiated_offer: - self.initiated_offer.remove(peer_id) + # Don't create offer, let the remote peer drive asyncio.create_task(retry_connection()) return @@ -1295,7 +1241,7 @@ class WebRTCSignalingClient: peer_name = data.peer_name candidate_data = data.candidate - logger.info(f"Received ICE candidate from {peer_name}: {candidate_data}") + logger.info(f"Received ICE candidate from {peer_name}") pc = self.peer_connections.get(peer_id) if not pc: @@ -1333,7 +1279,6 @@ class WebRTCSignalingClient: except Exception: cand_type = "unknown" - logger.info(f"Attempting to add ICE candidate for {peer_name}: type={cand_type}, sdp='{sdp_part}'") try: rtc_candidate = candidate_from_sdp(sdp_part) rtc_candidate.sdpMid = candidate_data.sdpMid @@ -1341,7 +1286,7 @@ class WebRTCSignalingClient: # aiortc expects an object with attributes (RTCIceCandidate) await pc.addIceCandidate(rtc_candidate) - logger.info(f"ICE candidate successfully added for {peer_name}: type={cand_type}") + logger.info(f"ICE candidate added for {peer_name}: type={cand_type}") except Exception as e: logger.error( f"Failed to add ICE candidate for {peer_name}: type={cand_type} error={e} sdp='{sdp_part}'",