From 4d218864d8c7589f9233d5d79aea5eee6fc5283d Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Fri, 5 Sep 2025 12:00:00 -0700 Subject: [PATCH] Fixed WebRTC in bots --- voicebot/bot_orchestrator.py | 6 +- voicebot/webrtc_signaling.py | 207 +++++++++++++++++++++++++++-------- 2 files changed, 165 insertions(+), 48 deletions(-) diff --git a/voicebot/bot_orchestrator.py b/voicebot/bot_orchestrator.py index fa70bff..5d97663 100644 --- a/voicebot/bot_orchestrator.py +++ b/voicebot/bot_orchestrator.py @@ -35,12 +35,12 @@ _server_url: Optional[str] = None _voicebot_url: Optional[str] = None _insecure: bool = False _provider_id: Optional[str] = None -_reconnect_task: Optional[asyncio.Task] = None +_reconnect_task: Optional[asyncio.Task[None]] = None _shutdown_event = asyncio.Event() _provider_registration_status: bool = False -def get_provider_registration_status() -> dict: +def get_provider_registration_status() -> dict[str, bool | str | float | None]: """Get the current provider registration status for use by bot clients.""" return { "is_registered": _provider_registration_status, @@ -410,7 +410,7 @@ def get_bot_config_schema(bot_name: str): @app.post("/bots/{bot_name}/config") -async def update_bot_config(bot_name: str, config_data: dict): +async def update_bot_config(bot_name: str, config_data: dict[str, Any]) -> dict[str, str | bool]: """Update bot configuration for a specific lobby.""" if bot_name not in _bot_registry: raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found") diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index c014208..bc2d0e3 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -791,12 +791,14 @@ class WebRTCSignalingClient: pc.on("icecandidate")(on_ice_candidate) - # Add local tracks + # Add local tracks with proper transceiver configuration for track in self.local_tracks.values(): logger.debug( f"_handle_add_peer: Adding local track {track.kind} to {peer_name}" ) - pc.addTrack(track) + # Add track with explicit transceiver direction to ensure proper SDP generation + transceiver = pc.addTransceiver(track, direction="sendrecv") + logger.debug(f"_handle_add_peer: Added transceiver for {track.kind}, mid: {transceiver.mid}") # Create offer if needed if should_create_offer: @@ -805,6 +807,29 @@ class WebRTCSignalingClient: if self.on_peer_added: await self.on_peer_added(peer) + def _clean_sdp(self, sdp: str) -> str: + """Clean and validate SDP to ensure proper BUNDLE groups""" + lines = sdp.split('\n') + cleaned_lines: list[str] = [] + bundle_mids: list[str] = [] + current_mid: str | None = None + + for line in lines: + if line.startswith('a=mid:'): + current_mid = line.split(':', 1)[1].strip() + if current_mid: # Only add non-empty MIDs + bundle_mids.append(current_mid) + cleaned_lines.append(line) + elif line.startswith('a=group:BUNDLE'): + # Rebuild BUNDLE group with valid MIDs only + if bundle_mids: + cleaned_lines.append(f'a=group:BUNDLE {" ".join(bundle_mids)}') + # Skip original BUNDLE line to avoid duplicates + else: + cleaned_lines.append(line) + + return '\n'.join(cleaned_lines) + async def _create_and_send_offer(self, peer_id: str, peer_name: str, pc: RTCPeerConnection): """Create and send an offer to a peer""" self.initiated_offer.add(peer_id) @@ -820,6 +845,14 @@ class WebRTCSignalingClient: await pc.setLocalDescription(offer) logger.debug(f"_handle_add_peer: Local description set for {peer_name}") + # Clean the SDP to ensure proper BUNDLE groups + cleaned_sdp = self._clean_sdp(offer.sdp) + if cleaned_sdp != offer.sdp: + logger.debug(f"_create_and_send_offer: Cleaned SDP for {peer_name}") + # Update the offer with cleaned SDP + offer = RTCSessionDescription(sdp=cleaned_sdp, type=offer.type) + await pc.setLocalDescription(offer) + # WORKAROUND for aiortc icecandidate event not firing (GitHub issue #1344) # Use Method 2: Complete SDP approach to extract ICE candidates logger.debug( @@ -851,6 +884,10 @@ class WebRTCSignalingClient: peer_name=peer_name, session_description=session_desc_typed, ) + + # Debug log the SDP to help diagnose issues + logger.debug(f"_create_and_send_offer: SDP for {peer_name}:\n{offer.sdp[:500]}...") + await self._send_message( "relaySessionDescription", session_desc_model.model_dump(), @@ -866,58 +903,106 @@ class WebRTCSignalingClient: async def _extract_and_send_candidates(self, peer_id: str, peer_name: str, pc: RTCPeerConnection): """Extract ICE candidates from SDP and send them""" - # Parse ICE candidates from the local SDP + if not pc.localDescription or not pc.localDescription.sdp: + logger.warning(f"_extract_and_send_candidates: No local description for {peer_name}") + return + sdp_lines = pc.localDescription.sdp.split("\n") - candidate_lines = [ - line for line in sdp_lines if line.startswith("a=candidate:") - ] - - # Track which media section we're in to determine sdpMid and sdpMLineIndex + + # Parse SDP structure to map media sections to their MIDs + media_sections: list[tuple[int, str]] = [] # List of (media_index, mid) current_media_index = -1 - current_mid = None - + current_mid: str | None = None + + # First pass: identify media sections and their MIDs for line in sdp_lines: - if line.startswith("m="): # Media section + line = line.strip() + if line.startswith("m="): # Media section start current_media_index += 1 + current_mid = None # Reset MID for this section elif line.startswith("a=mid:"): # Media ID current_mid = line.split(":", 1)[1].strip() + if current_mid: + media_sections.append((current_media_index, current_mid)) + + logger.debug(f"_extract_and_send_candidates: Found media sections for {peer_name}: {media_sections}") + + # If no MIDs found, fall back to using the transceivers + if not media_sections: + transceivers = pc.getTransceivers() + for i, transceiver in enumerate(transceivers): + if hasattr(transceiver, 'mid') and transceiver.mid: + media_sections.append((i, str(transceiver.mid))) + else: + # Fallback to default MID pattern + media_sections.append((i, str(i))) + logger.debug(f"_extract_and_send_candidates: Using transceiver MIDs for {peer_name}: {media_sections}") + + # Second pass: extract candidates and assign them to media sections + current_media_index = -1 + current_section_mid: str | None = None + candidates_sent = 0 + + for line in sdp_lines: + line = line.strip() + if line.startswith("m="): # Media section start + current_media_index += 1 + # Find the MID for this media section + current_section_mid = None + for media_idx, mid in media_sections: + if media_idx == current_media_index: + current_section_mid = mid + break + + # If no MID found, use the media index as string + if current_section_mid is None: + current_section_mid = str(current_media_index) + elif line.startswith("a=candidate:"): candidate_sdp = line[2:] # Remove 'a=' prefix + + # Only send if we have valid MID and media index + if current_section_mid is not None and current_media_index >= 0: + candidate_model = ICECandidateDictModel( + candidate=candidate_sdp, + sdpMid=current_section_mid, + sdpMLineIndex=current_media_index, + ) + payload_candidate = IceCandidateModel( + peer_id=peer_id, + peer_name=peer_name, + candidate=candidate_model, + ) - candidate_model = ICECandidateDictModel( - candidate=candidate_sdp, - sdpMid=current_mid, - sdpMLineIndex=current_media_index, - ) - payload_candidate = IceCandidateModel( - peer_id=peer_id, - peer_name=peer_name, - candidate=candidate_model, - ) + logger.debug( + f"_extract_and_send_candidates: Sending ICE candidate for {peer_name} (mid={current_section_mid}, idx={current_media_index}): {candidate_sdp[:60]}..." + ) + await self._send_message( + "relayICECandidate", payload_candidate.model_dump() + ) + candidates_sent += 1 + else: + logger.warning(f"_extract_and_send_candidates: Skipping candidate with invalid MID/index for {peer_name}") - logger.debug( - f"_extract_and_send_candidates: Sending extracted ICE candidate for {peer_name}: {candidate_sdp[:60]}..." - ) - await self._send_message( - "relayICECandidate", payload_candidate.model_dump() - ) - - # Send end-of-candidates signal (empty candidate) - end_candidate_model = ICECandidateDictModel( - candidate="", - sdpMid=None, - sdpMLineIndex=None, - ) - payload_end = IceCandidateModel( - peer_id=peer_id, peer_name=peer_name, candidate=end_candidate_model - ) - logger.debug( - f"_extract_and_send_candidates: Sending end-of-candidates signal for {peer_name}" - ) - await self._send_message("relayICECandidate", payload_end.model_dump()) + # Send end-of-candidates signal only if we have valid media sections + if media_sections: + # Use the first media section for end-of-candidates + first_media_idx, first_mid = media_sections[0] + end_candidate_model = ICECandidateDictModel( + candidate="", + sdpMid=first_mid, + sdpMLineIndex=first_media_idx, + ) + payload_end = IceCandidateModel( + peer_id=peer_id, peer_name=peer_name, candidate=end_candidate_model + ) + logger.debug( + f"_extract_and_send_candidates: Sending end-of-candidates signal for {peer_name} (mid={first_mid}, idx={first_media_idx})" + ) + await self._send_message("relayICECandidate", payload_end.model_dump()) logger.debug( - f"_extract_and_send_candidates: Sent {len(candidate_lines)} ICE candidates to {peer_name}" + f"_extract_and_send_candidates: Sent {candidates_sent} ICE candidates to {peer_name}" ) async def _handle_remove_peer(self, data: RemovePeerModel): @@ -951,6 +1036,9 @@ class WebRTCSignalingClient: session_description = data.session_description.model_dump() logger.info(f"Received {session_description['type']} from {peer_name}") + + # Debug log the received SDP to help diagnose issues + logger.debug(f"Received SDP from {peer_name}:\n{session_description['sdp'][:500]}...") pc = self.peer_connections.get(peer_id) if not pc: @@ -967,10 +1055,27 @@ class WebRTCSignalingClient: making_offer or pc.signalingState != "stable" ) we_initiated = peer_id in self.initiated_offer - ignore_offer = we_initiated and offer_collision - + + # For bots, be more polite - always yield to human users in collision + # Bots should generally be the polite peer + ignore_offer = offer_collision and we_initiated + if ignore_offer: - logger.info(f"Ignoring offer from {peer_name} due to collision") + logger.info(f"Ignoring offer from {peer_name} due to collision (bot being polite)") + # Reset our offer state to allow the remote offer to proceed + if peer_id in self.initiated_offer: + self.initiated_offer.remove(peer_id) + self.making_offer[peer_id] = False + self.is_negotiating[peer_id] = False + + # Wait a bit and then retry if connection isn't established + async def retry_connection(): + await asyncio.sleep(1.0) # Wait 1 second + if pc.connectionState not in ["connected", "closed", "failed"]: + logger.info(f"Retrying connection setup for {peer_name} after collision") + # Don't create offer, let the remote peer drive + + asyncio.create_task(retry_connection()) return try: @@ -1019,6 +1124,14 @@ class WebRTCSignalingClient: """Create and send an answer to a peer""" try: answer = await pc.createAnswer() + + # Clean the SDP to ensure proper BUNDLE groups + cleaned_sdp = self._clean_sdp(answer.sdp) + if cleaned_sdp != answer.sdp: + logger.debug(f"_create_and_send_answer: Cleaned SDP for {peer_name}") + # Update the answer with cleaned SDP + answer = RTCSessionDescription(sdp=cleaned_sdp, type=answer.type) + await pc.setLocalDescription(answer) # WORKAROUND for aiortc icecandidate event not firing (GitHub issue #1344) @@ -1052,6 +1165,10 @@ class WebRTCSignalingClient: peer_name=peer_name, session_description=session_desc_typed, ) + + # Debug log the SDP to help diagnose issues + logger.debug(f"_create_and_send_answer: SDP for {peer_name}:\n{answer.sdp[:500]}...") + await self._send_message( "relaySessionDescription", session_desc_model.model_dump(),