diff --git a/client/src/UserList.tsx b/client/src/UserList.tsx index fc3f20f..f431fca 100644 --- a/client/src/UserList.tsx +++ b/client/src/UserList.tsx @@ -1,11 +1,13 @@ import React, { useState, useEffect, useCallback } from "react"; import Paper from "@mui/material/Paper"; import List from "@mui/material/List"; +import Button from "@mui/material/Button"; import "./UserList.css"; import { MediaControl, MediaAgent, Peer } from "./MediaControl"; import Box from "@mui/material/Box"; import { Session } from "./GlobalContext"; import useWebSocket from "react-use-websocket"; +import { ApiClient, BotLeaveLobbyRequest } from "./api-client"; type User = { name: string; @@ -13,6 +15,9 @@ type User = { live: boolean; local: boolean /* Client side variable */; protected?: boolean; + is_bot?: boolean; + bot_run_id?: string; + bot_provider_id?: string; }; type UserListProps = { @@ -25,6 +30,32 @@ const UserList: React.FC = (props: UserListProps) => { const [users, setUsers] = useState(null); const [peers, setPeers] = useState>({}); const [videoClass, setVideoClass] = useState("Large"); + const [leavingBots, setLeavingBots] = useState>(new Set()); + + const apiClient = new ApiClient(); + + const handleBotLeave = async (user: User) => { + if (!user.is_bot) return; + + setLeavingBots((prev) => new Set(prev).add(user.session_id)); + + try { + const request: BotLeaveLobbyRequest = { + session_id: user.session_id, + }; + + await apiClient.requestBotLeaveLobby(request); + console.log(`Bot ${user.name} leave requested successfully`); + } catch (error) { + console.error("Failed to request bot leave:", error); + } finally { + setLeavingBots((prev) => { + const newSet = new Set(prev); + newSet.delete(user.session_id); + return newSet; + }); + } + }; const sortUsers = useCallback( (A: any, B: any) => { @@ -105,15 +136,34 @@ const UserList: React.FC = (props: UserListProps) => { className={`UserEntry ${user.local ? "UserSelf" : ""}`} >
-
-
{user.name ? user.name : user.session_id}
- {user.protected && ( -
+
+
{user.name ? user.name : user.session_id}
+ {user.protected && ( +
+ 🔒 +
+ )} + {user.is_bot && ( +
+ 🤖 +
+ )} +
+ {user.is_bot && !user.local && ( +
+ {leavingBots.has(user.session_id) ? "..." : "Leave"} + )}
{user.name && !user.live &&
} diff --git a/client/src/api-client.ts b/client/src/api-client.ts index 52ac194..fff0a67 100644 --- a/client/src/api-client.ts +++ b/client/src/api-client.ts @@ -57,6 +57,16 @@ export interface BotJoinLobbyResponse { provider_id: string; } +export interface BotLeaveLobbyRequest { + session_id: string; +} + +export interface BotLeaveLobbyResponse { + status: string; + session_id: string; + run_id?: string; +} + export class ApiError extends Error { constructor(public status: number, public statusText: string, public data?: any) { super(`HTTP ${status}: ${statusText}`); @@ -194,6 +204,13 @@ export class ApiClient { ); } + async requestBotLeaveLobby(request: BotLeaveLobbyRequest): Promise { + return this.request(this.getApiPath("/ai-voicebot/api/bots/leave"), { + method: "POST", + body: request, + }); + } + // Auto-generated endpoints will be added here by update-api-client.js // DO NOT MANUALLY EDIT BELOW THIS LINE diff --git a/server/main.py b/server/main.py index 9dda2f6..22946c9 100644 --- a/server/main.py +++ b/server/main.py @@ -62,6 +62,8 @@ from shared.models import ( BotJoinLobbyRequest, BotJoinLobbyResponse, BotJoinPayload, + BotLeaveLobbyRequest, + BotLeaveLobbyResponse, ) @@ -327,6 +329,9 @@ class Lobby: protected=True if s.name and s.name.lower() in name_passwords else False, + is_bot=s.is_bot, + bot_run_id=s.bot_run_id, + bot_provider_id=s.bot_provider_id, ) for s in self.sessions.values() if s.name @@ -453,8 +458,8 @@ class Session: _loaded = False lock = threading.RLock() # Thread safety for class-level operations - def __init__(self, id: str): - logger.info(f"Instantiating new session {id}") + def __init__(self, id: str, is_bot: bool = False): + logger.info(f"Instantiating new session {id} (bot: {is_bot})") with Session.lock: self._instances.append(self) self.id = id @@ -468,6 +473,9 @@ class Session: self.created_at = time.time() self.last_used = time.time() self.displaced_at: float | None = None # When name was taken over + self.is_bot = is_bot # Whether this session represents a bot + self.bot_run_id: str | None = None # Bot run ID for tracking + self.bot_provider_id: str | None = None # Bot provider ID self.session_lock = threading.RLock() # Instance-level lock self.save() @@ -492,6 +500,9 @@ class Session: created_at=s.created_at, last_used=s.last_used, displaced_at=s.displaced_at, + is_bot=s.is_bot, + bot_run_id=s.bot_run_id, + bot_provider_id=s.bot_provider_id, ) ) @@ -573,12 +584,16 @@ class Session: logger.info(f"Expiring session {s_saved.id[:8]}:{name} during load") continue # Skip loading this expired session - session = Session(s_saved.id) + session = Session(s_saved.id, is_bot=getattr(s_saved, "is_bot", False)) session.name = name # Load timestamps, with defaults for backward compatibility session.created_at = created_at session.last_used = last_used session.displaced_at = displaced_at + # Load bot information with defaults for backward compatibility + session.is_bot = getattr(s_saved, "is_bot", False) + session.bot_run_id = getattr(s_saved, "bot_run_id", None) + session.bot_provider_id = getattr(s_saved, "bot_provider_id", None) for lobby_saved in s_saved.lobbies: session.lobbies.append( Lobby( @@ -1434,8 +1449,8 @@ async def request_bot_join_lobby( bot_session_id = secrets.token_hex(16) # Create the Session object for the bot - bot_session = Session(bot_session_id) - logger.info(f"Created session for bot: {bot_session.getName()}") + bot_session = Session(bot_session_id, is_bot=True) + logger.info(f"Created bot session for: {bot_session.getName()}") # Determine server URL for the bot to connect back to # Use the server's public URL or construct from request @@ -1467,6 +1482,12 @@ async def request_bot_join_lobby( result = response.json() run_id = result.get("run_id", "unknown") + # Update bot session with run and provider information + with bot_session.session_lock: + bot_session.bot_run_id = run_id + bot_session.bot_provider_id = target_provider_id + bot_session.setName(bot_nick) + logger.info( f"Bot {bot_name} requested to join lobby {request.lobby_id}" ) @@ -1493,6 +1514,70 @@ async def request_bot_join_lobby( raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") +@app.post(public_url + "api/bots/leave", response_model=BotLeaveLobbyResponse) +async def request_bot_leave_lobby( + request: BotLeaveLobbyRequest, +) -> BotLeaveLobbyResponse: + """Request a bot to leave from all lobbies and disconnect""" + + # Find the bot session + bot_session = getSession(request.session_id) + if not bot_session: + raise HTTPException(status_code=404, detail="Bot session not found") + + if not bot_session.is_bot: + raise HTTPException(status_code=400, detail="Session is not a bot") + + run_id = bot_session.bot_run_id + provider_id = bot_session.bot_provider_id + + logger.info(f"Requesting bot {bot_session.getName()} to leave all lobbies") + + # Try to stop the bot at the provider level if we have the information + if provider_id and run_id and provider_id in bot_providers: + provider = bot_providers[provider_id] + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{provider.base_url}/bots/runs/{run_id}/stop", + timeout=5.0, + ) + if response.status_code == 200: + logger.info( + f"Successfully requested bot provider to stop run {run_id}" + ) + else: + logger.warning( + f"Bot provider returned error when stopping: HTTP {response.status_code}" + ) + except Exception as e: + logger.warning(f"Failed to request bot stop from provider: {e}") + + # Force disconnect the bot session from all lobbies + with bot_session.session_lock: + lobbies_to_part = bot_session.lobbies[:] + + for lobby in lobbies_to_part: + try: + await bot_session.part(lobby) + except Exception as e: + logger.warning(f"Error parting bot from lobby {lobby.getName()}: {e}") + + # Close WebSocket connection if it exists + if bot_session.ws: + try: + await bot_session.ws.close() + except Exception as e: + logger.warning(f"Error closing bot WebSocket: {e}") + bot_session.ws = None + + return BotLeaveLobbyResponse( + status="disconnected", + session_id=request.session_id, + run_id=run_id, + ) + + # Register websocket endpoint directly on app with full public_url path @app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}") async def lobby_join( diff --git a/shared/models.py b/shared/models.py index 5effd3d..4341401 100644 --- a/shared/models.py +++ b/shared/models.py @@ -43,6 +43,9 @@ class ParticipantModel(BaseModel): session_id: str live: bool protected: bool + is_bot: bool = False + bot_run_id: Optional[str] = None + bot_provider_id: Optional[str] = None # ============================================================================= @@ -312,6 +315,9 @@ class SessionSaved(BaseModel): created_at: float = 0.0 last_used: float = 0.0 displaced_at: Optional[float] = None # When name was taken over + is_bot: bool = False # Whether this session represents a bot + bot_run_id: Optional[str] = None # Bot run ID for tracking + bot_provider_id: Optional[str] = None # Bot provider ID class SessionsPayload(BaseModel): @@ -397,3 +403,17 @@ class BotJoinLobbyResponse(BaseModel): bot_name: str run_id: str provider_id: str + + +class BotLeaveLobbyRequest(BaseModel): + """Request to make a bot leave a lobby""" + + session_id: str # The session ID of the bot to remove + + +class BotLeaveLobbyResponse(BaseModel): + """Response after requesting a bot to leave a lobby""" + + status: str + session_id: str + run_id: Optional[str] = None diff --git a/voicebot/debug_aioice.py b/voicebot/debug_aioice.py deleted file mode 100644 index 86c5bcc..0000000 --- a/voicebot/debug_aioice.py +++ /dev/null @@ -1,110 +0,0 @@ -import time -from logger import logger - -# Defensive monkeypatch: aioice Transaction.__retry may run after the -# underlying datagram transport or loop was torn down which results in -# AttributeError being raised and flooding logs. Wrap the original -# implementation to catch and suppress AttributeError while preserving -# other exceptions. This is a temporary mitigation to keep logs readable -# while we investigate/upstream a proper fix or upgrade aioice. -try: - import aioice.stun as _aioice_stun # type: ignore - - # The method is defined with a double-underscore name (__retry) which - # gets name-mangled. Detect the actual attribute name robustly. - retry_attr_name = None - for name in dir(_aioice_stun.Transaction): - if name.endswith("retry"): - obj = getattr(_aioice_stun.Transaction, name) - if callable(obj): - retry_attr_name = name - _orig_retry = obj - break - - if retry_attr_name is not None: - # Simple in-process dedupe cache so we only log the same AttributeError - # once per interval. This prevents flooding the logs when many - # transactions race to run after shutdown. - _MONKEYPATCH_LOG_CACHE: dict[str, float] = {} - _MONKEYPATCH_LOG_SUPPRESSION_INTERVAL = 5.0 - - def _should_log_once(key: str) -> bool: - now = time.time() - last = _MONKEYPATCH_LOG_CACHE.get(key) - if last is None or (now - last) > _MONKEYPATCH_LOG_SUPPRESSION_INTERVAL: - _MONKEYPATCH_LOG_CACHE[key] = now - return True - return False - - def _safe_transaction_retry(self, *args, **kwargs): # type: ignore - try: - return _orig_retry(self, *args, **kwargs) # type: ignore - except AttributeError as e: # type: ignore - # Transport or event-loop already closed; log once per key - key = f"Transaction.{retry_attr_name}:{e}" - if _should_log_once(key): - logger.warning( - "aioice Transaction.%s AttributeError suppressed: %s", - retry_attr_name, - e, - ) - except Exception: # type: ignore - # Preserve visibility for other unexpected exceptions - logger.exception( - "aioice Transaction.%s raised an unexpected exception", - retry_attr_name, - ) - - setattr(_aioice_stun.Transaction, retry_attr_name, _safe_transaction_retry) # type: ignore - logger.info("Applied safe aioice Transaction.%s monkeypatch", retry_attr_name) - else: - logger.warning("aioice Transaction.__retry not found; skipping monkeypatch") -except Exception as e: - logger.exception("Failed to apply aioice Transaction.__retry monkeypatch: %s", e) - -# Additional defensive patch: wrap the protocol-level send_stun implementation -# (e.g. StunProtocol.send_stun) which ultimately calls the datagram transport's -# sendto. If the transport or its loop is already torn down, sendto can raise -# AttributeError which then triggers asyncio's fatal error path (calling a None -# loop). Wrapping here prevents the flood of selector_events/_fatal_error -# AttributeError traces. -try: - import aioice.ice as _aioice_ice # type: ignore - - # Prefer to patch StunProtocol.send_stun which is used by the ICE code. - send_attr_name = None - if hasattr(_aioice_ice, "StunProtocol"): - proto_cls = getattr(_aioice_ice, "StunProtocol") - for name in dir(proto_cls): - if name.endswith("send_stun"): - attr = getattr(proto_cls, name) - if callable(attr): - send_attr_name = name - _orig_send_stun = attr - break - - if send_attr_name is not None: - - def _safe_send_stun(self, message, addr): # type: ignore - try: - return _orig_send_stun(self, message, addr) # type: ignore - except AttributeError as e: # type: ignore - # Likely transport._sock or transport._loop is None; log once - key = f"StunProtocol.{send_attr_name}:{e}" - if _should_log_once(key): - logger.warning( - "aioice StunProtocol.%s AttributeError suppressed: %s", - send_attr_name, - e, - ) - except Exception: # type: ignore - logger.exception( - "aioice StunProtocol.%s raised unexpected exception", send_attr_name - ) - - setattr(proto_cls, send_attr_name, _safe_send_stun) # type: ignore - logger.info("Applied safe aioice StunProtocol.%s monkeypatch", send_attr_name) - else: - logger.warning("aioice StunProtocol.send_stun not found; skipping monkeypatch") -except Exception as e: - logger.exception("Failed to apply aioice StunProtocol.send_stun monkeypatch: %s", e) diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index 8024f75..65701fc 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -315,22 +315,39 @@ class WebRTCSignalingClient: if self.registration_check_task and not self.registration_check_task.done(): self.registration_check_task.cancel() try: - await self.registration_check_task + # Only await if we're in the same event loop + current_loop = asyncio.get_running_loop() + task_loop = self.registration_check_task.get_loop() + if current_loop == task_loop: + await self.registration_check_task + else: + logger.warning("Registration check task in different event loop, skipping await") except asyncio.CancelledError: pass + except Exception as e: + logger.warning(f"Error cancelling registration check task: {e}") self.registration_check_task = None if self.websocket: ws = cast(WebSocketProtocol, self.websocket) - await ws.close() + try: + await ws.close() + except Exception as e: + logger.warning(f"Error closing websocket: {e}") # Close all peer connections for pc in self.peer_connections.values(): - await pc.close() + try: + await pc.close() + except Exception as e: + logger.warning(f"Error closing peer connection: {e}") # Stop local tracks for track in self.local_tracks.values(): - track.stop() + try: + track.stop() + except Exception as e: + logger.warning(f"Error stopping track: {e}") # Reset registration status self.is_registered = False