Moved aioice monkeypatch into a separate file for debug usage

This commit is contained in:
James Ketr 2025-09-01 15:06:17 -07:00
parent fd58ee2a91
commit 450357db79
5 changed files with 224 additions and 144 deletions

View File

@ -1,4 +1,4 @@
import React, { useState, useEffect, KeyboardEvent } from "react"; import React, { useState, useEffect, KeyboardEvent, useCallback } from "react";
import { Input, Paper, Typography } from "@mui/material"; import { Input, Paper, Typography } from "@mui/material";
import { Session } from "./GlobalContext"; import { Session } from "./GlobalContext";
@ -8,6 +8,7 @@ import { ws_base, base } from "./Common";
import { Box, Button, Tooltip } from "@mui/material"; import { Box, Button, Tooltip } from "@mui/material";
import { BrowserRouter as Router, Route, Routes, useParams } from "react-router-dom"; import { BrowserRouter as Router, Route, Routes, useParams } from "react-router-dom";
import useWebSocket, { ReadyState } from "react-use-websocket"; import useWebSocket, { ReadyState } from "react-use-websocket";
import ConnectionStatus from "./ConnectionStatus";
console.log(`AI Voice Chat Build: ${process.env.REACT_APP_AI_VOICECHAT_BUILD}`); console.log(`AI Voice Chat Build: ${process.env.REACT_APP_AI_VOICECHAT_BUILD}`);
@ -31,17 +32,29 @@ const LobbyView: React.FC<LobbyProps> = (props: LobbyProps) => {
const [editPassword, setEditPassword] = useState<string>(""); const [editPassword, setEditPassword] = useState<string>("");
const [socketUrl, setSocketUrl] = useState<string | null>(null); const [socketUrl, setSocketUrl] = useState<string | null>(null);
const [creatingLobby, setCreatingLobby] = useState<boolean>(false); const [creatingLobby, setCreatingLobby] = useState<boolean>(false);
const [reconnectAttempt, setReconnectAttempt] = useState<number>(0);
const socket = useWebSocket(socketUrl, { const {
onOpen: () => console.log("app - WebSocket connection opened."), sendJsonMessage,
onClose: () => console.log("app - WebSocket connection closed."), lastJsonMessage,
onError: (event) => console.error("app - WebSocket error observed:", event), readyState
// onMessage: (event) => console.log("WebSocket message received:"), } = useWebSocket(socketUrl, {
// shouldReconnect: (closeEvent) => true, // Will attempt to reconnect on all close events. onOpen: () => {
reconnectInterval: 3000, console.log("app - WebSocket connection opened.");
setReconnectAttempt(0);
},
onClose: () => {
console.log("app - WebSocket connection closed.");
setReconnectAttempt(prev => prev + 1);
},
onError: (event: Event) => console.error("app - WebSocket error observed:", event),
shouldReconnect: (closeEvent) => true, // Will attempt to reconnect on all close events
reconnectInterval: 5000, // Retry every 5 seconds
onReconnectStop: (numAttempts) => {
console.log(`Stopped reconnecting after ${numAttempts} attempts`);
},
share: true, share: true,
}); });
const { sendJsonMessage, lastJsonMessage, readyState } = socket;
useEffect(() => { useEffect(() => {
if (lobby && session) { if (lobby && session) {
@ -145,7 +158,10 @@ const LobbyView: React.FC<LobbyProps> = (props: LobbyProps) => {
return ( return (
<Paper className="Lobby" sx={{ p: 2, m: 2, width: "fit-content" }}> <Paper className="Lobby" sx={{ p: 2, m: 2, width: "fit-content" }}>
{readyState !== ReadyState.OPEN || !session ? ( {readyState !== ReadyState.OPEN || !session ? (
<h2>Connecting to server...</h2> <ConnectionStatus
readyState={readyState}
reconnectAttempt={reconnectAttempt}
/>
) : ( ) : (
<> <>
<Box sx={{ mb: 2, display: "flex", gap: 2, alignItems: "flex-start", flexDirection: "column" }}> <Box sx={{ mb: 2, display: "flex", gap: 2, alignItems: "flex-start", flexDirection: "column" }}>
@ -219,6 +235,7 @@ const LobbyView: React.FC<LobbyProps> = (props: LobbyProps) => {
const App = () => { const App = () => {
const [session, setSession] = useState<Session | null>(null); const [session, setSession] = useState<Session | null>(null);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const [sessionRetryAttempt, setSessionRetryAttempt] = useState<number>(0);
useEffect(() => { useEffect(() => {
if (error) { if (error) {
@ -233,11 +250,8 @@ const App = () => {
console.log(`App - sessionId`, session.id); console.log(`App - sessionId`, session.id);
}, [session]); }, [session]);
useEffect(() => { const getSession = useCallback(async () => {
if (session) { try {
return;
}
const getSession = async () => {
const res = await fetch(`${base}/api/session`, { const res = await fetch(`${base}/api/session`, {
method: "GET", method: "GET",
cache: "no-cache", cache: "no-cache",
@ -248,26 +262,44 @@ const App = () => {
}); });
if (res.status >= 400) { if (res.status >= 400) {
const error = `Unable to connect to AI Voice Chat server! Try refreshing your browser in a few seconds.`; throw new Error(`HTTP ${res.status}: Unable to connect to AI Voice Chat server`);
console.error(error);
setError(error);
return;
} }
const data = await res.json(); const data = await res.json();
if (data.error) { if (data.error) {
console.error(`App - Server error: ${data.error}`); throw new Error(`Server error: ${data.error}`);
setError(data.error);
return;
} }
setSession(data); setSession(data);
}; setSessionRetryAttempt(0);
} catch (err) {
const errorMessage = err instanceof Error ? err.message : 'Unknown error occurred';
console.error('Failed to get session:', errorMessage);
setError(errorMessage);
// Schedule retry after 5 seconds
setSessionRetryAttempt(prev => prev + 1);
setTimeout(() => {
getSession(); // Retry
}, 5000);
}
}, []);
useEffect(() => {
if (session) {
return;
}
getSession(); getSession();
}, [session, setSession]); }, [session, getSession]);
return ( return (
<Box> <Box>
{!session && <h2>Connecting to server...</h2>} {!session && (
<ConnectionStatus
readyState={sessionRetryAttempt > 0 ? ReadyState.CLOSED : ReadyState.CONNECTING}
reconnectAttempt={sessionRetryAttempt}
/>
)}
{session && ( {session && (
<Router> <Router>
<Routes> <Routes>

View File

@ -11,6 +11,7 @@ import Videocam from "@mui/icons-material/Videocam";
import Box from "@mui/material/Box"; import Box from "@mui/material/Box";
import useWebSocket, { ReadyState } from "react-use-websocket"; import useWebSocket, { ReadyState } from "react-use-websocket";
import { Session } from "./GlobalContext"; import { Session } from "./GlobalContext";
import WebRTCStatus from "./WebRTCStatus";
const debug = true; const debug = true;
// When true, do not send host candidates to the signaling server. Keeps TURN relays preferred. // When true, do not send host candidates to the signaling server. Keeps TURN relays preferred.
@ -131,6 +132,8 @@ interface Peer {
local: boolean; local: boolean;
dead: boolean; dead: boolean;
connection?: RTCPeerConnection; connection?: RTCPeerConnection;
connectionState?: string;
isNegotiating?: boolean;
} }
export type { Peer }; export type { Peer };
@ -328,8 +331,25 @@ const MediaAgent = (props: MediaAgentProps) => {
const initiatedOfferRef = useRef<Set<string>>(new Set()); const initiatedOfferRef = useRef<Set<string>>(new Set());
const pendingIceCandidatesRef = useRef<Map<string, RTCIceCandidateInit[]>>(new Map()); const pendingIceCandidatesRef = useRef<Map<string, RTCIceCandidateInit[]>>(new Map());
// Update peer states when connection state changes
const updatePeerConnectionState = useCallback((peerId: string, connectionState: string, isNegotiating: boolean = false) => {
setPeers(prevPeers => {
const updatedPeers = { ...prevPeers };
if (updatedPeers[peerId]) {
updatedPeers[peerId] = {
...updatedPeers[peerId],
connectionState,
isNegotiating
};
}
return updatedPeers;
});
}, [setPeers]);
const { sendJsonMessage, lastJsonMessage, readyState } = useWebSocket(socketUrl, { const { sendJsonMessage, lastJsonMessage, readyState } = useWebSocket(socketUrl, {
share: true, share: true,
shouldReconnect: (closeEvent) => true, // Auto-reconnect on connection loss
reconnectInterval: 5000,
onError: (err) => { onError: (err) => {
console.error(err); console.error(err);
}, },
@ -475,6 +495,10 @@ const MediaAgent = (props: MediaAgentProps) => {
} }
console.log(`media-agent - addPeer:${peer.peer_name} Handling negotiationneeded for ${peer.peer_name}`); console.log(`media-agent - addPeer:${peer.peer_name} Handling negotiationneeded for ${peer.peer_name}`);
// Mark as negotiating
isNegotiatingRef.current.set(peer_id, true);
updatePeerConnectionState(peer_id, connection.connectionState, true);
try { try {
makingOfferRef.current.set(peer_id, true); makingOfferRef.current.set(peer_id, true);
@ -517,6 +541,10 @@ const MediaAgent = (props: MediaAgentProps) => {
connection.connectionState, connection.connectionState,
event event
); );
// Update peer connection state
updatePeerConnectionState(peer_id, connection.connectionState);
if (connection.connectionState === "failed") { if (connection.connectionState === "failed") {
setTimeout(() => { setTimeout(() => {
if (connection.connectionState === "failed") { if (connection.connectionState === "failed") {
@ -681,6 +709,7 @@ const MediaAgent = (props: MediaAgentProps) => {
} catch (err) { } catch (err) {
console.error(`media-agent - addPeer:${peer.peer_name} Failed to create/send offer:`, err); console.error(`media-agent - addPeer:${peer.peer_name} Failed to create/send offer:`, err);
isNegotiatingRef.current.set(peer_id, false); isNegotiatingRef.current.set(peer_id, false);
updatePeerConnectionState(peer_id, connection.connectionState, false);
} finally { } finally {
// Clear the makingOffer flag after we're done // Clear the makingOffer flag after we're done
makingOfferRef.current.set(peer_id, false); makingOfferRef.current.set(peer_id, false);
@ -737,6 +766,7 @@ const MediaAgent = (props: MediaAgentProps) => {
try { try {
await pc.setRemoteDescription(desc); await pc.setRemoteDescription(desc);
isNegotiatingRef.current.set(peer_id, false); // Negotiation complete isNegotiatingRef.current.set(peer_id, false); // Negotiation complete
updatePeerConnectionState(peer_id, pc.connectionState, false);
console.log(`media-agent - sessionDescription:${peer_name} - Remote description set`); console.log(`media-agent - sessionDescription:${peer_name} - Remote description set`);
// Process any queued ICE candidates // Process any queued ICE candidates
@ -1222,18 +1252,30 @@ const MediaControl: React.FC<MediaControlProps> = ({ isSelf, peer, className })
</div> </div>
{isValid ? ( {isValid ? (
peer.attributes?.srcObject && ( peer.attributes?.srcObject && (
<Video <Box sx={{ position: 'relative' }}>
key={`video-${peer.session_id}-${peer.attributes.srcObject.id}`} <Video
className="Video" key={`video-${peer.session_id}-${peer.attributes.srcObject.id}`}
data-id={peer.peer_name} className="Video"
autoPlay data-id={peer.peer_name}
srcObject={peer.attributes.srcObject} autoPlay
local={peer.local} srcObject={peer.attributes.srcObject}
muted={peer.local || muted} // Pass muted state local={peer.local}
/> muted={peer.local || muted} // Pass muted state
/>
<WebRTCStatus
isNegotiating={peer.isNegotiating || false}
connectionState={peer.connectionState}
/>
</Box>
) )
) : ( ) : (
<div className="placeholder">Waiting for media</div> <Box sx={{ position: 'relative' }}>
<div className="placeholder">Waiting for media</div>
<WebRTCStatus
isNegotiating={peer.isNegotiating || false}
connectionState={peer.connectionState}
/>
</Box>
)} )}
{/* <Moveable {/* <Moveable
flushSync={flushSync} flushSync={flushSync}

View File

@ -54,10 +54,12 @@ const UserList: React.FC<UserListProps> = (props: UserListProps) => {
[session] [session]
); );
// Use the WebSocket hook for lobby events (rely on ws from context, but can use hook for message handling) // Use the WebSocket hook for lobby events with automatic reconnection
const { sendJsonMessage } = useWebSocket(socketUrl, { const { sendJsonMessage } = useWebSocket(socketUrl, {
share: true, share: true,
onMessage: (event) => { shouldReconnect: (closeEvent) => true, // Auto-reconnect on connection loss
reconnectInterval: 5000,
onMessage: (event: MessageEvent) => {
if (!session) { if (!session) {
return; return;
} }

110
voicebot/debug_aioice.py Normal file
View File

@ -0,0 +1,110 @@
import time
from logger import logger
# Defensive monkeypatch: aioice Transaction.__retry may run after the
# underlying datagram transport or loop was torn down which results in
# AttributeError being raised and flooding logs. Wrap the original
# implementation to catch and suppress AttributeError while preserving
# other exceptions. This is a temporary mitigation to keep logs readable
# while we investigate/upstream a proper fix or upgrade aioice.
try:
import aioice.stun as _aioice_stun # type: ignore
# The method is defined with a double-underscore name (__retry) which
# gets name-mangled. Detect the actual attribute name robustly.
retry_attr_name = None
for name in dir(_aioice_stun.Transaction):
if name.endswith("retry"):
obj = getattr(_aioice_stun.Transaction, name)
if callable(obj):
retry_attr_name = name
_orig_retry = obj
break
if retry_attr_name is not None:
# Simple in-process dedupe cache so we only log the same AttributeError
# once per interval. This prevents flooding the logs when many
# transactions race to run after shutdown.
_MONKEYPATCH_LOG_CACHE: dict[str, float] = {}
_MONKEYPATCH_LOG_SUPPRESSION_INTERVAL = 5.0
def _should_log_once(key: str) -> bool:
now = time.time()
last = _MONKEYPATCH_LOG_CACHE.get(key)
if last is None or (now - last) > _MONKEYPATCH_LOG_SUPPRESSION_INTERVAL:
_MONKEYPATCH_LOG_CACHE[key] = now
return True
return False
def _safe_transaction_retry(self, *args, **kwargs): # type: ignore
try:
return _orig_retry(self, *args, **kwargs) # type: ignore
except AttributeError as e: # type: ignore
# Transport or event-loop already closed; log once per key
key = f"Transaction.{retry_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice Transaction.%s AttributeError suppressed: %s",
retry_attr_name,
e,
)
except Exception: # type: ignore
# Preserve visibility for other unexpected exceptions
logger.exception(
"aioice Transaction.%s raised an unexpected exception",
retry_attr_name,
)
setattr(_aioice_stun.Transaction, retry_attr_name, _safe_transaction_retry) # type: ignore
logger.info("Applied safe aioice Transaction.%s monkeypatch", retry_attr_name)
else:
logger.warning("aioice Transaction.__retry not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice Transaction.__retry monkeypatch: %s", e)
# Additional defensive patch: wrap the protocol-level send_stun implementation
# (e.g. StunProtocol.send_stun) which ultimately calls the datagram transport's
# sendto. If the transport or its loop is already torn down, sendto can raise
# AttributeError which then triggers asyncio's fatal error path (calling a None
# loop). Wrapping here prevents the flood of selector_events/_fatal_error
# AttributeError traces.
try:
import aioice.ice as _aioice_ice # type: ignore
# Prefer to patch StunProtocol.send_stun which is used by the ICE code.
send_attr_name = None
if hasattr(_aioice_ice, "StunProtocol"):
proto_cls = getattr(_aioice_ice, "StunProtocol")
for name in dir(proto_cls):
if name.endswith("send_stun"):
attr = getattr(proto_cls, name)
if callable(attr):
send_attr_name = name
_orig_send_stun = attr
break
if send_attr_name is not None:
def _safe_send_stun(self, message, addr): # type: ignore
try:
return _orig_send_stun(self, message, addr) # type: ignore
except AttributeError as e: # type: ignore
# Likely transport._sock or transport._loop is None; log once
key = f"StunProtocol.{send_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice StunProtocol.%s AttributeError suppressed: %s",
send_attr_name,
e,
)
except Exception: # type: ignore
logger.exception(
"aioice StunProtocol.%s raised unexpected exception", send_attr_name
)
setattr(proto_cls, send_attr_name, _safe_send_stun) # type: ignore
logger.info("Applied safe aioice StunProtocol.%s monkeypatch", send_attr_name)
else:
logger.warning("aioice StunProtocol.send_stun not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice StunProtocol.send_stun monkeypatch: %s", e)

View File

@ -61,113 +61,7 @@ from av import VideoFrame, AudioFrame
import time import time
from logger import logger from logger import logger
# Defensive monkeypatch: aioice Transaction.__retry may run after the # import debug_aioice
# underlying datagram transport or loop was torn down which results in
# AttributeError being raised and flooding logs. Wrap the original
# implementation to catch and suppress AttributeError while preserving
# other exceptions. This is a temporary mitigation to keep logs readable
# while we investigate/upstream a proper fix or upgrade aioice.
try:
import aioice.stun as _aioice_stun # type: ignore
# The method is defined with a double-underscore name (__retry) which
# gets name-mangled. Detect the actual attribute name robustly.
retry_attr_name = None
for name in dir(_aioice_stun.Transaction):
if name.endswith("retry"):
obj = getattr(_aioice_stun.Transaction, name)
if callable(obj):
retry_attr_name = name
_orig_retry = obj
break
if retry_attr_name is not None:
# Simple in-process dedupe cache so we only log the same AttributeError
# once per interval. This prevents flooding the logs when many
# transactions race to run after shutdown.
_MONKEYPATCH_LOG_CACHE: dict[str, float] = {}
_MONKEYPATCH_LOG_SUPPRESSION_INTERVAL = 5.0
def _should_log_once(key: str) -> bool:
now = time.time()
last = _MONKEYPATCH_LOG_CACHE.get(key)
if last is None or (now - last) > _MONKEYPATCH_LOG_SUPPRESSION_INTERVAL:
_MONKEYPATCH_LOG_CACHE[key] = now
return True
return False
def _safe_transaction_retry(self, *args, **kwargs): # type: ignore
try:
return _orig_retry(self, *args, **kwargs) # type: ignore
except AttributeError as e: # type: ignore
# Transport or event-loop already closed; log once per key
key = f"Transaction.{retry_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice Transaction.%s AttributeError suppressed: %s",
retry_attr_name,
e,
)
except Exception: # type: ignore
# Preserve visibility for other unexpected exceptions
logger.exception(
"aioice Transaction.%s raised an unexpected exception",
retry_attr_name,
)
setattr(_aioice_stun.Transaction, retry_attr_name, _safe_transaction_retry) # type: ignore
logger.info("Applied safe aioice Transaction.%s monkeypatch", retry_attr_name)
else:
logger.warning("aioice Transaction.__retry not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice Transaction.__retry monkeypatch: %s", e)
# Additional defensive patch: wrap the protocol-level send_stun implementation
# (e.g. StunProtocol.send_stun) which ultimately calls the datagram transport's
# sendto. If the transport or its loop is already torn down, sendto can raise
# AttributeError which then triggers asyncio's fatal error path (calling a None
# loop). Wrapping here prevents the flood of selector_events/_fatal_error
# AttributeError traces.
try:
import aioice.ice as _aioice_ice # type: ignore
# Prefer to patch StunProtocol.send_stun which is used by the ICE code.
send_attr_name = None
if hasattr(_aioice_ice, "StunProtocol"):
proto_cls = getattr(_aioice_ice, "StunProtocol")
for name in dir(proto_cls):
if name.endswith("send_stun"):
attr = getattr(proto_cls, name)
if callable(attr):
send_attr_name = name
_orig_send_stun = attr
break
if send_attr_name is not None:
def _safe_send_stun(self, message, addr): # type: ignore
try:
return _orig_send_stun(self, message, addr) # type: ignore
except AttributeError as e: # type: ignore
# Likely transport._sock or transport._loop is None; log once
key = f"StunProtocol.{send_attr_name}:{e}"
if _should_log_once(key):
logger.warning(
"aioice StunProtocol.%s AttributeError suppressed: %s",
send_attr_name,
e,
)
except Exception: # type: ignore
logger.exception(
"aioice StunProtocol.%s raised unexpected exception", send_attr_name
)
setattr(proto_cls, send_attr_name, _safe_send_stun) # type: ignore
logger.info("Applied safe aioice StunProtocol.%s monkeypatch", send_attr_name)
else:
logger.warning("aioice StunProtocol.send_stun not found; skipping monkeypatch")
except Exception as e:
logger.exception("Failed to apply aioice StunProtocol.send_stun monkeypatch: %s", e)
# TypedDict for ICE candidate payloads received from signalling # TypedDict for ICE candidate payloads received from signalling
class ICECandidateDict(TypedDict, total=False): class ICECandidateDict(TypedDict, total=False):