From a3b9e7fa3997c9851e59b5fd96f9ea902246e48b Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Wed, 3 Sep 2025 18:06:02 -0700 Subject: [PATCH] Fixed blocking ice gathering --- client/src/App.tsx | 3 +++ server/main.py | 4 +-- voicebot/logger.py | 3 ++- voicebot/webrtc_signaling.py | 48 +++++++++++++++++++++++++++++++++--- 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/client/src/App.tsx b/client/src/App.tsx index 7538a43..86288cc 100644 --- a/client/src/App.tsx +++ b/client/src/App.tsx @@ -166,6 +166,9 @@ const LobbyView: React.FC = (props: LobbyProps) => { ) { console.log("Lobby - Server error detected, will retry when session is restored"); setShouldRetryLobby(true); + } else { + console.log("Lobby - Non-retryable error, clearing session"); + setSession(null); } } }; diff --git a/server/main.py b/server/main.py index af27d5d..87470fd 100644 --- a/server/main.py +++ b/server/main.py @@ -2010,10 +2010,10 @@ async def lobby_join( # Add the message to the lobby and broadcast it chat_message = lobby.add_chat_message(session, message_text) - await lobby.broadcast_chat_message(chat_message) logger.info( - f"{session.getName()} sent chat message to {lobby.getName()}: {message_text[:50]}..." + f"{session.getName()} -> broadcast_chat_message({lobby.getName()}, {message_text[:50]}...)" ) + await lobby.broadcast_chat_message(chat_message) case "join": logger.info(f"{session.getName()} <- join({lobby.getName()})") diff --git a/voicebot/logger.py b/voicebot/logger.py index 007d195..b6116e3 100644 --- a/voicebot/logger.py +++ b/voicebot/logger.py @@ -4,7 +4,8 @@ import logging import time from typing import Optional, Tuple -logging_level = os.getenv("LOGGING_LEVEL", "INFO").upper() +#logging_level = os.getenv("LOGGING_LEVEL", "INFO").upper() +logging_level = os.getenv("LOGGING_LEVEL", "DEBUG").upper() class RelativePathFormatter(logging.Formatter): diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index f469d5b..97d9e7d 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -493,23 +493,44 @@ class WebRTCSignalingClient: async def _handle_messages(self): """Handle incoming messages from signaling server""" + logger.info("_handle_messages: Starting message handling loop") + message_count = 0 try: ws = cast(WebSocketProtocol, self.websocket) + logger.info("_handle_messages: WebSocket cast successful, entering message loop") async for message in ws: + message_count += 1 + logger.debug(f"_handle_messages: [#{message_count}] Entering message processing") + # Check for shutdown request if self.shutdown_requested: logger.info("Shutdown requested, breaking message loop") break - logger.debug(f"_handle_messages: Received raw message: {message}") + logger.debug(f"_handle_messages: [#{message_count}] Received raw message: {message}") try: data = cast(MessageData, json.loads(message)) + logger.debug(f"_handle_messages: [#{message_count}] Successfully parsed message data: {data}") except Exception as e: logger.error( - f"_handle_messages: Failed to parse message: {e}", exc_info=True + f"_handle_messages: [#{message_count}] Failed to parse message: {e}", exc_info=True ) continue - await self._process_message(data) + + logger.debug(f"_handle_messages: [#{message_count}] About to call _process_message") + + # Add timeout to message processing to prevent blocking + processing_timeout = 30.0 # 30 seconds timeout + try: + await asyncio.wait_for(self._process_message(data), timeout=processing_timeout) + logger.debug(f"_handle_messages: [#{message_count}] Completed _process_message") + except asyncio.TimeoutError: + logger.error(f"_handle_messages: [#{message_count}] Message processing timeout ({processing_timeout}s) for message type: {data.get('type', 'unknown')}") + except Exception as e: + logger.error(f"_handle_messages: [#{message_count}] Error in _process_message: {e}", exc_info=True) + + logger.info(f"_handle_messages: Exited message loop (async for ended) after processing {message_count} messages") + logger.info("_handle_messages: Exited message loop (async for ended)") except websockets.exceptions.ConnectionClosed as e: logger.warning(f"WebSocket connection closed: {e}") self.is_registered = False @@ -520,6 +541,7 @@ class WebRTCSignalingClient: async def _process_message(self, message: MessageData): """Process incoming signaling messages""" + logger.debug(f"_process_message: ENTRY - Processing message: {message}") try: # Handle error messages specially since they have a different structure if message.get("type") == "error" and "error" in message: @@ -531,6 +553,7 @@ class WebRTCSignalingClient: validated_message = WebSocketMessageModel.model_validate(message) msg_type = validated_message.type data = validated_message.data + logger.debug(f"_process_message: Validated message - type: {msg_type}, data: {data}") except ValidationError as e: logger.error(f"Invalid message structure: {e}", exc_info=True) return @@ -607,6 +630,7 @@ class WebRTCSignalingClient: logger.debug(f"Received status_ok from server: {data}") # This confirms the connection is healthy elif msg_type == "chat_message": + logger.info("Received chat message") try: validated = ChatMessageModel.model_validate(data) except ValidationError as e: @@ -783,7 +807,16 @@ class WebRTCSignalingClient: logger.debug( f"_handle_add_peer: Waiting for ICE gathering to complete for {peer_name}" ) + + # Add timeout to prevent blocking message processing indefinitely + ice_gathering_timeout = 10.0 # 10 seconds timeout + start_time = asyncio.get_event_loop().time() + while pc.iceGatheringState != "complete": + current_time = asyncio.get_event_loop().time() + if current_time - start_time > ice_gathering_timeout: + logger.warning(f"ICE gathering timeout ({ice_gathering_timeout}s) for {peer_name}, proceeding anyway") + break await asyncio.sleep(0.1) logger.debug( @@ -975,7 +1008,16 @@ class WebRTCSignalingClient: logger.debug( f"_create_and_send_answer: Waiting for ICE gathering to complete for {peer_name} (answer)" ) + + # Add timeout to prevent blocking message processing indefinitely + ice_gathering_timeout = 10.0 # 10 seconds timeout + start_time = asyncio.get_event_loop().time() + while pc.iceGatheringState != "complete": + current_time = asyncio.get_event_loop().time() + if current_time - start_time > ice_gathering_timeout: + logger.warning(f"ICE gathering timeout ({ice_gathering_timeout}s) for {peer_name} (answer), proceeding anyway") + break await asyncio.sleep(0.1) logger.debug(