Fixed WebRTC in bots

This commit is contained in:
James Ketr 2025-09-05 12:00:00 -07:00
parent a525e3467d
commit 4d218864d8
2 changed files with 165 additions and 48 deletions

View File

@ -35,12 +35,12 @@ _server_url: Optional[str] = None
_voicebot_url: Optional[str] = None
_insecure: bool = False
_provider_id: Optional[str] = None
_reconnect_task: Optional[asyncio.Task] = None
_reconnect_task: Optional[asyncio.Task[None]] = None
_shutdown_event = asyncio.Event()
_provider_registration_status: bool = False
def get_provider_registration_status() -> dict:
def get_provider_registration_status() -> dict[str, bool | str | float | None]:
"""Get the current provider registration status for use by bot clients."""
return {
"is_registered": _provider_registration_status,
@ -410,7 +410,7 @@ def get_bot_config_schema(bot_name: str):
@app.post("/bots/{bot_name}/config")
async def update_bot_config(bot_name: str, config_data: dict):
async def update_bot_config(bot_name: str, config_data: dict[str, Any]) -> dict[str, str | bool]:
"""Update bot configuration for a specific lobby."""
if bot_name not in _bot_registry:
raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

View File

@ -791,12 +791,14 @@ class WebRTCSignalingClient:
pc.on("icecandidate")(on_ice_candidate)
# Add local tracks
# Add local tracks with proper transceiver configuration
for track in self.local_tracks.values():
logger.debug(
f"_handle_add_peer: Adding local track {track.kind} to {peer_name}"
)
pc.addTrack(track)
# Add track with explicit transceiver direction to ensure proper SDP generation
transceiver = pc.addTransceiver(track, direction="sendrecv")
logger.debug(f"_handle_add_peer: Added transceiver for {track.kind}, mid: {transceiver.mid}")
# Create offer if needed
if should_create_offer:
@ -805,6 +807,29 @@ class WebRTCSignalingClient:
if self.on_peer_added:
await self.on_peer_added(peer)
def _clean_sdp(self, sdp: str) -> str:
"""Clean and validate SDP to ensure proper BUNDLE groups"""
lines = sdp.split('\n')
cleaned_lines: list[str] = []
bundle_mids: list[str] = []
current_mid: str | None = None
for line in lines:
if line.startswith('a=mid:'):
current_mid = line.split(':', 1)[1].strip()
if current_mid: # Only add non-empty MIDs
bundle_mids.append(current_mid)
cleaned_lines.append(line)
elif line.startswith('a=group:BUNDLE'):
# Rebuild BUNDLE group with valid MIDs only
if bundle_mids:
cleaned_lines.append(f'a=group:BUNDLE {" ".join(bundle_mids)}')
# Skip original BUNDLE line to avoid duplicates
else:
cleaned_lines.append(line)
return '\n'.join(cleaned_lines)
async def _create_and_send_offer(self, peer_id: str, peer_name: str, pc: RTCPeerConnection):
"""Create and send an offer to a peer"""
self.initiated_offer.add(peer_id)
@ -820,6 +845,14 @@ class WebRTCSignalingClient:
await pc.setLocalDescription(offer)
logger.debug(f"_handle_add_peer: Local description set for {peer_name}")
# Clean the SDP to ensure proper BUNDLE groups
cleaned_sdp = self._clean_sdp(offer.sdp)
if cleaned_sdp != offer.sdp:
logger.debug(f"_create_and_send_offer: Cleaned SDP for {peer_name}")
# Update the offer with cleaned SDP
offer = RTCSessionDescription(sdp=cleaned_sdp, type=offer.type)
await pc.setLocalDescription(offer)
# WORKAROUND for aiortc icecandidate event not firing (GitHub issue #1344)
# Use Method 2: Complete SDP approach to extract ICE candidates
logger.debug(
@ -851,6 +884,10 @@ class WebRTCSignalingClient:
peer_name=peer_name,
session_description=session_desc_typed,
)
# Debug log the SDP to help diagnose issues
logger.debug(f"_create_and_send_offer: SDP for {peer_name}:\n{offer.sdp[:500]}...")
await self._send_message(
"relaySessionDescription",
session_desc_model.model_dump(),
@ -866,58 +903,106 @@ class WebRTCSignalingClient:
async def _extract_and_send_candidates(self, peer_id: str, peer_name: str, pc: RTCPeerConnection):
"""Extract ICE candidates from SDP and send them"""
# Parse ICE candidates from the local SDP
if not pc.localDescription or not pc.localDescription.sdp:
logger.warning(f"_extract_and_send_candidates: No local description for {peer_name}")
return
sdp_lines = pc.localDescription.sdp.split("\n")
candidate_lines = [
line for line in sdp_lines if line.startswith("a=candidate:")
]
# Track which media section we're in to determine sdpMid and sdpMLineIndex
# Parse SDP structure to map media sections to their MIDs
media_sections: list[tuple[int, str]] = [] # List of (media_index, mid)
current_media_index = -1
current_mid = None
current_mid: str | None = None
# First pass: identify media sections and their MIDs
for line in sdp_lines:
if line.startswith("m="): # Media section
line = line.strip()
if line.startswith("m="): # Media section start
current_media_index += 1
current_mid = None # Reset MID for this section
elif line.startswith("a=mid:"): # Media ID
current_mid = line.split(":", 1)[1].strip()
if current_mid:
media_sections.append((current_media_index, current_mid))
logger.debug(f"_extract_and_send_candidates: Found media sections for {peer_name}: {media_sections}")
# If no MIDs found, fall back to using the transceivers
if not media_sections:
transceivers = pc.getTransceivers()
for i, transceiver in enumerate(transceivers):
if hasattr(transceiver, 'mid') and transceiver.mid:
media_sections.append((i, str(transceiver.mid)))
else:
# Fallback to default MID pattern
media_sections.append((i, str(i)))
logger.debug(f"_extract_and_send_candidates: Using transceiver MIDs for {peer_name}: {media_sections}")
# Second pass: extract candidates and assign them to media sections
current_media_index = -1
current_section_mid: str | None = None
candidates_sent = 0
for line in sdp_lines:
line = line.strip()
if line.startswith("m="): # Media section start
current_media_index += 1
# Find the MID for this media section
current_section_mid = None
for media_idx, mid in media_sections:
if media_idx == current_media_index:
current_section_mid = mid
break
# If no MID found, use the media index as string
if current_section_mid is None:
current_section_mid = str(current_media_index)
elif line.startswith("a=candidate:"):
candidate_sdp = line[2:] # Remove 'a=' prefix
# Only send if we have valid MID and media index
if current_section_mid is not None and current_media_index >= 0:
candidate_model = ICECandidateDictModel(
candidate=candidate_sdp,
sdpMid=current_section_mid,
sdpMLineIndex=current_media_index,
)
payload_candidate = IceCandidateModel(
peer_id=peer_id,
peer_name=peer_name,
candidate=candidate_model,
)
candidate_model = ICECandidateDictModel(
candidate=candidate_sdp,
sdpMid=current_mid,
sdpMLineIndex=current_media_index,
)
payload_candidate = IceCandidateModel(
peer_id=peer_id,
peer_name=peer_name,
candidate=candidate_model,
)
logger.debug(
f"_extract_and_send_candidates: Sending ICE candidate for {peer_name} (mid={current_section_mid}, idx={current_media_index}): {candidate_sdp[:60]}..."
)
await self._send_message(
"relayICECandidate", payload_candidate.model_dump()
)
candidates_sent += 1
else:
logger.warning(f"_extract_and_send_candidates: Skipping candidate with invalid MID/index for {peer_name}")
logger.debug(
f"_extract_and_send_candidates: Sending extracted ICE candidate for {peer_name}: {candidate_sdp[:60]}..."
)
await self._send_message(
"relayICECandidate", payload_candidate.model_dump()
)
# Send end-of-candidates signal (empty candidate)
end_candidate_model = ICECandidateDictModel(
candidate="",
sdpMid=None,
sdpMLineIndex=None,
)
payload_end = IceCandidateModel(
peer_id=peer_id, peer_name=peer_name, candidate=end_candidate_model
)
logger.debug(
f"_extract_and_send_candidates: Sending end-of-candidates signal for {peer_name}"
)
await self._send_message("relayICECandidate", payload_end.model_dump())
# Send end-of-candidates signal only if we have valid media sections
if media_sections:
# Use the first media section for end-of-candidates
first_media_idx, first_mid = media_sections[0]
end_candidate_model = ICECandidateDictModel(
candidate="",
sdpMid=first_mid,
sdpMLineIndex=first_media_idx,
)
payload_end = IceCandidateModel(
peer_id=peer_id, peer_name=peer_name, candidate=end_candidate_model
)
logger.debug(
f"_extract_and_send_candidates: Sending end-of-candidates signal for {peer_name} (mid={first_mid}, idx={first_media_idx})"
)
await self._send_message("relayICECandidate", payload_end.model_dump())
logger.debug(
f"_extract_and_send_candidates: Sent {len(candidate_lines)} ICE candidates to {peer_name}"
f"_extract_and_send_candidates: Sent {candidates_sent} ICE candidates to {peer_name}"
)
async def _handle_remove_peer(self, data: RemovePeerModel):
@ -951,6 +1036,9 @@ class WebRTCSignalingClient:
session_description = data.session_description.model_dump()
logger.info(f"Received {session_description['type']} from {peer_name}")
# Debug log the received SDP to help diagnose issues
logger.debug(f"Received SDP from {peer_name}:\n{session_description['sdp'][:500]}...")
pc = self.peer_connections.get(peer_id)
if not pc:
@ -967,10 +1055,27 @@ class WebRTCSignalingClient:
making_offer or pc.signalingState != "stable"
)
we_initiated = peer_id in self.initiated_offer
ignore_offer = we_initiated and offer_collision
# For bots, be more polite - always yield to human users in collision
# Bots should generally be the polite peer
ignore_offer = offer_collision and we_initiated
if ignore_offer:
logger.info(f"Ignoring offer from {peer_name} due to collision")
logger.info(f"Ignoring offer from {peer_name} due to collision (bot being polite)")
# Reset our offer state to allow the remote offer to proceed
if peer_id in self.initiated_offer:
self.initiated_offer.remove(peer_id)
self.making_offer[peer_id] = False
self.is_negotiating[peer_id] = False
# Wait a bit and then retry if connection isn't established
async def retry_connection():
await asyncio.sleep(1.0) # Wait 1 second
if pc.connectionState not in ["connected", "closed", "failed"]:
logger.info(f"Retrying connection setup for {peer_name} after collision")
# Don't create offer, let the remote peer drive
asyncio.create_task(retry_connection())
return
try:
@ -1019,6 +1124,14 @@ class WebRTCSignalingClient:
"""Create and send an answer to a peer"""
try:
answer = await pc.createAnswer()
# Clean the SDP to ensure proper BUNDLE groups
cleaned_sdp = self._clean_sdp(answer.sdp)
if cleaned_sdp != answer.sdp:
logger.debug(f"_create_and_send_answer: Cleaned SDP for {peer_name}")
# Update the answer with cleaned SDP
answer = RTCSessionDescription(sdp=cleaned_sdp, type=answer.type)
await pc.setLocalDescription(answer)
# WORKAROUND for aiortc icecandidate event not firing (GitHub issue #1344)
@ -1052,6 +1165,10 @@ class WebRTCSignalingClient:
peer_name=peer_name,
session_description=session_desc_typed,
)
# Debug log the SDP to help diagnose issues
logger.debug(f"_create_and_send_answer: SDP for {peer_name}:\n{answer.sdp[:500]}...")
await self._send_message(
"relaySessionDescription",
session_desc_model.model_dump(),