Maybe working?

James Ketr 2025-09-08 16:03:53 -07:00
parent b8929a9a5e
commit afdf856f2b
4 changed files with 115 additions and 157 deletions

View File

@@ -17,6 +17,8 @@ services:
- ./server:/server:ro # So the frontend can read the OpenAPI spec
- ./client:/client:rw
- ./dev-keys:/keys:ro # So the frontend entrypoint can check for SSL files
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
networks:
- ai-voicebot-net
@@ -52,6 +54,8 @@ services:
- ./server/.venv:/server/.venv:rw
- ./client/build:/client/build:ro
- ./dev-keys:/keys:ro
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
networks:
- ai-voicebot-net
@@ -75,6 +79,8 @@ services:
- ./shared:/shared:ro
- ./voicebot:/voicebot:rw
- ./voicebot/.venv:/voicebot/.venv:rw
- /etc/localtime:/etc/localtime:ro
- /etc/timezone:/etc/timezone:ro
# network_mode: host
networks:
- ai-voicebot-net

View File

@@ -97,8 +97,12 @@ class WebSocketConnectionManager:
session.ws = websocket
session.update_last_used()
# Clean up stale session in lobby if needed
# Clean up stale session in lobby if needed. If a stale session
# was present, perform a full re-join so WebRTC peer setup runs
# (this ensures bots and other peers receive addPeer signaling).
stale_rejoin = False
if session.id in lobby.sessions:
stale_rejoin = True
logger.info(f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining.")
try:
# Leave the lobby to clean up peer connections
@@ -120,9 +124,17 @@ class WebSocketConnectionManager:
break
except Exception as e:
logger.warning(f"Error cleaning up stale session: {e}")
# Notify existing peers about new user
await self._notify_peers_of_join(session, lobby)
# If this was a stale rejoin, re-add the session to the lobby which will
# trigger WebRTC signaling (addPeer) with existing peers (including bots).
if stale_rejoin:
try:
await session.join_lobby(lobby)
except Exception as e:
logger.warning(f"Error re-joining stale session to lobby: {e}")
else:
# Notify existing peers about a fresh new connection (no automatic join yet)
await self._notify_peers_of_join(session, lobby)
try:
# Message processing loop

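The reconnect handling in the hunks above distinguishes a stale re-join from a fresh connection: if the session is still registered in the lobby, the server tears the old membership down and performs a full re-join so addPeer signaling fires again toward bots and existing peers; a fresh connection only notifies peers. Below is a minimal, self-contained sketch of that decision — Session and Lobby are simplified stand-ins rather than the server's real classes, handle_reconnect is an illustrative name, and leave_lobby is a hypothetical stand-in for the cleanup call elided by the hunk boundary.

import asyncio
from dataclasses import dataclass, field

@dataclass
class Lobby:
    name: str
    sessions: dict = field(default_factory=dict)  # session_id -> Session

@dataclass
class Session:
    id: str
    name: str

    async def leave_lobby(self, lobby: Lobby) -> None:
        # Hypothetical stand-in for the elided cleanup call: drop the stale
        # membership so old peer connections are released before re-joining.
        lobby.sessions.pop(self.id, None)

    async def join_lobby(self, lobby: Lobby) -> None:
        # In the real server, joining is what triggers addPeer signaling
        # toward existing peers (including bots).
        lobby.sessions[self.id] = self

async def handle_reconnect(session: Session, lobby: Lobby) -> None:
    stale_rejoin = session.id in lobby.sessions
    if stale_rejoin:
        await session.leave_lobby(lobby)  # clean up the stale membership first
        await session.join_lobby(lobby)   # full re-join so WebRTC peer setup runs again
    else:
        # Fresh connection: announce the user only; no automatic WebRTC setup yet.
        print(f"{session.name}: notify peers of join")

lobby = Lobby("demo", {"abc": Session("abc", "alice")})
asyncio.run(handle_reconnect(Session("abc", "alice"), lobby))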
View File

@@ -216,9 +216,6 @@ class WebRTCSignalingHandlers:
peer_session: The existing peer session in the lobby
lobby: The lobby context
"""
logger.info(
f"[TRACE] handle_add_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}"
)
# Only establish WebRTC connections if at least one has media
if session.has_media or peer_session.has_media:
# Add peer_session to session's peer list
@@ -232,27 +229,23 @@ class WebRTCSignalingHandlers:
if lobby.id not in peer_session.lobby_peers:
peer_session.lobby_peers[lobby.id] = []
peer_session.lobby_peers[lobby.id].append(session.id)
logger.info(
f"[TRACE] {session.getName()} lobby_peers after add: {session.lobby_peers}"
)
# Determine offer roles: bots should never create offers
# When a bot joins: existing humans create offers to the bot
# When a human joins: human creates offers to existing bots, bots don't create offers to human
# Determine who should create the offer based on timing
# In our polite peer implementation, the newer peer typically creates the offer
existing_peer_should_offer = False
new_session_should_offer = True
# Notify existing peer about new peer
logger.info(
f"{session.getName()} -> {peer_session.getName()}:addPeer("
f"{session.getName()}, {lobby.getName()}, should_create_offer={existing_peer_should_offer}, "
f"has_media={session.has_media})"
)
# Determine deterministic offerer to avoid glare/collision:
# the peer with the lexicographically smaller session ID will create the offer.
try:
session_is_offer = session.id < peer_session.id
# Notify existing peer about new peer
peer_should_create_offer = peer_session.id < session.id
logger.info(
f"{session.getName()} -> {peer_session.getName()}:addPeer("
f"{session.getName()}, {lobby.getName()}, should_create_offer={peer_should_create_offer}, "
f"has_media={session.has_media})"
)
if peer_session.ws:
logger.debug(
f"Sending addPeer to {peer_session.getName()} -> peer_id={session.id}, peer_name={session.name}, has_media={session.has_media}, should_create_offer={peer_should_create_offer}"
)
await peer_session.ws.send_json(
{
"type": "addPeer",
@@ -260,23 +253,26 @@ class WebRTCSignalingHandlers:
"peer_id": session.id,
"peer_name": session.name,
"has_media": session.has_media,
"should_create_offer": existing_peer_should_offer,
"should_create_offer": peer_should_create_offer,
},
}
)
except Exception as e:
logger.warning(
f"Failed to send addPeer to {peer_session.getName()}: {e}"
)
else:
logger.warning(
f"Not sending addPeer to {peer_session.getName()} because ws is None"
)
# Notify new session about existing peer
logger.info(
f"{session.getName()} -> {session.getName()}:addPeer("
f"{peer_session.getName()}, {lobby.getName()}, should_create_offer={new_session_should_offer}, "
f"has_media={peer_session.has_media})"
)
try:
# Notify new session about existing peer
session_should_create_offer = session_is_offer
logger.info(
f"{session.getName()} -> {session.getName()}:addPeer("
f"{peer_session.getName()}, {lobby.getName()}, should_create_offer={session_should_create_offer}, "
f"has_media={peer_session.has_media})"
)
if session.ws:
logger.debug(
f"Sending addPeer to {session.getName()} -> peer_id={peer_session.id}, peer_name={peer_session.name}, has_media={peer_session.has_media}, should_create_offer={session_should_create_offer}"
)
await session.ws.send_json(
{
"type": "addPeer",
@@ -284,12 +280,14 @@ class WebRTCSignalingHandlers:
"peer_id": peer_session.id,
"peer_name": peer_session.name,
"has_media": peer_session.has_media,
"should_create_offer": new_session_should_offer,
"should_create_offer": session_should_create_offer,
},
}
)
else:
logger.warning(f"Not sending addPeer to {session.getName()} because ws is None")
except Exception as e:
logger.warning(f"Failed to send addPeer to {session.getName()}: {e}")
logger.warning(f"Failed to send addPeer messages for pair {session.getName()} <-> {peer_session.getName()}: {e}")
else:
logger.info(
f"{session.getName()} - Skipping WebRTC connection with "
@@ -312,9 +310,6 @@ class WebRTCSignalingHandlers:
peer_session: The peer session to disconnect from
lobby: The lobby context
"""
logger.info(
f"[TRACE] handle_remove_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}"
)
# Notify peer about session removal
if peer_session.ws:
logger.info(

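The handler change above replaces the fixed rule (the newly joined session always creates the offer) with a deterministic one: for each pair, the side whose session ID sorts lexicographically lower creates the offer, so exactly one peer offers and glare is avoided. A tiny self-contained sketch of that rule follows; the IDs are invented examples.

def offer_roles(new_session_id: str, existing_peer_id: str) -> tuple[bool, bool]:
    """Return (new_session_should_offer, existing_peer_should_offer)."""
    # For distinct IDs exactly one comparison is true, so the pair can never
    # both send offers (glare) and never both sit waiting.
    return new_session_id < existing_peer_id, existing_peer_id < new_session_id

assert offer_roles("aaa-111", "bbb-222") == (True, False)
assert offer_roles("bbb-222", "aaa-111") == (False, True)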
View File

@@ -442,8 +442,8 @@ class WebRTCSignalingClient:
async def _setup_local_media(self):
"""Create local media tracks"""
# Always clear out old tracks to avoid reusing stopped or stale tracks
self.local_tracks.clear()
# If a bot provided a create_tracks callable, use it to create tracks.
# Otherwise, use default synthetic tracks.
try:
if self.create_tracks:
tracks = self.create_tracks(self.session_name)
@@ -619,6 +619,7 @@ class WebRTCSignalingClient:
logger.info(
f"User joined: {validated.name} (session: {validated.session_id})"
)
logger.debug(f"user_joined payload: {validated}")
elif msg_type == "lobby_state":
try:
validated = LobbyStateModel.model_validate(data)
@@ -665,6 +666,34 @@ class WebRTCSignalingClient:
logger.error(f"Error in chat message callback: {e}", exc_info=True)
elif msg_type == "error":
logger.error(f"Received error from signaling server: {data}")
elif msg_type == "addPeer":
# Log raw addPeer receipt for debugging
try:
validated = AddPeerModel.model_validate(data)
except ValidationError as e:
logger.error(f"Invalid addPeer payload: {e}", exc_info=True)
return
logger.info(f"Received addPeer for peer {validated.peer_name} (peer_id={validated.peer_id}) should_create_offer={validated.should_create_offer}")
logger.debug(f"addPeer payload: {validated}")
await self._handle_add_peer(validated)
elif msg_type == "sessionDescription":
try:
validated = SessionDescriptionModel.model_validate(data)
except ValidationError as e:
logger.error(f"Invalid sessionDescription payload: {e}", exc_info=True)
return
logger.info(f"Received sessionDescription from {validated.peer_name} (peer_id={validated.peer_id})")
logger.debug(f"sessionDescription payload keys: {list(data.keys())}")
await self._handle_session_description(validated)
elif msg_type == "iceCandidate":
try:
validated = IceCandidateModel.model_validate(data)
except ValidationError as e:
logger.error(f"Invalid iceCandidate payload: {e}", exc_info=True)
return
logger.info(f"Received iceCandidate from {validated.peer_name} (peer_id={validated.peer_id})")
logger.debug(f"iceCandidate payload: {validated}")
await self._handle_ice_candidate(validated)
else:
logger.info(f"Unhandled message type: {msg_type} with data: {data}")
@@ -679,66 +708,26 @@ class WebRTCSignalingClient:
logger.info(
f"Adding peer: {peer_name} (should_create_offer: {should_create_offer})"
)
logger.info(f"_handle_add_peer: Current peer_connections count: {len(self.peer_connections)}")
logger.info(f"_handle_add_peer: Current active peers: {list(self.peer_connections.keys())}")
logger.debug(
f"_handle_add_peer: peer_id={peer_id}, peer_name={peer_name}, should_create_offer={should_create_offer}"
)
# Check if peer already exists and clean up stale connections
# Check if peer already exists
if peer_id in self.peer_connections:
pc = self.peer_connections[peer_id]
logger.info(f"_handle_add_peer: Found existing connection for {peer_name} (state: {pc.connectionState})")
logger.info("_handle_add_peer: Cleaning up existing connection to ensure fresh start")
# Always clean up existing connection for fresh start
# This prevents stale connection issues during page refresh
try:
await pc.close()
except Exception as e:
logger.warning(f"Error closing existing connection for {peer_name}: {e}")
# Clean up all associated state
self.is_negotiating.pop(peer_id, None)
self.making_offer.pop(peer_id, None)
self.initiated_offer.discard(peer_id)
self.pending_ice_candidates.pop(peer_id, None)
self.peers.pop(peer_id, None) # Also clean up peers dictionary
del self.peer_connections[peer_id]
# Give the connection a moment to close, but don't block indefinitely
# If it doesn't close quickly, proceed anyway to avoid blocking other operations
try:
await asyncio.wait_for(
self._wait_for_connection_close(pc, peer_name),
timeout=1.0
logger.debug(
f"_handle_add_peer: Existing connection state: {pc.connectionState}"
)
if pc.connectionState in ["new", "connected", "connecting"]:
logger.info(f"Peer connection already exists for {peer_name}")
return
else:
# Clean up stale connection
logger.debug(
f"_handle_add_peer: Closing stale connection for {peer_name}"
)
except asyncio.TimeoutError:
logger.warning(f"Connection to {peer_name} did not close within timeout, proceeding anyway")
# Also check for any existing connections with the same peer name but different ID
# This handles cases where the frontend gets a new session ID after refresh
connections_to_remove: list[str] = []
for existing_peer_id, existing_peer in self.peers.items():
if existing_peer.peer_name == peer_name and existing_peer_id != peer_id:
logger.info(f"_handle_add_peer: Found existing connection for peer name {peer_name} with different ID {existing_peer_id}, cleaning up")
connections_to_remove.append(existing_peer_id)
for old_peer_id in connections_to_remove:
if old_peer_id in self.peer_connections:
pc = self.peer_connections[old_peer_id]
try:
await pc.close()
except Exception as e:
logger.warning(f"Error closing old connection for {peer_name}: {e}")
# Clean up all associated state for old connection
self.is_negotiating.pop(old_peer_id, None)
self.making_offer.pop(old_peer_id, None)
self.initiated_offer.discard(old_peer_id)
self.pending_ice_candidates.pop(old_peer_id, None)
self.peers.pop(old_peer_id, None)
del self.peer_connections[old_peer_id]
await pc.close()
del self.peer_connections[peer_id]
# Create new peer
peer = Peer(session_id=peer_id, peer_name=peer_name, local=False)
@@ -753,6 +742,8 @@ class WebRTCSignalingClient:
username="ketra",
credential="ketran",
),
# Add Google's public STUN server as fallback
RTCIceServer(urls="stun:stun.l.google.com:19302"),
],
)
logger.debug(
@@ -772,39 +763,12 @@ class WebRTCSignalingClient:
# Add connection state change handler
def on_connection_state_change() -> None:
logger.info(f"🔄 Connection state changed for {peer_name}: {pc.connectionState}")
# Log additional details for debugging connection closure/failure
logger.info(f" ICE connection state: {pc.iceConnectionState}")
logger.info(f" Signaling state: {pc.signalingState}")
# If available, log the last ICE candidate pair and any error details
selected_pair = None
try:
if hasattr(pc, 'sctp') and pc.sctp:
selected_pair = getattr(pc.sctp.transport, 'iceTransport', None)
elif hasattr(pc, 'iceTransport'):
selected_pair = pc.iceTransport
except Exception as e:
logger.warning(f"Error accessing selected ICE pair: {e}")
if selected_pair:
logger.info(f" Selected ICE candidate pair: {getattr(selected_pair, 'selectedCandidatePair', None)}")
# Handle failed connections - could be due to network issues during refresh
if pc.connectionState == "failed":
logger.warning(f"❌ Connection to {peer_name} failed - this might be due to page refresh")
elif pc.connectionState == "disconnected":
logger.info(f"⚠️ Connection to {peer_name} disconnected")
elif pc.connectionState == "connected":
logger.info(f"✅ Connection to {peer_name} established successfully")
elif pc.connectionState == "closed":
logger.info(f"🔒 Connection to {peer_name} closed")
logger.info(f"Connection state: {pc.connectionState}")
pc.on("connectionstatechange")(on_connection_state_change)
logger.info(f"📝 Storing peer connection for {peer_name} (ID: {peer_id})")
self.peer_connections[peer_id] = pc
peer.connection = pc
logger.info(f"📊 Total peer connections now: {len(self.peer_connections)}")
# Set up event handlers
def on_track(track: MediaStreamTrack) -> None:
@@ -1108,33 +1072,19 @@ class WebRTCSignalingClient:
# Close peer connection
if peer_id in self.peer_connections:
pc = self.peer_connections[peer_id]
try:
await pc.close()
# Wait for connection to be properly closed
while pc.connectionState not in ["closed", "failed"]:
logger.debug(f"Waiting for connection to {peer_name} to close (current state: {pc.connectionState})")
# Brief yield to allow cleanup
await asyncio.sleep(0.01)
except Exception as e:
logger.warning(f"Error closing peer connection for {peer_name}: {e}")
await pc.close()
del self.peer_connections[peer_id]
# Extra forced cleanup: remove all state for this peer_id
for state_dict in [self.is_negotiating, self.making_offer, self.pending_ice_candidates]:
if peer_id in state_dict:
logger.debug(f"Force removing {peer_id} from {state_dict}")
state_dict.pop(peer_id, None)
# Clean up state
self.is_negotiating.pop(peer_id, None)
self.making_offer.pop(peer_id, None)
self.initiated_offer.discard(peer_id)
self.pending_ice_candidates.pop(peer_id, None)
# Remove peer
peer = self.peers.pop(peer_id, None)
if peer and self.on_peer_removed:
try:
await self.on_peer_removed(peer)
except Exception as e:
logger.warning(f"Error in on_peer_removed callback for {peer_name}: {e}")
logger.info(f"Completed removing peer: {peer_name} (all state reset)")
await self.on_peer_removed(peer)
async def _handle_session_description(self, data: SessionDescriptionModel):
"""Handle sessionDescription message"""
@@ -1163,12 +1113,12 @@ class WebRTCSignalingClient:
)
we_initiated = peer_id in self.initiated_offer
# For bots, be more polite - accept remote offers even in collision scenarios
# Only ignore if we're in the middle of sending our own offer AND it's actually conflicting
ignore_offer = offer_collision and we_initiated and pc.signalingState != "stable"
# For bots, be more polite - always yield to human users in collision
# Bots should generally be the polite peer
ignore_offer = offer_collision and we_initiated
if ignore_offer:
logger.info(f"Ignoring offer from {peer_name} due to active collision (bot being polite)")
logger.info(f"Ignoring offer from {peer_name} due to collision (bot being polite)")
# Reset our offer state to allow the remote offer to proceed
if peer_id in self.initiated_offer:
self.initiated_offer.remove(peer_id)
@@ -1180,11 +1130,7 @@ class WebRTCSignalingClient:
await asyncio.sleep(1.0) # Wait 1 second
if pc.connectionState not in ["connected", "closed", "failed"]:
logger.info(f"Retrying connection setup for {peer_name} after collision")
# Reset state to allow the remote peer to retry
self.making_offer[peer_id] = False
self.is_negotiating[peer_id] = False
if peer_id in self.initiated_offer:
self.initiated_offer.remove(peer_id)
# Don't create offer, let the remote peer drive
asyncio.create_task(retry_connection())
return
@@ -1295,7 +1241,7 @@ class WebRTCSignalingClient:
peer_name = data.peer_name
candidate_data = data.candidate
logger.info(f"Received ICE candidate from {peer_name}: {candidate_data}")
logger.info(f"Received ICE candidate from {peer_name}")
pc = self.peer_connections.get(peer_id)
if not pc:
@@ -1333,7 +1279,6 @@ class WebRTCSignalingClient:
except Exception:
cand_type = "unknown"
logger.info(f"Attempting to add ICE candidate for {peer_name}: type={cand_type}, sdp='{sdp_part}'")
try:
rtc_candidate = candidate_from_sdp(sdp_part)
rtc_candidate.sdpMid = candidate_data.sdpMid
@ -1341,7 +1286,7 @@ class WebRTCSignalingClient:
# aiortc expects an object with attributes (RTCIceCandidate)
await pc.addIceCandidate(rtc_candidate)
logger.info(f"ICE candidate successfully added for {peer_name}: type={cand_type}")
logger.info(f"ICE candidate added for {peer_name}: type={cand_type}")
except Exception as e:
logger.error(
f"Failed to add ICE candidate for {peer_name}: type={cand_type} error={e} sdp='{sdp_part}'",