diff --git a/client/src/MediaControl.tsx b/client/src/MediaControl.tsx index 038befe..5e52c74 100644 --- a/client/src/MediaControl.tsx +++ b/client/src/MediaControl.tsx @@ -13,7 +13,7 @@ import WebRTCStatus from "./WebRTCStatus"; const debug = true; // When true, do not send host candidates to the signaling server. Keeps TURN relays preferred. -const FILTER_HOST_CANDIDATES = true; +const FILTER_HOST_CANDIDATES = false; // Temporarily disabled to test direct connections /* ---------- Synthetic Tracks Helpers ---------- */ @@ -319,6 +319,9 @@ const MediaAgent = (props: MediaAgentProps) => { const [media, setMedia] = useState(null); const [pendingPeers, setPendingPeers] = useState([]); + // Extract has_media value to avoid unnecessary effect re-runs when session object changes + const localUserHasMedia = session?.has_media !== false; // Default to true for backward compatibility + // Refs for singleton resources const mediaStreamRef = useRef(null); const connectionsRef = useRef>(new Map()); @@ -454,7 +457,7 @@ const MediaAgent = (props: MediaAgentProps) => { // Create RTCPeerConnection const connection = new RTCPeerConnection({ - iceTransportPolicy: "relay", + iceTransportPolicy: "all", // Allow both direct and relay connections iceServers: [ { urls: "stun:ketrenos.com:3478" }, { @@ -462,6 +465,9 @@ const MediaAgent = (props: MediaAgentProps) => { username: "ketra", credential: "ketran", }, + // DO NOT add Google's public STUN server as fallback; if ketrenos.com STUN/TURN fails, + // this is an infrastructure failure that must be resolved (not silently bypassed). + // { urls: "stun:stun.l.google.com:19302" }, ], }); @@ -552,13 +558,42 @@ const MediaAgent = (props: MediaAgentProps) => { updatePeerConnectionState(peer_id, connection.connectionState); if (connection.connectionState === "failed") { + console.error(`media-agent - addPeer:${peer.peer_name} Connection failed for`, peer.peer_name); + + // Immediate cleanup of failed connection + connectionsRef.current.delete(peer_id); + makingOfferRef.current.delete(peer_id); + isNegotiatingRef.current.delete(peer_id); + initiatedOfferRef.current.delete(peer_id); + + // Clean up the peer from state + setPeers((prevPeers) => { + const updated = { ...prevPeers }; + delete updated[peer_id]; + return updated; + }); + + // Close the connection + try { + connection.close(); + } catch (e) { + console.warn(`media-agent - Error closing failed connection:`, e); + } + } else if (connection.connectionState === "disconnected") { + console.warn(`media-agent - addPeer:${peer.peer_name} Connection disconnected for`, peer.peer_name, "- may recover"); + + // Set a timeout for disconnected state recovery setTimeout(() => { - if (connection.connectionState === "failed") { - console.log(`media-agent - addPeer:${peer.peer_name} Cleaning up failed connection for`, peer.peer_name); + if (connection.connectionState === "disconnected" || connection.connectionState === "failed") { + console.log(`media-agent - addPeer:${peer.peer_name} Connection did not recover, cleaning up`); connectionsRef.current.delete(peer_id); - // You might want to trigger a reconnection attempt here } - }, 5000); + }, 10000); // Give 10 seconds for recovery + } else if (connection.connectionState === "connected") { + console.log(`media-agent - addPeer:${peer.peer_name} Connection established successfully`); + // Clear any negotiation flags on successful connection + isNegotiatingRef.current.set(peer_id, false); + makingOfferRef.current.set(peer_id, false); } }); @@ -677,8 +712,36 @@ const MediaAgent = (props: MediaAgentProps) => { connection.iceConnectionState, event ); + if (connection.iceConnectionState === "failed") { - console.log("media-agent - ICE connection failed for", peer.peer_name); + console.error(`media-agent - ICE connection failed for ${peer.peer_name}`); + // Log ICE candidate gathering stats for debugging + connection + .getStats() + .then((stats) => { + const candidateStats: any[] = []; + stats.forEach((report) => { + if (report.type === "local-candidate" || report.type === "remote-candidate") { + candidateStats.push({ + type: report.type, + candidateType: (report as any).candidateType, + protocol: (report as any).protocol, + address: (report as any).address, + port: (report as any).port, + }); + } + }); + console.error(`media-agent - ICE candidates for failed connection to ${peer.peer_name}:`, candidateStats); + }) + .catch((e) => console.log("Could not get stats:", e)); + } else if (connection.iceConnectionState === "disconnected") { + console.warn(`media-agent - ICE connection disconnected for ${peer.peer_name}, may recover`); + } else if (connection.iceConnectionState === "connected") { + console.log(`media-agent - ICE connection established successfully for ${peer.peer_name}`); + } else if (connection.iceConnectionState === "checking") { + console.log(`media-agent - ICE connection checking for ${peer.peer_name}`); + } else if (connection.iceConnectionState === "completed") { + console.log(`media-agent - ICE connection completed for ${peer.peer_name}`); } }; @@ -746,7 +809,7 @@ const MediaAgent = (props: MediaAgentProps) => { } } }, - [peers, setPeers, media, sendJsonMessage, session?.has_media, updatePeerConnectionState] + [peers, setPeers, media, sendJsonMessage, localUserHasMedia, updatePeerConnectionState] ); // Process queued peers when media becomes available @@ -1174,8 +1237,6 @@ const MediaAgent = (props: MediaAgentProps) => { useEffect(() => { mountedRef.current = true; - const localUserHasMedia = session?.has_media !== false; // Default to true for backward compatibility - if (mediaStreamRef.current || readyState !== ReadyState.OPEN) return; // Capture the connections at effect setup time @@ -1222,7 +1283,7 @@ const MediaAgent = (props: MediaAgentProps) => { connectionsToCleanup.forEach((connection) => connection.close()); connectionsToCleanup.clear(); }; - }, [readyState, setup_local_media, session]); + }, [readyState, setup_local_media, localUserHasMedia]); return null; }; diff --git a/server/api/admin.py b/server/api/admin.py index 12b586b..e37c36b 100644 --- a/server/api/admin.py +++ b/server/api/admin.py @@ -19,7 +19,7 @@ from shared.models import ( AdminMetricsResponse, AdminMetricsConfig, ) -from logger import logger +from shared.logger import logger if TYPE_CHECKING: from core.session_manager import SessionManager, SessionConfig diff --git a/server/api/bot_config.py b/server/api/bot_config.py index f2ec61b..bf625cd 100644 --- a/server/api/bot_config.py +++ b/server/api/bot_config.py @@ -9,7 +9,7 @@ from typing import Dict, Any from fastapi import APIRouter, HTTPException, BackgroundTasks, WebSocket from core.bot_manager import BotManager -from logger import logger +from shared.logger import logger from core.bot_config_manager import BotConfigManager # Import WebSocket handler base class diff --git a/server/api/lobbies.py b/server/api/lobbies.py index 4adf128..1aef6f5 100644 --- a/server/api/lobbies.py +++ b/server/api/lobbies.py @@ -20,7 +20,7 @@ from shared.models import ( ChatMessagesResponse, ) -from logger import logger +from shared.logger import logger if TYPE_CHECKING: from ..core.session_manager import SessionManager diff --git a/server/api/monitoring.py b/server/api/monitoring.py index 698f64c..ca4e6be 100644 --- a/server/api/monitoring.py +++ b/server/api/monitoring.py @@ -18,7 +18,7 @@ from typing import Optional from fastapi import APIRouter, HTTPException, Query from datetime import datetime -from logger import logger +from shared.logger import logger # Import monitoring components try: diff --git a/server/api/sessions.py b/server/api/sessions.py index 6d9e073..01bc969 100644 --- a/server/api/sessions.py +++ b/server/api/sessions.py @@ -14,7 +14,7 @@ import os sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))) from shared.models import SessionResponse, HealthResponse, LobbyModel -from logger import logger +from shared.logger import logger if TYPE_CHECKING: from ..core.session_manager import SessionManager diff --git a/server/core/auth_manager.py b/server/core/auth_manager.py index 6de5071..5b9ac9f 100644 --- a/server/core/auth_manager.py +++ b/server/core/auth_manager.py @@ -31,7 +31,7 @@ except ImportError: "Cannot import shared.models. Ensure the project is run as a package or PYTHONPATH is set." ) -from logger import logger +from shared.logger import logger class AuthManager: diff --git a/server/core/bot_config_manager.py b/server/core/bot_config_manager.py index 5848f66..ff0f455 100644 --- a/server/core/bot_config_manager.py +++ b/server/core/bot_config_manager.py @@ -14,7 +14,7 @@ import httpx from typing import Dict, List, Optional, Any from pathlib import Path -from logger import logger +from shared.logger import logger # Import shared models import sys diff --git a/server/core/bot_manager.py b/server/core/bot_manager.py index 8a49b83..be533e5 100644 --- a/server/core/bot_manager.py +++ b/server/core/bot_manager.py @@ -9,7 +9,7 @@ import json import asyncio from typing import Dict, List, Optional from pydantic import ValidationError -from logger import logger +from shared.logger import logger # Import shared models import sys diff --git a/server/core/cache.py b/server/core/cache.py index 1ce81cc..2085abb 100644 --- a/server/core/cache.py +++ b/server/core/cache.py @@ -20,7 +20,7 @@ from typing import Any, Dict, Optional, Callable, TypeVar from collections import OrderedDict from dataclasses import dataclass -from logger import logger +from shared.logger import logger T = TypeVar('T') diff --git a/server/core/error_handling.py b/server/core/error_handling.py index 4a967e0..ab5ea57 100644 --- a/server/core/error_handling.py +++ b/server/core/error_handling.py @@ -15,7 +15,7 @@ from functools import wraps from dataclasses import dataclass from fastapi import WebSocket -from logger import logger +from shared.logger import logger T = TypeVar('T') diff --git a/server/core/health.py b/server/core/health.py index 14e4cf5..f1a51bc 100644 --- a/server/core/health.py +++ b/server/core/health.py @@ -18,7 +18,7 @@ from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, NamedTuple from enum import Enum -from logger import logger +from shared.logger import logger class HealthStatus(Enum): diff --git a/server/core/lobby_manager.py b/server/core/lobby_manager.py index 7af02ee..4c56500 100644 --- a/server/core/lobby_manager.py +++ b/server/core/lobby_manager.py @@ -28,7 +28,7 @@ except ImportError: f"Failed to import shared models: {e}. Ensure shared/models.py is accessible and PYTHONPATH is correctly set." ) -from logger import logger +from shared.logger import logger # Use try/except for importing events to handle both relative and absolute imports try: diff --git a/server/core/performance.py b/server/core/performance.py index 8cfcbcb..3efbd0c 100644 --- a/server/core/performance.py +++ b/server/core/performance.py @@ -22,7 +22,7 @@ from dataclasses import dataclass, field from collections import defaultdict, deque from contextlib import asynccontextmanager -from logger import logger +from shared.logger import logger @dataclass diff --git a/server/core/session_manager.py b/server/core/session_manager.py index 0f1333c..a2e49f6 100644 --- a/server/core/session_manager.py +++ b/server/core/session_manager.py @@ -45,7 +45,7 @@ except ImportError: ) from core.lobby_manager import Lobby -from logger import logger +from shared.logger import logger # Import WebRTC signaling for peer management from websocket.webrtc_signaling import WebRTCSignalingHandlers @@ -140,6 +140,10 @@ class Session: self.bot_instance_id: Optional[str] = None # Bot instance ID for tracking self.session_lock = threading.RLock() # Instance-level lock + def is_bot(self) -> bool: + """Check if this session represents a bot""" + return bool(self.bot_run_id or self.bot_provider_id or self.bot_instance_id) + def getName(self) -> str: with self.session_lock: return f"{self.short}:{self.name if self.name else '[ ---- ]'}" @@ -401,6 +405,10 @@ class SessionManager: with self.lock: sessions_list: List[SessionSaved] = [] for s in self._instances: + # Skip bot sessions - they should not be persisted + # Bot sessions are managed by the voicebot service lifecycle + if s.bot_instance_id is not None or s.bot_run_id is not None or s.bot_provider_id is not None: + continue sessions_list.append(s.to_saved()) # Note: We'll need to handle name_passwords separately or inject it diff --git a/server/logger.py b/server/logger.py deleted file mode 100644 index c36454c..0000000 --- a/server/logger.py +++ /dev/null @@ -1,81 +0,0 @@ -import os -from typing import Any -import warnings -import logging - -logging_level = os.getenv("LOGGING_LEVEL", "INFO").upper() - - -class RelativePathFormatter(logging.Formatter): - def __init__( - self, fmt: Any = None, datefmt: Any = None, remove_prefix: str | None = None - ): - super().__init__(fmt, datefmt) - self.remove_prefix = remove_prefix or os.getcwd() - # Ensure the prefix ends with a separator - if not self.remove_prefix.endswith(os.sep): - self.remove_prefix += os.sep - - def format(self, record: logging.LogRecord): - # Make a copy of the record to avoid modifying the original - record = logging.makeLogRecord(record.__dict__) - - # Remove the prefix from pathname - if record.pathname.startswith(self.remove_prefix): - record.pathname = record.pathname[len(self.remove_prefix) :] - - return super().format(record) - - -def _setup_logging(level: Any = logging_level) -> logging.Logger: - os.environ["TORCH_CPP_LOG_LEVEL"] = "ERROR" - warnings.filterwarnings( - "ignore", message="Overriding a previously registered kernel" - ) - warnings.filterwarnings("ignore", message="Warning only once for all operators") - warnings.filterwarnings("ignore", message=".*Couldn't find ffmpeg or avconv.*") - warnings.filterwarnings("ignore", message="'force_all_finite' was renamed to") - warnings.filterwarnings("ignore", message="n_jobs value 1 overridden") - warnings.filterwarnings("ignore", message=".*websocket.*is deprecated") - - numeric_level = getattr(logging, level.upper(), None) - if not isinstance(numeric_level, int): - raise ValueError(f"Invalid log level: {level}") - - # Create a custom formatter - formatter = RelativePathFormatter( - fmt="%(levelname)s - %(pathname)s:%(lineno)d - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - # Create a handler (e.g., StreamHandler for console output) - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - # Configure root logger - logging.basicConfig( - level=numeric_level, - handlers=[handler], # Use only your handler - force=True, - ) - - # Set levels for noisy loggers - for noisy_logger in ( - "uvicorn", - "uvicorn.error", - "uvicorn.access", - "fastapi", - "starlette", - "httpx", - ): - logger = logging.getLogger(noisy_logger) - logger.setLevel(logging.WARNING) - logger.handlers = [] # Remove default handlers - logger.addHandler(handler) # Add your custom handler - - logger = logging.getLogger(__name__) - return logger - - -logger = _setup_logging(level=logging_level) -logger.debug(f"Logging initialized with level: {logging_level}") diff --git a/server/main.py b/server/main.py index 557ba58..5ad3d51 100644 --- a/server/main.py +++ b/server/main.py @@ -57,7 +57,7 @@ except ImportError: from api.bots import create_bot_router from api.bot_config import create_bot_config_router, setup_websocket_config_handlers -from logger import logger +from shared.logger import logger # Import performance monitoring components try: diff --git a/server/models/events.py b/server/models/events.py index 434ae70..a512aee 100644 --- a/server/models/events.py +++ b/server/models/events.py @@ -5,7 +5,7 @@ Event system for decoupled communication between server components. from abc import ABC from typing import Protocol, Dict, List import asyncio -from logger import logger +from shared.logger import logger class Event(ABC): diff --git a/server/websocket/connection.py b/server/websocket/connection.py index 3eb6bb1..3839c42 100644 --- a/server/websocket/connection.py +++ b/server/websocket/connection.py @@ -4,10 +4,11 @@ WebSocket connection management. This module handles WebSocket connections and integrates with the message router. """ +import asyncio from typing import Dict, Any, Optional, TYPE_CHECKING from fastapi import WebSocket, WebSocketDisconnect -from logger import logger +from shared.logger import logger from .message_handlers import MessageRouter if TYPE_CHECKING: @@ -100,7 +101,23 @@ class WebSocketConnectionManager: if session.id in lobby.sessions: logger.info(f"{session.getName()} - Stale session in lobby {lobby.getName()}. Re-joining.") try: + # Leave the lobby to clean up peer connections await session.leave_lobby(lobby) + # Verify the session has been properly removed before proceeding + while session.id in lobby.sessions: + logger.debug( + f"Waiting for session {session.getName()} to be fully removed from lobby" + ) + # Brief yield to allow cleanup to complete + await asyncio.sleep(0.01) + # Safety check to prevent infinite loop + if session.id in lobby.sessions: + logger.warning( + f"Force removing stale session {session.getName()} from lobby" + ) + with lobby.lock: + lobby.sessions.pop(session.id, None) + break except Exception as e: logger.warning(f"Error cleaning up stale session: {e}") diff --git a/server/websocket/message_handlers.py b/server/websocket/message_handlers.py index 811303b..aa92bd1 100644 --- a/server/websocket/message_handlers.py +++ b/server/websocket/message_handlers.py @@ -9,7 +9,7 @@ from abc import ABC, abstractmethod from typing import Dict, Any, TYPE_CHECKING from fastapi import WebSocket -from logger import logger +from shared.logger import logger from .webrtc_signaling import WebRTCSignalingHandlers from core.error_handling import ( error_handler, diff --git a/server/websocket/webrtc_signaling.py b/server/websocket/webrtc_signaling.py index 8a0123a..c8b9871 100644 --- a/server/websocket/webrtc_signaling.py +++ b/server/websocket/webrtc_signaling.py @@ -8,7 +8,7 @@ Handles ICE candidate relay and session description exchange between peers. from typing import Any, Dict, TYPE_CHECKING from fastapi import WebSocket -from logger import logger +from shared.logger import logger from core.error_handling import with_webrtc_error_handling if TYPE_CHECKING: @@ -216,6 +216,9 @@ class WebRTCSignalingHandlers: peer_session: The existing peer session in the lobby lobby: The lobby context """ + logger.info( + f"[TRACE] handle_add_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}" + ) # Only establish WebRTC connections if at least one has media if session.has_media or peer_session.has_media: # Add peer_session to session's peer list @@ -229,11 +232,23 @@ class WebRTCSignalingHandlers: if lobby.id not in peer_session.lobby_peers: peer_session.lobby_peers[lobby.id] = [] peer_session.lobby_peers[lobby.id].append(session.id) + logger.info( + f"[TRACE] {session.getName()} lobby_peers after add: {session.lobby_peers}" + ) - # Notify existing peer about new peer (they should not create offer) + # Determine offer roles: bots should never create offers + # When a bot joins: existing humans create offers to the bot + # When a human joins: human creates offers to existing bots, bots don't create offers to human + + # Determine who should create the offer based on timing + # In our polite peer implementation, the newer peer typically creates the offer + existing_peer_should_offer = False + new_session_should_offer = True + + # Notify existing peer about new peer logger.info( f"{session.getName()} -> {peer_session.getName()}:addPeer(" - f"{session.getName()}, {lobby.getName()}, should_create_offer=False, " + f"{session.getName()}, {lobby.getName()}, should_create_offer={existing_peer_should_offer}, " f"has_media={session.has_media})" ) try: @@ -245,7 +260,7 @@ class WebRTCSignalingHandlers: "peer_id": session.id, "peer_name": session.name, "has_media": session.has_media, - "should_create_offer": False, + "should_create_offer": existing_peer_should_offer, }, } ) @@ -254,10 +269,10 @@ class WebRTCSignalingHandlers: f"Failed to send addPeer to {peer_session.getName()}: {e}" ) - # Notify new session about existing peer (they should create offer) + # Notify new session about existing peer logger.info( f"{session.getName()} -> {session.getName()}:addPeer(" - f"{peer_session.getName()}, {lobby.getName()}, should_create_offer=True, " + f"{peer_session.getName()}, {lobby.getName()}, should_create_offer={new_session_should_offer}, " f"has_media={peer_session.has_media})" ) try: @@ -269,7 +284,7 @@ class WebRTCSignalingHandlers: "peer_id": peer_session.id, "peer_name": peer_session.name, "has_media": peer_session.has_media, - "should_create_offer": True, + "should_create_offer": new_session_should_offer, }, } ) @@ -297,6 +312,9 @@ class WebRTCSignalingHandlers: peer_session: The peer session to disconnect from lobby: The lobby context """ + logger.info( + f"[TRACE] handle_remove_peer called: session={session.getName()} (id={session.id}), peer_session={peer_session.getName()} (id={peer_session.id}), lobby={lobby.getName()}" + ) # Notify peer about session removal if peer_session.ws: logger.info( diff --git a/voicebot/ai_providers.py b/voicebot/ai_providers.py index b27aa5d..a1b07a1 100644 --- a/voicebot/ai_providers.py +++ b/voicebot/ai_providers.py @@ -14,7 +14,7 @@ from enum import Enum from dataclasses import dataclass from pydantic import BaseModel, Field -from logger import logger +from shared.logger import logger class AIProviderType(str, Enum): diff --git a/voicebot/bot_orchestrator.py b/voicebot/bot_orchestrator.py index b71889c..e9db0b9 100644 --- a/voicebot/bot_orchestrator.py +++ b/voicebot/bot_orchestrator.py @@ -21,7 +21,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) import uvicorn from fastapi import FastAPI, HTTPException -from logger import logger +from shared.logger import logger from voicebot.models import JoinRequest from voicebot.webrtc_signaling import WebRTCSignalingClient diff --git a/voicebot/bots/ai_chatbot.py b/voicebot/bots/ai_chatbot.py index f0a0cf3..f02c163 100644 --- a/voicebot/bots/ai_chatbot.py +++ b/voicebot/bots/ai_chatbot.py @@ -17,7 +17,7 @@ from aiortc import MediaStreamTrack import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) -from logger import logger +from shared.logger import logger from shared.models import ChatMessageModel # Import advanced bot management modules diff --git a/voicebot/bots/chatbot.py b/voicebot/bots/chatbot.py index 8ce1104..80fdb0c 100644 --- a/voicebot/bots/chatbot.py +++ b/voicebot/bots/chatbot.py @@ -7,7 +7,7 @@ rather than media streams. from typing import Dict, Optional, Callable, Awaitable import time import random -from logger import logger +from shared.logger import logger from aiortc import MediaStreamTrack # Import shared models for chat functionality diff --git a/voicebot/bots/synthetic_media.py b/voicebot/bots/synthetic_media.py index ee4e533..0d9c78b 100644 --- a/voicebot/bots/synthetic_media.py +++ b/voicebot/bots/synthetic_media.py @@ -20,7 +20,7 @@ from typing import TypedDict, TYPE_CHECKING, Tuple, List, Optional, Any import numpy.typing as npt from aiortc import MediaStreamTrack from av import VideoFrame -from logger import logger +from shared.logger import logger if TYPE_CHECKING: pass diff --git a/voicebot/bots/whisper.py b/voicebot/bots/whisper.py index c36eff0..ffab7db 100644 --- a/voicebot/bots/whisper.py +++ b/voicebot/bots/whisper.py @@ -20,7 +20,7 @@ from pydantic import BaseModel, Field, ConfigDict # Core dependencies import librosa -from logger import logger +from shared.logger import logger from aiortc import MediaStreamTrack from aiortc.mediastreams import MediaStreamError from av import AudioFrame diff --git a/voicebot/client_app.py b/voicebot/client_app.py index 192020f..f00972b 100644 --- a/voicebot/client_app.py +++ b/voicebot/client_app.py @@ -16,7 +16,7 @@ from typing import Optional sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapi import FastAPI -from logger import logger +from shared.logger import logger # Import shared models from shared.models import ClientStatusResponse diff --git a/voicebot/client_main.py b/voicebot/client_main.py index 7ce8c27..f4f5d0d 100644 --- a/voicebot/client_main.py +++ b/voicebot/client_main.py @@ -7,7 +7,7 @@ This module contains the main client functionality and entry points. import asyncio import sys import os -from logger import logger +from shared.logger import logger from voicebot.bots.synthetic_media import AnimatedVideoTrack # Add the parent directory to sys.path to allow absolute imports diff --git a/voicebot/conversation_context.py b/voicebot/conversation_context.py index 757418c..0e3caf4 100644 --- a/voicebot/conversation_context.py +++ b/voicebot/conversation_context.py @@ -13,7 +13,7 @@ from dataclasses import dataclass, field from pydantic import BaseModel, Field from collections import defaultdict -from logger import logger +from shared.logger import logger @dataclass diff --git a/voicebot/entrypoint.sh b/voicebot/entrypoint.sh index a5c7394..ab9d85c 100644 --- a/voicebot/entrypoint.sh +++ b/voicebot/entrypoint.sh @@ -32,9 +32,6 @@ if [ "$PRODUCTION" != "true" ]; then exec uv run uvicorn main:uvicorn_app \ --host 0.0.0.0 \ --port 8788 \ - --reload \ - --reload-dir /voicebot \ - --reload-dir /shared \ --log-level info else echo "Running as client (connecting to lobby)..." diff --git a/voicebot/logger.py b/voicebot/logger.py deleted file mode 100644 index f961404..0000000 --- a/voicebot/logger.py +++ /dev/null @@ -1,125 +0,0 @@ -import os -import warnings -import logging -import time -from typing import Optional, Tuple - -logging_level = os.getenv("LOGGING_LEVEL", "INFO").upper() -#logging_level = os.getenv("LOGGING_LEVEL", "DEBUG").upper() - - -class RelativePathFormatter(logging.Formatter): - def __init__( - self, - fmt: Optional[str] = None, - datefmt: Optional[str] = None, - remove_prefix: Optional[str] = None, - ) -> None: - super().__init__(fmt, datefmt) - self.remove_prefix = remove_prefix or os.getcwd() - # Ensure the prefix ends with a separator - if not self.remove_prefix.endswith(os.sep): - self.remove_prefix += os.sep - - def format(self, record: logging.LogRecord) -> str: - """Create a shallow copy of the record and rewrite the pathname - to be relative to the configured prefix. Defensive checks are used - to satisfy static type checkers. - """ - # Make a copy of the record dict so we don't mutate the caller's record - record_dict = record.__dict__.copy() - new_record = logging.makeLogRecord(record_dict) - - # Remove the prefix from pathname if present - pathname = getattr(new_record, "pathname", "") - if pathname.startswith(self.remove_prefix): - new_record.pathname = pathname[len(self.remove_prefix) :] - - return super().format(new_record) - - -def _setup_logging(level: str=logging_level) -> logging.Logger: - os.environ["TORCH_CPP_LOG_LEVEL"] = "ERROR" - warnings.filterwarnings( - "ignore", message="Overriding a previously registered kernel" - ) - warnings.filterwarnings("ignore", message="Warning only once for all operators") - warnings.filterwarnings("ignore", message=".*Couldn't find ffmpeg or avconv.*") - warnings.filterwarnings("ignore", message="'force_all_finite' was renamed to") - warnings.filterwarnings("ignore", message="n_jobs value 1 overridden") - warnings.filterwarnings("ignore", message=".*websocket.*is deprecated") - - logging.getLogger("aiortc").setLevel(logging.WARNING) - logging.getLogger("aioice").setLevel(logging.WARNING) - logging.getLogger("asyncio").setLevel(logging.WARNING) - - numeric_level = getattr(logging, level.upper(), None) - if not isinstance(numeric_level, int): - raise ValueError(f"Invalid log level: {level}") - - # Create a custom formatter - formatter = RelativePathFormatter( - fmt="%(levelname)s - %(pathname)s:%(lineno)d - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", - ) - - # Create a handler (e.g., StreamHandler for console output) - handler = logging.StreamHandler() - handler.setFormatter(formatter) - - # Simple repeat-suppression filter: if the exact same message (level+text) - # appears repeatedly within a short window, drop duplicates. This keeps - # the first occurrence for diagnostics but avoids log flooding from - # recurring asyncio/aioice stack traces. - class _RepeatFilter(logging.Filter): - def __init__(self, interval: float = 5.0) -> None: - super().__init__() - self._interval = interval - self._last: Optional[Tuple[int, str]] = None - self._last_time: float = 0.0 - - def filter(self, record: logging.LogRecord) -> bool: - try: - msg = record.getMessage() - except Exception: - # Fallback to a string representation if getMessage fails - msg = str(record) - - key: Tuple[int, str] = (getattr(record, "levelno", 0), msg) - now = time.time() - if self._last == key and (now - self._last_time) < self._interval: - return False - self._last = key - self._last_time = now - return True - - handler.addFilter(_RepeatFilter()) - - # Configure root logger - logging.basicConfig( - level=numeric_level, - handlers=[handler], # Use only your handler - force=True, - ) - - # Set levels for noisy loggers - for noisy_logger in ( - "uvicorn", - "uvicorn.error", - "uvicorn.access", - "fastapi", - "starlette", - ): - logger = logging.getLogger(noisy_logger) - logger.setLevel(logging.WARNING) - logger.handlers = [] # Remove default handlers - logger.addHandler(handler) # Add your custom handler - - logger = logging.getLogger(__name__) - return logger - - -logger = _setup_logging(level=logging_level) -logger.debug(f"Logging initialized with level: {logging_level}") - - diff --git a/voicebot/main.py b/voicebot/main.py index f58d4b2..39ed786 100644 --- a/voicebot/main.py +++ b/voicebot/main.py @@ -14,7 +14,7 @@ import os sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # Import logger for reload debugging -from logger import logger +from shared.logger import logger from voicebot.models import VoicebotArgs, VoicebotMode from voicebot.client_main import main_with_args, start_client_with_reload diff --git a/voicebot/personality_system.py b/voicebot/personality_system.py index befa69c..b4011c6 100644 --- a/voicebot/personality_system.py +++ b/voicebot/personality_system.py @@ -11,7 +11,7 @@ from typing import Dict, List, Optional, Any from enum import Enum from pydantic import BaseModel, Field -from logger import logger +from shared.logger import logger class PersonalityTrait(str, Enum): diff --git a/voicebot/set_whisper_debug.py b/voicebot/set_whisper_debug.py index ab48d05..1aa61ac 100644 --- a/voicebot/set_whisper_debug.py +++ b/voicebot/set_whisper_debug.py @@ -11,7 +11,7 @@ import os # Add the voicebot directory to the path sys.path.append(os.path.dirname(os.path.abspath(__file__))) -from logger import logger +from shared.logger import logger def set_debug_logging(): diff --git a/voicebot/utils.py b/voicebot/utils.py index 299f681..c54d6cd 100644 --- a/voicebot/utils.py +++ b/voicebot/utils.py @@ -38,7 +38,7 @@ def create_ssl_context(insecure: bool = False) -> ssl.SSLContext | None: def log_network_info(): """Log network information for debugging.""" - from logger import logger + from shared.logger import logger try: import socket diff --git a/voicebot/webrtc_signaling.py b/voicebot/webrtc_signaling.py index 0861ad2..c16ddf0 100644 --- a/voicebot/webrtc_signaling.py +++ b/voicebot/webrtc_signaling.py @@ -53,7 +53,7 @@ from shared.models import ( ChatMessageModel, ) -from logger import logger +from shared.logger import logger from voicebot.bots.synthetic_media import create_synthetic_tracks from voicebot.models import Peer, MessageData from voicebot.utils import create_ssl_context, log_network_info @@ -442,8 +442,8 @@ class WebRTCSignalingClient: async def _setup_local_media(self): """Create local media tracks""" - # If a bot provided a create_tracks callable, use it to create tracks. - # Otherwise, use default synthetic tracks. + # Always clear out old tracks to avoid reusing stopped or stale tracks + self.local_tracks.clear() try: if self.create_tracks: tracks = self.create_tracks(self.session_name) @@ -679,26 +679,66 @@ class WebRTCSignalingClient: logger.info( f"Adding peer: {peer_name} (should_create_offer: {should_create_offer})" ) + logger.info(f"_handle_add_peer: Current peer_connections count: {len(self.peer_connections)}") + logger.info(f"_handle_add_peer: Current active peers: {list(self.peer_connections.keys())}") logger.debug( f"_handle_add_peer: peer_id={peer_id}, peer_name={peer_name}, should_create_offer={should_create_offer}" ) - # Check if peer already exists + # Check if peer already exists and clean up stale connections if peer_id in self.peer_connections: pc = self.peer_connections[peer_id] - logger.debug( - f"_handle_add_peer: Existing connection state: {pc.connectionState}" - ) - if pc.connectionState in ["new", "connected", "connecting"]: - logger.info(f"Peer connection already exists for {peer_name}") - return - else: - # Clean up stale connection - logger.debug( - f"_handle_add_peer: Closing stale connection for {peer_name}" - ) + logger.info(f"_handle_add_peer: Found existing connection for {peer_name} (state: {pc.connectionState})") + logger.info("_handle_add_peer: Cleaning up existing connection to ensure fresh start") + + # Always clean up existing connection for fresh start + # This prevents stale connection issues during page refresh + try: await pc.close() - del self.peer_connections[peer_id] + except Exception as e: + logger.warning(f"Error closing existing connection for {peer_name}: {e}") + + # Clean up all associated state + self.is_negotiating.pop(peer_id, None) + self.making_offer.pop(peer_id, None) + self.initiated_offer.discard(peer_id) + self.pending_ice_candidates.pop(peer_id, None) + self.peers.pop(peer_id, None) # Also clean up peers dictionary + del self.peer_connections[peer_id] + + # Give the connection a moment to close, but don't block indefinitely + # If it doesn't close quickly, proceed anyway to avoid blocking other operations + try: + await asyncio.wait_for( + self._wait_for_connection_close(pc, peer_name), + timeout=1.0 + ) + except asyncio.TimeoutError: + logger.warning(f"Connection to {peer_name} did not close within timeout, proceeding anyway") + + # Also check for any existing connections with the same peer name but different ID + # This handles cases where the frontend gets a new session ID after refresh + connections_to_remove: list[str] = [] + for existing_peer_id, existing_peer in self.peers.items(): + if existing_peer.peer_name == peer_name and existing_peer_id != peer_id: + logger.info(f"_handle_add_peer: Found existing connection for peer name {peer_name} with different ID {existing_peer_id}, cleaning up") + connections_to_remove.append(existing_peer_id) + + for old_peer_id in connections_to_remove: + if old_peer_id in self.peer_connections: + pc = self.peer_connections[old_peer_id] + try: + await pc.close() + except Exception as e: + logger.warning(f"Error closing old connection for {peer_name}: {e}") + + # Clean up all associated state for old connection + self.is_negotiating.pop(old_peer_id, None) + self.making_offer.pop(old_peer_id, None) + self.initiated_offer.discard(old_peer_id) + self.pending_ice_candidates.pop(old_peer_id, None) + self.peers.pop(old_peer_id, None) + del self.peer_connections[old_peer_id] # Create new peer peer = Peer(session_id=peer_id, peer_name=peer_name, local=False) @@ -713,8 +753,6 @@ class WebRTCSignalingClient: username="ketra", credential="ketran", ), - # Add Google's public STUN server as fallback - RTCIceServer(urls="stun:stun.l.google.com:19302"), ], ) logger.debug( @@ -734,12 +772,39 @@ class WebRTCSignalingClient: # Add connection state change handler def on_connection_state_change() -> None: - logger.info(f"Connection state: {pc.connectionState}") + logger.info(f"🔄 Connection state changed for {peer_name}: {pc.connectionState}") + + # Log additional details for debugging connection closure/failure + logger.info(f" ICE connection state: {pc.iceConnectionState}") + logger.info(f" Signaling state: {pc.signalingState}") + # If available, log the last ICE candidate pair and any error details + selected_pair = None + try: + if hasattr(pc, 'sctp') and pc.sctp: + selected_pair = getattr(pc.sctp.transport, 'iceTransport', None) + elif hasattr(pc, 'iceTransport'): + selected_pair = pc.iceTransport + except Exception as e: + logger.warning(f"Error accessing selected ICE pair: {e}") + if selected_pair: + logger.info(f" Selected ICE candidate pair: {getattr(selected_pair, 'selectedCandidatePair', None)}") + + # Handle failed connections - could be due to network issues during refresh + if pc.connectionState == "failed": + logger.warning(f"❌ Connection to {peer_name} failed - this might be due to page refresh") + elif pc.connectionState == "disconnected": + logger.info(f"⚠️ Connection to {peer_name} disconnected") + elif pc.connectionState == "connected": + logger.info(f"✅ Connection to {peer_name} established successfully") + elif pc.connectionState == "closed": + logger.info(f"🔒 Connection to {peer_name} closed") pc.on("connectionstatechange")(on_connection_state_change) + logger.info(f"📝 Storing peer connection for {peer_name} (ID: {peer_id})") self.peer_connections[peer_id] = pc peer.connection = pc + logger.info(f"📊 Total peer connections now: {len(self.peer_connections)}") # Set up event handlers def on_track(track: MediaStreamTrack) -> None: @@ -1043,19 +1108,33 @@ class WebRTCSignalingClient: # Close peer connection if peer_id in self.peer_connections: pc = self.peer_connections[peer_id] - await pc.close() + try: + await pc.close() + # Wait for connection to be properly closed + while pc.connectionState not in ["closed", "failed"]: + logger.debug(f"Waiting for connection to {peer_name} to close (current state: {pc.connectionState})") + # Brief yield to allow cleanup + await asyncio.sleep(0.01) + except Exception as e: + logger.warning(f"Error closing peer connection for {peer_name}: {e}") del self.peer_connections[peer_id] - # Clean up state - self.is_negotiating.pop(peer_id, None) - self.making_offer.pop(peer_id, None) + # Extra forced cleanup: remove all state for this peer_id + for state_dict in [self.is_negotiating, self.making_offer, self.pending_ice_candidates]: + if peer_id in state_dict: + logger.debug(f"Force removing {peer_id} from {state_dict}") + state_dict.pop(peer_id, None) self.initiated_offer.discard(peer_id) - self.pending_ice_candidates.pop(peer_id, None) # Remove peer peer = self.peers.pop(peer_id, None) if peer and self.on_peer_removed: - await self.on_peer_removed(peer) + try: + await self.on_peer_removed(peer) + except Exception as e: + logger.warning(f"Error in on_peer_removed callback for {peer_name}: {e}") + + logger.info(f"Completed removing peer: {peer_name} (all state reset)") async def _handle_session_description(self, data: SessionDescriptionModel): """Handle sessionDescription message""" @@ -1084,12 +1163,12 @@ class WebRTCSignalingClient: ) we_initiated = peer_id in self.initiated_offer - # For bots, be more polite - always yield to human users in collision - # Bots should generally be the polite peer - ignore_offer = offer_collision and we_initiated + # For bots, be more polite - accept remote offers even in collision scenarios + # Only ignore if we're in the middle of sending our own offer AND it's actually conflicting + ignore_offer = offer_collision and we_initiated and pc.signalingState != "stable" if ignore_offer: - logger.info(f"Ignoring offer from {peer_name} due to collision (bot being polite)") + logger.info(f"Ignoring offer from {peer_name} due to active collision (bot being polite)") # Reset our offer state to allow the remote offer to proceed if peer_id in self.initiated_offer: self.initiated_offer.remove(peer_id) @@ -1101,7 +1180,11 @@ class WebRTCSignalingClient: await asyncio.sleep(1.0) # Wait 1 second if pc.connectionState not in ["connected", "closed", "failed"]: logger.info(f"Retrying connection setup for {peer_name} after collision") - # Don't create offer, let the remote peer drive + # Reset state to allow the remote peer to retry + self.making_offer[peer_id] = False + self.is_negotiating[peer_id] = False + if peer_id in self.initiated_offer: + self.initiated_offer.remove(peer_id) asyncio.create_task(retry_connection()) return @@ -1212,7 +1295,7 @@ class WebRTCSignalingClient: peer_name = data.peer_name candidate_data = data.candidate - logger.info(f"Received ICE candidate from {peer_name}") + logger.info(f"Received ICE candidate from {peer_name}: {candidate_data}") pc = self.peer_connections.get(peer_id) if not pc: @@ -1250,6 +1333,7 @@ class WebRTCSignalingClient: except Exception: cand_type = "unknown" + logger.info(f"Attempting to add ICE candidate for {peer_name}: type={cand_type}, sdp='{sdp_part}'") try: rtc_candidate = candidate_from_sdp(sdp_part) rtc_candidate.sdpMid = candidate_data.sdpMid @@ -1257,7 +1341,7 @@ class WebRTCSignalingClient: # aiortc expects an object with attributes (RTCIceCandidate) await pc.addIceCandidate(rtc_candidate) - logger.info(f"ICE candidate added for {peer_name}: type={cand_type}") + logger.info(f"ICE candidate successfully added for {peer_name}: type={cand_type}") except Exception as e: logger.error( f"Failed to add ICE candidate for {peer_name}: type={cand_type} error={e} sdp='{sdp_part}'",