Bots can now leave

This commit is contained in:
James Ketr 2025-09-03 16:05:51 -07:00
parent b5614b9d99
commit 8ef309d4f1
6 changed files with 206 additions and 127 deletions

View File

@ -1,11 +1,13 @@
import React, { useState, useEffect, useCallback } from "react"; import React, { useState, useEffect, useCallback } from "react";
import Paper from "@mui/material/Paper"; import Paper from "@mui/material/Paper";
import List from "@mui/material/List"; import List from "@mui/material/List";
import Button from "@mui/material/Button";
import "./UserList.css"; import "./UserList.css";
import { MediaControl, MediaAgent, Peer } from "./MediaControl"; import { MediaControl, MediaAgent, Peer } from "./MediaControl";
import Box from "@mui/material/Box"; import Box from "@mui/material/Box";
import { Session } from "./GlobalContext"; import { Session } from "./GlobalContext";
import useWebSocket from "react-use-websocket"; import useWebSocket from "react-use-websocket";
import { ApiClient, BotLeaveLobbyRequest } from "./api-client";
type User = { type User = {
name: string; name: string;
@ -13,6 +15,9 @@ type User = {
live: boolean; live: boolean;
local: boolean /* Client side variable */; local: boolean /* Client side variable */;
protected?: boolean; protected?: boolean;
is_bot?: boolean;
bot_run_id?: string;
bot_provider_id?: string;
}; };
type UserListProps = { type UserListProps = {
@ -25,6 +30,32 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
const [users, setUsers] = useState<User[] | null>(null); const [users, setUsers] = useState<User[] | null>(null);
const [peers, setPeers] = useState<Record<string, Peer>>({}); const [peers, setPeers] = useState<Record<string, Peer>>({});
const [videoClass, setVideoClass] = useState<string>("Large"); const [videoClass, setVideoClass] = useState<string>("Large");
const [leavingBots, setLeavingBots] = useState<Set<string>>(new Set());
const apiClient = new ApiClient();
const handleBotLeave = async (user: User) => {
if (!user.is_bot) return;
setLeavingBots((prev) => new Set(prev).add(user.session_id));
try {
const request: BotLeaveLobbyRequest = {
session_id: user.session_id,
};
await apiClient.requestBotLeaveLobby(request);
console.log(`Bot ${user.name} leave requested successfully`);
} catch (error) {
console.error("Failed to request bot leave:", error);
} finally {
setLeavingBots((prev) => {
const newSet = new Set(prev);
newSet.delete(user.session_id);
return newSet;
});
}
};
const sortUsers = useCallback( const sortUsers = useCallback(
(A: any, B: any) => { (A: any, B: any) => {
@ -105,6 +136,7 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
className={`UserEntry ${user.local ? "UserSelf" : ""}`} className={`UserEntry ${user.local ? "UserSelf" : ""}`}
> >
<div> <div>
<div style={{ display: "flex", alignItems: "center", justifyContent: "space-between" }}>
<div style={{ display: "flex", alignItems: "center" }}> <div style={{ display: "flex", alignItems: "center" }}>
<div className="Name">{user.name ? user.name : user.session_id}</div> <div className="Name">{user.name ? user.name : user.session_id}</div>
{user.protected && ( {user.protected && (
@ -115,6 +147,24 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
🔒 🔒
</div> </div>
)} )}
{user.is_bot && (
<div style={{ marginLeft: 8, fontSize: "0.8em", color: "#00a" }} title="This is a bot">
🤖
</div>
)}
</div>
{user.is_bot && !user.local && (
<Button
size="small"
variant="outlined"
color="secondary"
onClick={() => handleBotLeave(user)}
disabled={leavingBots.has(user.session_id)}
style={{ fontSize: "0.7em", minWidth: "50px", height: "24px" }}
>
{leavingBots.has(user.session_id) ? "..." : "Leave"}
</Button>
)}
</div> </div>
{user.name && !user.live && <div className="NoNetwork"></div>} {user.name && !user.live && <div className="NoNetwork"></div>}
</div> </div>

View File

@ -57,6 +57,16 @@ export interface BotJoinLobbyResponse {
provider_id: string; provider_id: string;
} }
export interface BotLeaveLobbyRequest {
session_id: string;
}
export interface BotLeaveLobbyResponse {
status: string;
session_id: string;
run_id?: string;
}
export class ApiError extends Error { export class ApiError extends Error {
constructor(public status: number, public statusText: string, public data?: any) { constructor(public status: number, public statusText: string, public data?: any) {
super(`HTTP ${status}: ${statusText}`); super(`HTTP ${status}: ${statusText}`);
@ -194,6 +204,13 @@ export class ApiClient {
); );
} }
async requestBotLeaveLobby(request: BotLeaveLobbyRequest): Promise<BotLeaveLobbyResponse> {
return this.request<BotLeaveLobbyResponse>(this.getApiPath("/ai-voicebot/api/bots/leave"), {
method: "POST",
body: request,
});
}
// Auto-generated endpoints will be added here by update-api-client.js // Auto-generated endpoints will be added here by update-api-client.js
// DO NOT MANUALLY EDIT BELOW THIS LINE // DO NOT MANUALLY EDIT BELOW THIS LINE

View File

@ -62,6 +62,8 @@ from shared.models import (
BotJoinLobbyRequest, BotJoinLobbyRequest,
BotJoinLobbyResponse, BotJoinLobbyResponse,
BotJoinPayload, BotJoinPayload,
BotLeaveLobbyRequest,
BotLeaveLobbyResponse,
) )
@ -327,6 +329,9 @@ class Lobby:
protected=True protected=True
if s.name and s.name.lower() in name_passwords if s.name and s.name.lower() in name_passwords
else False, else False,
is_bot=s.is_bot,
bot_run_id=s.bot_run_id,
bot_provider_id=s.bot_provider_id,
) )
for s in self.sessions.values() for s in self.sessions.values()
if s.name if s.name
@ -453,8 +458,8 @@ class Session:
_loaded = False _loaded = False
lock = threading.RLock() # Thread safety for class-level operations lock = threading.RLock() # Thread safety for class-level operations
def __init__(self, id: str): def __init__(self, id: str, is_bot: bool = False):
logger.info(f"Instantiating new session {id}") logger.info(f"Instantiating new session {id} (bot: {is_bot})")
with Session.lock: with Session.lock:
self._instances.append(self) self._instances.append(self)
self.id = id self.id = id
@ -468,6 +473,9 @@ class Session:
self.created_at = time.time() self.created_at = time.time()
self.last_used = time.time() self.last_used = time.time()
self.displaced_at: float | None = None # When name was taken over self.displaced_at: float | None = None # When name was taken over
self.is_bot = is_bot # Whether this session represents a bot
self.bot_run_id: str | None = None # Bot run ID for tracking
self.bot_provider_id: str | None = None # Bot provider ID
self.session_lock = threading.RLock() # Instance-level lock self.session_lock = threading.RLock() # Instance-level lock
self.save() self.save()
@ -492,6 +500,9 @@ class Session:
created_at=s.created_at, created_at=s.created_at,
last_used=s.last_used, last_used=s.last_used,
displaced_at=s.displaced_at, displaced_at=s.displaced_at,
is_bot=s.is_bot,
bot_run_id=s.bot_run_id,
bot_provider_id=s.bot_provider_id,
) )
) )
@ -573,12 +584,16 @@ class Session:
logger.info(f"Expiring session {s_saved.id[:8]}:{name} during load") logger.info(f"Expiring session {s_saved.id[:8]}:{name} during load")
continue # Skip loading this expired session continue # Skip loading this expired session
session = Session(s_saved.id) session = Session(s_saved.id, is_bot=getattr(s_saved, "is_bot", False))
session.name = name session.name = name
# Load timestamps, with defaults for backward compatibility # Load timestamps, with defaults for backward compatibility
session.created_at = created_at session.created_at = created_at
session.last_used = last_used session.last_used = last_used
session.displaced_at = displaced_at session.displaced_at = displaced_at
# Load bot information with defaults for backward compatibility
session.is_bot = getattr(s_saved, "is_bot", False)
session.bot_run_id = getattr(s_saved, "bot_run_id", None)
session.bot_provider_id = getattr(s_saved, "bot_provider_id", None)
for lobby_saved in s_saved.lobbies: for lobby_saved in s_saved.lobbies:
session.lobbies.append( session.lobbies.append(
Lobby( Lobby(
@ -1434,8 +1449,8 @@ async def request_bot_join_lobby(
bot_session_id = secrets.token_hex(16) bot_session_id = secrets.token_hex(16)
# Create the Session object for the bot # Create the Session object for the bot
bot_session = Session(bot_session_id) bot_session = Session(bot_session_id, is_bot=True)
logger.info(f"Created session for bot: {bot_session.getName()}") logger.info(f"Created bot session for: {bot_session.getName()}")
# Determine server URL for the bot to connect back to # Determine server URL for the bot to connect back to
# Use the server's public URL or construct from request # Use the server's public URL or construct from request
@ -1467,6 +1482,12 @@ async def request_bot_join_lobby(
result = response.json() result = response.json()
run_id = result.get("run_id", "unknown") run_id = result.get("run_id", "unknown")
# Update bot session with run and provider information
with bot_session.session_lock:
bot_session.bot_run_id = run_id
bot_session.bot_provider_id = target_provider_id
bot_session.setName(bot_nick)
logger.info( logger.info(
f"Bot {bot_name} requested to join lobby {request.lobby_id}" f"Bot {bot_name} requested to join lobby {request.lobby_id}"
) )
@ -1493,6 +1514,70 @@ async def request_bot_join_lobby(
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.post(public_url + "api/bots/leave", response_model=BotLeaveLobbyResponse)
async def request_bot_leave_lobby(
request: BotLeaveLobbyRequest,
) -> BotLeaveLobbyResponse:
"""Request a bot to leave from all lobbies and disconnect"""
# Find the bot session
bot_session = getSession(request.session_id)
if not bot_session:
raise HTTPException(status_code=404, detail="Bot session not found")
if not bot_session.is_bot:
raise HTTPException(status_code=400, detail="Session is not a bot")
run_id = bot_session.bot_run_id
provider_id = bot_session.bot_provider_id
logger.info(f"Requesting bot {bot_session.getName()} to leave all lobbies")
# Try to stop the bot at the provider level if we have the information
if provider_id and run_id and provider_id in bot_providers:
provider = bot_providers[provider_id]
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{provider.base_url}/bots/runs/{run_id}/stop",
timeout=5.0,
)
if response.status_code == 200:
logger.info(
f"Successfully requested bot provider to stop run {run_id}"
)
else:
logger.warning(
f"Bot provider returned error when stopping: HTTP {response.status_code}"
)
except Exception as e:
logger.warning(f"Failed to request bot stop from provider: {e}")
# Force disconnect the bot session from all lobbies
with bot_session.session_lock:
lobbies_to_part = bot_session.lobbies[:]
for lobby in lobbies_to_part:
try:
await bot_session.part(lobby)
except Exception as e:
logger.warning(f"Error parting bot from lobby {lobby.getName()}: {e}")
# Close WebSocket connection if it exists
if bot_session.ws:
try:
await bot_session.ws.close()
except Exception as e:
logger.warning(f"Error closing bot WebSocket: {e}")
bot_session.ws = None
return BotLeaveLobbyResponse(
status="disconnected",
session_id=request.session_id,
run_id=run_id,
)
# Register websocket endpoint directly on app with full public_url path # Register websocket endpoint directly on app with full public_url path
@app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}") @app.websocket(f"{public_url}" + "ws/lobby/{lobby_id}/{session_id}")
async def lobby_join( async def lobby_join(

View File

@ -43,6 +43,9 @@ class ParticipantModel(BaseModel):
session_id: str session_id: str
live: bool live: bool
protected: bool protected: bool
is_bot: bool = False
bot_run_id: Optional[str] = None
bot_provider_id: Optional[str] = None
# ============================================================================= # =============================================================================
@ -312,6 +315,9 @@ class SessionSaved(BaseModel):
created_at: float = 0.0 created_at: float = 0.0
last_used: float = 0.0 last_used: float = 0.0
displaced_at: Optional[float] = None # When name was taken over displaced_at: Optional[float] = None # When name was taken over
is_bot: bool = False # Whether this session represents a bot
bot_run_id: Optional[str] = None # Bot run ID for tracking
bot_provider_id: Optional[str] = None # Bot provider ID
class SessionsPayload(BaseModel): class SessionsPayload(BaseModel):
@ -397,3 +403,17 @@ class BotJoinLobbyResponse(BaseModel):
bot_name: str bot_name: str
run_id: str run_id: str
provider_id: str provider_id: str
class BotLeaveLobbyRequest(BaseModel):
"""Request to make a bot leave a lobby"""
session_id: str # The session ID of the bot to remove
class BotLeaveLobbyResponse(BaseModel):
"""Response after requesting a bot to leave a lobby"""
status: str
session_id: str
run_id: Optional[str] = None

View File

@ -1,110 +0,0 @@
import time
from logger import logger
# Defensive monkeypatch: aioice Transaction.__retry may run after the
# underlying datagram transport or loop was torn down which results in
# AttributeError being raised and flooding logs. Wrap the original
# implementation to catch and suppress AttributeError while preserving
# other exceptions. This is a temporary mitigation to keep logs readable
# while we investigate/upstream a proper fix or upgrade aioice.
try:
import aioice.stun as _aioice_stun # type: ignore
# The method is defined with a double-underscore name (__retry) which
# gets name-mangled. Detect the actual attribute name robustly.
retry_attr_name = None
for name in dir(_aioice_stun.Transaction):
if name.endswith("retry"):
obj = getattr(_aioice_stun.Transaction, name)
if callable(obj):
retry_attr_name = name
_orig_retry = obj
break
if retry_attr_name is not None:
# Simple in-process dedupe cache so we only log the same AttributeError
# once per interval. This prevents flooding the logs when many
# transactions race to run after shutdown.
_MONKEYPATCH_LOG_CACHE: dict[str, float] = {}
_MONKEYPATCH_LOG_SUPPRESSION_INTERVAL = 5.0
def _should_log_once(key: str) -> bool:
now = time.time()
last = _MONKEYPATCH_LOG_CACHE.get(key)
if last is None or (now - last) > _MONKEYPATCH_LOG_SUPPRESSION_INTERVAL:
_MONKEYPATCH_LOG_CACHE[key] = now
return True
return False
def _safe_transaction_retry(self, *args, **kwargs): # type: ignore
try:
return _orig_retry(self, *args, **kwargs) # type: ignore
except AttributeError as e: # type: ignore
# Transport or event-loop already closed; log once per key
key = f"Transaction.{retry_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice Transaction.%s AttributeError suppressed: %s",
retry_attr_name,
e,
)
except Exception: # type: ignore
# Preserve visibility for other unexpected exceptions
logger.exception(
"aioice Transaction.%s raised an unexpected exception",
retry_attr_name,
)
setattr(_aioice_stun.Transaction, retry_attr_name, _safe_transaction_retry) # type: ignore
logger.info("Applied safe aioice Transaction.%s monkeypatch", retry_attr_name)
else:
logger.warning("aioice Transaction.__retry not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice Transaction.__retry monkeypatch: %s", e)
# Additional defensive patch: wrap the protocol-level send_stun implementation
# (e.g. StunProtocol.send_stun) which ultimately calls the datagram transport's
# sendto. If the transport or its loop is already torn down, sendto can raise
# AttributeError which then triggers asyncio's fatal error path (calling a None
# loop). Wrapping here prevents the flood of selector_events/_fatal_error
# AttributeError traces.
try:
import aioice.ice as _aioice_ice # type: ignore
# Prefer to patch StunProtocol.send_stun which is used by the ICE code.
send_attr_name = None
if hasattr(_aioice_ice, "StunProtocol"):
proto_cls = getattr(_aioice_ice, "StunProtocol")
for name in dir(proto_cls):
if name.endswith("send_stun"):
attr = getattr(proto_cls, name)
if callable(attr):
send_attr_name = name
_orig_send_stun = attr
break
if send_attr_name is not None:
def _safe_send_stun(self, message, addr): # type: ignore
try:
return _orig_send_stun(self, message, addr) # type: ignore
except AttributeError as e: # type: ignore
# Likely transport._sock or transport._loop is None; log once
key = f"StunProtocol.{send_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice StunProtocol.%s AttributeError suppressed: %s",
send_attr_name,
e,
)
except Exception: # type: ignore
logger.exception(
"aioice StunProtocol.%s raised unexpected exception", send_attr_name
)
setattr(proto_cls, send_attr_name, _safe_send_stun) # type: ignore
logger.info("Applied safe aioice StunProtocol.%s monkeypatch", send_attr_name)
else:
logger.warning("aioice StunProtocol.send_stun not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice StunProtocol.send_stun monkeypatch: %s", e)

View File

@ -315,22 +315,39 @@ class WebRTCSignalingClient:
if self.registration_check_task and not self.registration_check_task.done(): if self.registration_check_task and not self.registration_check_task.done():
self.registration_check_task.cancel() self.registration_check_task.cancel()
try: try:
# Only await if we're in the same event loop
current_loop = asyncio.get_running_loop()
task_loop = self.registration_check_task.get_loop()
if current_loop == task_loop:
await self.registration_check_task await self.registration_check_task
else:
logger.warning("Registration check task in different event loop, skipping await")
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
except Exception as e:
logger.warning(f"Error cancelling registration check task: {e}")
self.registration_check_task = None self.registration_check_task = None
if self.websocket: if self.websocket:
ws = cast(WebSocketProtocol, self.websocket) ws = cast(WebSocketProtocol, self.websocket)
try:
await ws.close() await ws.close()
except Exception as e:
logger.warning(f"Error closing websocket: {e}")
# Close all peer connections # Close all peer connections
for pc in self.peer_connections.values(): for pc in self.peer_connections.values():
try:
await pc.close() await pc.close()
except Exception as e:
logger.warning(f"Error closing peer connection: {e}")
# Stop local tracks # Stop local tracks
for track in self.local_tracks.values(): for track in self.local_tracks.values():
try:
track.stop() track.stop()
except Exception as e:
logger.warning(f"Error stopping track: {e}")
# Reset registration status # Reset registration status
self.is_registered = False self.is_registered = False