```
POST https://ketrenos.com/ai-voicebot/api/bots/ai_chatbot/join 404 (Not Found)
```

The issue was caused by three main problems:

1. **Incorrect Provider Registration Check**: The voicebot service was checking provider registration using the wrong API endpoint (`/api/bots` instead of `/api/bots/providers`)
2. **No Persistence for Bot Providers**: Bot providers were stored only in memory and lost on server restart, requiring re-registration
3. **AsyncIO Task Initialization Issue**: The cleanup task was being created during `__init__` when no event loop was running, causing FastAPI route registration failures

**File**: `voicebot/bot_orchestrator.py`

**Problem**: The `check_provider_registration` function was calling `/api/bots` (which returns available bots) instead of `/api/bots/providers` (which returns registered providers).

**Fix**: Updated the function to use the correct endpoint and parse the response properly:

```python
async def check_provider_registration(server_url: str, provider_id: str, insecure: bool = False) -> bool:
    """Check if the bot provider is still registered with the server."""
    try:
        import httpx

        verify = not insecure
        async with httpx.AsyncClient(verify=verify) as client:
            # Check if our provider is still in the provider list
            response = await client.get(f"{server_url}/api/bots/providers", timeout=5.0)
            if response.status_code == 200:
                data = response.json()
                providers = data.get("providers", [])
                # providers is a list of BotProviderModel objects, check if our provider_id is in the list
                is_registered = any(provider.get("provider_id") == provider_id for provider in providers)
                logger.debug(f"Registration check: provider_id={provider_id}, found_providers={len(providers)}, is_registered={is_registered}")
                return is_registered
            else:
                logger.warning(f"Registration check failed: HTTP {response.status_code}")
                return False
    except Exception as e:
        logger.debug(f"Provider registration check failed: {e}")
        return False
```

**File**: `server/core/bot_manager.py`

**Problem**: Bot providers were stored only in memory and lost on server restart.

**Fix**: Added persistence functionality to save/load bot providers to/from `bot_providers.json`:

```python
def _save_bot_providers(self):
    """Save bot providers to disk"""
    try:
        with self.lock:
            providers_data = {}
            for provider_id, provider in self.bot_providers.items():
                providers_data[provider_id] = provider.model_dump()

        with open(self.bot_providers_file, 'w') as f:
            json.dump(providers_data, f, indent=2)
        logger.debug(f"Saved {len(providers_data)} bot providers to {self.bot_providers_file}")
    except Exception as e:
        logger.error(f"Failed to save bot providers: {e}")

def _load_bot_providers(self):
    """Load bot providers from disk"""
    try:
        if not os.path.exists(self.bot_providers_file):
            logger.debug(f"No bot providers file found at {self.bot_providers_file}")
            return

        with open(self.bot_providers_file, 'r') as f:
            providers_data = json.load(f)

        with self.lock:
            for provider_id, provider_dict in providers_data.items():
                try:
                    provider = BotProviderModel.model_validate(provider_dict)
                    self.bot_providers[provider_id] = provider
                except Exception as e:
                    logger.warning(f"Failed to load bot provider {provider_id}: {e}")

        logger.info(f"Loaded {len(self.bot_providers)} bot providers from {self.bot_providers_file}")
    except Exception as e:
        logger.error(f"Failed to load bot providers: {e}")
```

**Integration**: The persistence functions are automatically called:

- `_load_bot_providers()` during `BotManager.__init__()`
- `_save_bot_providers()` when registering new providers or removing stale ones

**File**: `server/core/bot_manager.py`

**Problem**: The cleanup task was being created during `BotManager.__init__()` when no event loop was running, causing the FastAPI application to fail to register routes properly.

**Fix**: Deferred the cleanup task creation until it's actually needed:

```python
def __init__(self):
    # ... other initialization ...

    # Load persisted bot providers
    self._load_bot_providers()

    # Note: Don't start cleanup task here - will be started when needed

def start_cleanup(self):
    """Start the cleanup task"""
    try:
        if self.cleanup_task is None:
            self.cleanup_task = asyncio.create_task(self._periodic_cleanup())
            logger.debug("Bot provider cleanup task started")
    except RuntimeError:
        # No event loop running yet, cleanup will be started later
        logger.debug("No event loop available for bot provider cleanup task")

async def register_provider(self, request: BotProviderRegisterRequest) -> BotProviderRegisterResponse:
    # ... registration logic ...

    # Start cleanup task if not already running
    self.start_cleanup()

    return BotProviderRegisterResponse(provider_id=provider_id)
```

**File**: `server/core/bot_manager.py`

**Enhancement**: Added a background task that periodically removes providers that haven't been seen in 15 minutes:

```python
async def _periodic_cleanup(self):
    """Periodically clean up stale bot providers"""
    cleanup_interval = 300  # 5 minutes
    stale_threshold = 900  # 15 minutes

    while not self._shutdown_event.is_set():
        try:
            await asyncio.sleep(cleanup_interval)

            now = time.time()
            providers_to_remove = []

            with self.lock:
                for provider_id, provider in self.bot_providers.items():
                    if now - provider.last_seen > stale_threshold:
                        providers_to_remove.append(provider_id)
                        logger.info(f"Marking stale bot provider for removal: {provider.name} (ID: {provider_id}, last_seen: {now - provider.last_seen:.1f}s ago)")

            if providers_to_remove:
                with self.lock:
                    for provider_id in providers_to_remove:
                        if provider_id in self.bot_providers:
                            del self.bot_providers[provider_id]

                self._save_bot_providers()
                logger.info(f"Cleaned up {len(providers_to_remove)} stale bot providers")

        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Error in bot provider cleanup: {e}")
```

**File**: `client/src/BotManager.tsx`

**Enhancement**: Added retry logic to handle temporary 404s during service restarts:

```typescript
// Retry logic for handling service restart scenarios
let retries = 3;
let response;

while (retries > 0) {
  try {
    response = await botsApi.requestJoinLobby(selectedBot, request);
    break; // Success, exit retry loop
  } catch (err: any) {
    retries--;

    // If it's a 404 error and we have retries left, wait and retry
    if (err?.status === 404 && retries > 0) {
      console.log(`Bot join failed with 404, retrying... (${retries} attempts left)`);
      await new Promise(resolve => setTimeout(resolve, 1000)); // Wait 1 second
      continue;
    }

    // If it's not a 404 or we're out of retries, throw the error
    throw err;
  }
}
```

1. **Persistence**: Bot providers now survive server restarts and don't need to re-register immediately
2. **Correct Registration Checks**: Provider registration checks use the correct API endpoint
3. **Proper AsyncIO Task Management**: Cleanup tasks are started only when an event loop is available
4. **Automatic Cleanup**: Stale providers are automatically removed to prevent accumulation of dead entries
5. **Client Resilience**: Frontend can handle temporary 404s during service restarts with automatic retries
6. **Reduced Downtime**: Users experience fewer failed bot additions during service restarts

After implementing these fixes:

1. Bot providers are correctly persisted in `bot_providers.json`
2. Server restarts load existing providers from disk
3. Provider registration checks use the correct `/api/bots/providers` endpoint
4. AsyncIO cleanup tasks start properly without interfering with route registration
5. Client retries failed requests with 404 errors
6. Periodic cleanup prevents accumulation of stale providers
7. Bot join requests work correctly: `POST /api/bots/{bot_name}/join` returns 200 OK

Test the fix with these commands:

```bash
curl -k https://ketrenos.com/ai-voicebot/api/lobby
curl -k -X POST https://ketrenos.com/ai-voicebot/api/bots/ai_chatbot/join \
  -H "Content-Type: application/json" \
  -d '{"lobby_id":"<lobby_id>","nick":"test-bot","provider_id":"<provider_id>"}'
curl -k https://ketrenos.com/ai-voicebot/api/bots/providers
curl -k https://ketrenos.com/ai-voicebot/api/bots
```

1. `voicebot/bot_orchestrator.py` - Fixed registration check endpoint
2. `server/core/bot_manager.py` - Added persistence and cleanup
3. `client/src/BotManager.tsx` - Added retry logic

No additional configuration is required. The fixes work with existing environment variables and settings.
644 lines
25 KiB
Python
644 lines
25 KiB
Python
"""
|
||
Bot orchestrator FastAPI service.
|
||
|
||
This module provides the FastAPI service for bot discovery and orchestration.
|
||
"""
|
||
|
||
import asyncio
|
||
import threading
|
||
import uuid
|
||
import importlib
|
||
import pkgutil
|
||
import sys
|
||
import os
|
||
import time
|
||
from contextlib import asynccontextmanager
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
# Add the parent directory to sys.path to allow absolute imports
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
import uvicorn
|
||
from fastapi import FastAPI, HTTPException
|
||
|
||
from logger import logger
|
||
from voicebot.models import JoinRequest
|
||
from voicebot.webrtc_signaling import WebRTCSignalingClient
|
||
|
||
# Add shared models import for chat types
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from shared.models import ChatMessageModel, BotInfoModel, BotProviderBotsResponse
|
||
|
||
|
||
# ---------------------------------------------------------------------------
# Module-level state shared by the lifespan hook, the reconnection monitor and
# start_bot_provider().
# ---------------------------------------------------------------------------
_server_url: Optional[str] = None  # remote server this provider registers with
_voicebot_url: Optional[str] = None  # URL sent as this provider's base_url
_insecure: bool = False  # True disables TLS verification on outgoing httpx calls
_provider_id: Optional[str] = None  # ID returned by the most recent registration
_reconnect_task: Optional[asyncio.Task[None]] = None  # monitor task created in lifespan()
_shutdown_event = asyncio.Event()  # set on shutdown to end the monitor loop
_provider_registration_status: bool = False  # last known "are we registered?" answer
|
||
|
||
|
||
def get_provider_registration_status() -> dict[str, bool | str | float | None]:
    """Return a snapshot of this provider's registration state.

    The ``last_check`` entry is the wall-clock time of this call
    (``time.time()``); it is not a timestamp recorded by the monitor.
    """
    return dict(
        is_registered=_provider_registration_status,
        provider_id=_provider_id,
        server_url=_server_url,
        last_check=time.time(),
    )
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run orchestrator startup and shutdown work around the app's lifetime.

    Startup: log the discovered bots and, when ``VOICEBOT_SERVER_URL`` is set,
    register with that server and start the reconnection monitor task.
    Shutdown: set the shutdown event, then cancel and await the monitor task.

    Environment variables read: ``VOICEBOT_SERVER_URL``,
    ``VOICEBOT_SERVER_INSECURE`` (``"true"`` disables TLS verification),
    ``HOST`` (default ``0.0.0.0``) and ``PORT`` (default ``8788``).
    """
    # Every module-level name assigned below must be declared global here;
    # otherwise the assignment only creates a function-local variable and the
    # value reported by get_provider_registration_status() never changes.
    global _reconnect_task, _shutdown_event
    global _server_url, _voicebot_url, _insecure, _provider_id
    global _provider_registration_status

    # Startup
    logger.info(f"🚀 Voicebot bot orchestrator started successfully at {time.strftime('%Y-%m-%d %H:%M:%S')}")
    # Log the discovered bots
    bots = discover_bots()
    if bots:
        bot_names = [bot.name for bot in bots]
        logger.info(f"📋 Discovered {len(bots)} bots: {bot_names}")
    else:
        logger.info("⚠️ No bots discovered")

    # Check for remote server registration
    remote_server_url = os.getenv('VOICEBOT_SERVER_URL')
    if remote_server_url:
        # Set up global variables for reconnection logic
        _server_url = remote_server_url
        _insecure = os.getenv('VOICEBOT_SERVER_INSECURE', 'false').lower() == 'true'

        host = os.getenv('HOST', '0.0.0.0')
        port = os.getenv('PORT', '8788')
        _voicebot_url = _construct_voicebot_url(host, port)

        # Attempt initial registration; a failure is logged, not fatal.
        try:
            _provider_id = await _perform_server_registration(remote_server_url, host, port, _insecure)
            _provider_registration_status = True
            logger.info(f"🎉 Successfully registered with remote server! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with remote server: {e}")
            logger.warning("⚠️ Will attempt reconnection in background")

        # Start the reconnection monitoring task
        _reconnect_task = asyncio.create_task(reconnection_monitor())
    else:
        logger.info("ℹ️ No VOICEBOT_SERVER_URL provided - running in local-only mode")

    yield

    # Shutdown
    logger.info("🛑 Voicebot bot orchestrator shutting down")
    _shutdown_event.set()
    if _reconnect_task:
        _reconnect_task.cancel()
        try:
            await _reconnect_task
        except asyncio.CancelledError:
            # Expected: we cancelled the task ourselves just above.
            pass
|
||
|
||
|
||
async def reconnection_monitor():
    """Background loop that keeps this provider registered with the server.

    Each pass does one of the following, then sleeps:

    * server or voicebot URL missing: mark unregistered, sleep 2x the normal
      interval;
    * no provider ID yet (the initial registration failed): try to register;
    * server health check fails: mark unregistered and try to re-register;
    * server healthy but provider not listed: try to re-register;
    * otherwise: nothing to do.

    The loop ends when ``_shutdown_event`` is set or the task is cancelled.
    Unexpected errors are logged and the loop continues after a short sleep.
    """
    global _provider_registration_status, _provider_id

    reconnect_interval = 15  # Check every 15 seconds (faster for testing)
    retry_interval = 5  # Retry failed connections every 5 seconds

    logger.info(f"🔄 Starting provider reconnection monitor (check every {reconnect_interval}s)")

    while not _shutdown_event.is_set():
        try:
            if not (_server_url and _voicebot_url):
                # Missing configuration, wait longer
                _provider_registration_status = False
                await asyncio.sleep(reconnect_interval * 2)
                continue

            if not _provider_id:
                # The initial registration never succeeded. Without this
                # branch the monitor would idle forever and the provider
                # would never appear on the server.
                _provider_registration_status = False
                logger.warning("⚠️ Provider is not registered yet, attempting to register")
                try:
                    _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                    _provider_registration_status = True
                    logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
                    await asyncio.sleep(reconnect_interval)
                except Exception as e:
                    logger.error(f"❌ Re-registration failed: {e}")
                    await asyncio.sleep(retry_interval)
                continue

            # First check if server is healthy
            is_server_healthy = await check_server_health(_server_url, _insecure)

            if not is_server_healthy:
                logger.warning("⚠️ Server appears to be down or unreachable")
                _provider_registration_status = False
                # Try to re-register
                try:
                    _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                    _provider_registration_status = True
                    logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
                    # Use longer interval after successful reconnection
                    await asyncio.sleep(reconnect_interval)
                except Exception as e:
                    logger.error(f"❌ Re-registration failed: {e}")
                    # Use shorter interval for retry
                    await asyncio.sleep(retry_interval)
            else:
                # Server is healthy, now check if we're still registered
                is_registered = await check_provider_registration(_server_url, _provider_id, _insecure)
                _provider_registration_status = is_registered

                if not is_registered:
                    logger.warning("⚠️ Provider registration lost, attempting to re-register")
                    try:
                        _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                        _provider_registration_status = True
                        logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
                    except Exception as e:
                        logger.error(f"❌ Re-registration failed: {e}")
                        await asyncio.sleep(retry_interval)
                        continue

                # All good, check again after normal interval
                await asyncio.sleep(reconnect_interval)

        except asyncio.CancelledError:
            logger.info("🛑 Provider reconnection monitor cancelled")
            break
        except Exception as e:
            logger.error(f"💥 Unexpected error in provider reconnection monitor: {e}")
            await asyncio.sleep(retry_interval)
|
||
|
||
|
||
async def check_server_health(server_url: str, insecure: bool = False) -> bool:
    """Report whether ``GET {server_url}/api/health`` answers with HTTP 200.

    Args:
        server_url: Base URL of the server to probe.
        insecure: When True, TLS certificate verification is disabled.

    Returns:
        True for a 200 response; False for any other status or any exception
        (the exception is logged at debug level).
    """
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            # Probe the health endpoint with a short timeout.
            health = await http.get(f"{server_url}/api/health", timeout=5.0)
    except Exception as e:
        logger.debug(f"Health check failed: {e}")
        return False
    return health.status_code == 200
|
||
|
||
|
||
async def check_provider_registration(server_url: str, provider_id: str, insecure: bool = False) -> bool:
    """Report whether ``provider_id`` appears in the server's provider list.

    Args:
        server_url: Base URL of the server; ``/api/bots/providers`` is queried.
        provider_id: The provider ID to look for.
        insecure: When True, TLS certificate verification is disabled.

    Returns:
        True when an entry of the response's ``providers`` list has a matching
        ``provider_id``; False on a non-200 status, when no entry matches, or
        on any exception (logged at debug level).
    """
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            reply = await http.get(f"{server_url}/api/bots/providers", timeout=5.0)

            if reply.status_code != 200:
                logger.warning(f"Registration check failed: HTTP {reply.status_code}")
                return False

            listed = reply.json().get("providers", [])
            # Scan the returned entries for our own provider_id.
            is_registered = False
            for entry in listed:
                if entry.get("provider_id") == provider_id:
                    is_registered = True
                    break
            logger.debug(f"Registration check: provider_id={provider_id}, found_providers={len(listed)}, is_registered={is_registered}")
            return is_registered
    except Exception as e:
        logger.debug(f"Provider registration check failed: {e}")
        return False
|
||
|
||
# FastAPI application; startup and shutdown work is delegated to ``lifespan``.
app = FastAPI(title="voicebot-bot-orchestrator", lifespan=lifespan)

# run_id -> signaling client for each bot run in flight (kept in memory only).
registry: Dict[str, WebRTCSignalingClient] = {}

# Emitted at import time so that module reloads show up in the logs.
logger.info("📦 Bot orchestrator module imported/reloaded")

# bot name -> metadata and handler callables collected by discover_bots().
_bot_registry: Dict[str, Dict[str, Any]] = {}
|
||
|
||
def discover_bots() -> "List[BotInfoModel]":
    """Scan ``voicebot.bots`` for modules that expose a callable ``agent_info``.

    Every call clears ``_bot_registry`` and rebuilds it. For each qualifying
    module the validated ``BotInfoModel`` is appended to the result, and the
    module's optional hooks (``create_agent_tracks``, ``handle_chat_message``,
    ``get_track_handler`` / ``on_track_received``, ``bind_send_chat_function``,
    ``get_config_schema``, ``handle_config_update``) are stored in the registry
    under the bot's name.

    Returns:
        The list of discovered bot descriptions; empty when the
        ``voicebot.bots`` package itself cannot be imported.
    """
    global _bot_registry
    from shared.models import BotInfoModel

    def _hook(module, attr):
        """Return ``module.attr`` when it exists and is callable, else None."""
        candidate = getattr(module, attr, None)
        return candidate if callable(candidate) else None

    discovered: List[BotInfoModel] = []
    _bot_registry.clear()  # Drop whatever the previous scan found

    try:
        bots_pkg = importlib.import_module("voicebot.bots")
        search_path = bots_pkg.__path__
    except Exception:
        logger.exception("Failed to import voicebot.bots package")
        return discovered

    for _finder, name, _ispkg in pkgutil.iter_modules(search_path):
        try:
            mod = importlib.import_module(f"voicebot.bots.{name}")
        except Exception:
            logger.exception("Failed to import voicebot.bots.%s", name)
            continue

        agent_info = _hook(mod, "agent_info")
        if agent_info is None:
            # Not a bot module.
            continue

        try:
            processed_info = dict(agent_info())

            # A string has_media is converted to a boolean for compatibility.
            has_media_value = processed_info.get("has_media", True)
            if isinstance(has_media_value, str):
                processed_info["has_media"] = has_media_value.lower() in ("true", "1", "yes")

            bot_info = BotInfoModel.model_validate(processed_info)
            discovered.append(bot_info)

            # get_track_handler is a factory and is called here;
            # on_track_received is used directly as the handler.
            track_handler_factory = _hook(mod, "get_track_handler")
            if track_handler_factory is not None:
                track_handler = track_handler_factory()
            else:
                track_handler = _hook(mod, "on_track_received")

            # Optional configuration support.
            config_schema = None
            schema_getter = _hook(mod, "get_config_schema")
            if schema_getter is not None:
                try:
                    config_schema = schema_getter()
                    logger.info(f"Bot {bot_info.name} supports configuration")
                except Exception as e:
                    logger.warning(f"Failed to get config schema for {bot_info.name}: {e}")

            _bot_registry[bot_info.name] = {
                "module": name,
                "info": bot_info,
                "create_tracks": _hook(mod, "create_agent_tracks"),
                "chat_handler": _hook(mod, "handle_chat_message"),
                "track_handler": track_handler,
                "chat_bind": _hook(mod, "bind_send_chat_function"),
                "config_schema": config_schema,
                "config_handler": _hook(mod, "handle_config_update"),
            }
        except Exception:
            logger.exception("agent_info() failed for %s", name)

    return discovered
|
||
|
||
|
||
@app.get("/bots")
def list_bots() -> "BotProviderBotsResponse":
    """Run a fresh bot discovery and return the bots it found."""
    from shared.models import BotProviderBotsResponse

    return BotProviderBotsResponse(bots=discover_bots())
|
||
|
||
|
||
@app.post("/bots/{bot_name}/join")
async def bot_join(bot_name: str, req: JoinRequest):
    """Start the named bot and connect it to the lobby described by ``req``.

    Bots are re-discovered first. The bot's signaling client is then run on a
    daemon thread with its own event loop, and is listed in ``registry`` under
    a fresh ``run_id`` for as long as ``client.connect()`` is running.

    Returns:
        ``{"status": "started", "bot": <bot_name>, "run_id": <uuid>}``.

    Raises:
        HTTPException: 404 when no bot named ``bot_name`` was discovered.
    """
    # Refresh the registry so the lookup below sees the current set of bots.
    discover_bots()

    entry = _bot_registry.get(bot_name)
    if entry is None:
        raise HTTPException(status_code=404, detail="Bot not found")

    create_tracks = entry.get("create_tracks")
    chat_handler = entry.get("chat_handler")
    track_handler = entry.get("track_handler")
    bind_send_chat_function = entry.get("chat_bind")

    logger.info(f"🤖 Bot {bot_name} joining lobby {req.lobby_id} with nick: '{req.nick}'")
    if track_handler:
        logger.info(f"🤖 Bot {bot_name} has track handling capabilities")
    if chat_handler:
        logger.info(f"🤖 Bot {bot_name} has chat receiving capabilities")
    if bind_send_chat_function:
        logger.info(f"🤖 Bot {bot_name} has chat sending capabilities")

    client = WebRTCSignalingClient(
        server_url=req.server_url,
        lobby_id=req.lobby_id,
        session_id=req.session_id,
        session_name=req.nick,
        insecure=req.insecure,
        create_tracks=create_tracks,
        bind_send_chat_function=bind_send_chat_function,
    )

    if chat_handler:

        async def bot_chat_handler(chat_message: ChatMessageModel):
            """Forward a chat message to the bot; send back any string reply."""
            try:
                reply = await chat_handler(chat_message, client.send_chat_message)
                if reply and isinstance(reply, str):
                    await client.send_chat_message(reply)
            except Exception as e:
                logger.error(f"Error in bot chat handler for {bot_name}: {e}", exc_info=True)

        client.on_chat_message_received = bot_chat_handler

    if track_handler:
        client.on_track_received = track_handler

    run_id = str(uuid.uuid4())

    async def run_client():
        """Keep the client registered while its connection is running."""
        try:
            registry[run_id] = client
            await client.connect()
        except Exception:
            logger.exception("Bot client failed for run %s", run_id)
        finally:
            registry.pop(run_id, None)

    def run_client_in_thread():
        """Run the client in a new event loop in a separate thread."""
        try:
            asyncio.run(run_client())
        except Exception:
            logger.exception("Bot client thread failed for run %s", run_id)

    worker = threading.Thread(target=run_client_in_thread, daemon=True)
    worker.start()

    return {"status": "started", "bot": bot_name, "run_id": run_id}
|
||
|
||
|
||
@app.get("/provider/status")
def get_provider_status():
    """Expose get_provider_registration_status() over HTTP."""
    status = get_provider_registration_status()
    return status
|
||
|
||
|
||
@app.get("/bots/{bot_name}/config-schema")
def get_bot_config_schema(bot_name: str):
    """Return the configuration schema stored for ``bot_name``.

    Raises:
        HTTPException: 404 when the bot is not in the registry, or when its
            stored schema is missing or empty.
    """
    entry = _bot_registry.get(bot_name)
    if entry is None:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    schema = entry.get("config_schema")
    if schema:
        return schema

    raise HTTPException(
        status_code=404,
        detail=f"Bot '{bot_name}' does not support configuration",
    )
|
||
|
||
|
||
@app.post("/bots/{bot_name}/config")
async def update_bot_config(bot_name: str, config_data: dict[str, Any]) -> dict[str, str | bool]:
    """Update bot configuration for a specific lobby.

    Args:
        bot_name: Name of a bot present in the bot registry.
        config_data: Request body; ``lobby_id`` is required and
            ``config_values`` defaults to an empty dict.

    Returns:
        ``{"success": bool, "message": str}`` reflecting the handler's result.

    Raises:
        HTTPException: 404 when the bot is unknown or has no config handler,
            400 when ``lobby_id`` is missing, 500 when the handler raises.
    """
    if bot_name not in _bot_registry:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    bot_entry = _bot_registry[bot_name]
    config_handler = bot_entry.get("config_handler")

    if not config_handler:
        raise HTTPException(
            status_code=404,
            detail=f"Bot '{bot_name}' does not support configuration updates"
        )

    lobby_id = config_data.get("lobby_id")
    config_values = config_data.get("config_values", {})

    # Validate outside the try block: HTTPException is an Exception, so raising
    # it inside would be caught below and reported to the client as a 500.
    if not lobby_id:
        raise HTTPException(status_code=400, detail="lobby_id is required")

    try:
        # Call the bot's configuration handler
        success = await config_handler(lobby_id, config_values)
    except HTTPException:
        # Let a handler's deliberate HTTP error through unchanged.
        raise
    except Exception as e:
        logger.error(f"Failed to update config for bot {bot_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

    if success:
        return {"success": True, "message": "Configuration updated successfully"}
    return {"success": False, "message": "Configuration update failed"}
|
||
|
||
|
||
@app.post("/bots/runs/{run_id}/stop")
async def stop_run(run_id: str):
    """Ask a running bot to shut down and drop it from the run registry.

    Returns:
        ``{"status": "stopped", "run_id": <run_id>}``.

    Raises:
        HTTPException: 404 when ``run_id`` is not registered; 500 when the
            shutdown request fails (the registry entry is removed anyway).
    """
    target = registry.get(run_id)
    if not target:
        raise HTTPException(status_code=404, detail="Run not found")

    try:
        # The client runs on another thread's event loop, so shutdown is only
        # requested here rather than awaited as a disconnect.
        target.request_shutdown()
        # Short grace period for the client to wind down.
        await asyncio.sleep(0.5)
    except Exception:
        logger.exception("Failed to stop run %s", run_id)
        registry.pop(run_id, None)
        raise HTTPException(status_code=500, detail="Failed to stop run")

    registry.pop(run_id, None)
    return {"status": "stopped", "run_id": run_id}
|
||
|
||
|
||
@app.get("/bots/runs")
def list_runs() -> Dict[str, Any]:
    """List running bot instances.

    Returns:
        ``{"runs": [...]}`` with one ``run_id`` / ``session_id`` /
        ``session_name`` entry per client currently in ``registry``.
    """
    # Bot threads add and remove registry entries while they run, so iterate
    # over a snapshot; iterating the live dict can raise
    # "dictionary changed size during iteration".
    active = list(registry.items())
    return {
        "runs": [
            {"run_id": run_id, "session_id": client.session_id, "session_name": client.session_name}
            for run_id, client in active
        ]
    }
|
||
|
||
|
||
def start_bot_api(host: str = "0.0.0.0", port: int = 8788):
    """Serve the orchestrator app with uvicorn on ``host``:``port``."""
    bind = {"host": host, "port": port}
    uvicorn.run(app, **bind)
|
||
|
||
|
||
def _construct_voicebot_url(host: str, port: str) -> str:
    """Build the ``http://`` URL for this voicebot from ``host`` and ``port``.

    The wildcard address ``0.0.0.0`` is replaced by the machine's hostname,
    or by ``localhost`` when the hostname-based branch raises.
    """
    if host != "0.0.0.0":
        url = f"http://{host}:{port}"
        logger.info(f"🏠 Using host-based URL: {url}")
        return url

    import socket

    try:
        url = f"http://{socket.gethostname()}:{port}"
        logger.info(f"🏠 Using hostname-based URL: {url}")
    except Exception:
        url = f"http://localhost:{port}"
        logger.info(f"🏠 Using localhost URL: {url}")
    return url
|
||
|
||
|
||
def _perform_server_registration_sync(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Run _perform_server_registration() to completion via ``asyncio.run``.

    Returns the provider ID produced by the coroutine.
    """
    registration = _perform_server_registration(server_url, host, port, insecure)
    return asyncio.run(registration)
|
||
|
||
|
||
async def _perform_server_registration(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Wait two seconds, then register this voicebot with ``server_url``.

    The advertised URL is derived from ``host`` and ``port``. Returns the
    provider ID from register_with_server(), whose exceptions propagate.
    """
    advertised_url = _construct_voicebot_url(host, port)

    # Give the server time to fully start before the first attempt.
    logger.info("⏱️ Waiting 2 seconds before attempting remote registration...")
    await asyncio.sleep(2)

    new_provider_id = await register_with_server(server_url, advertised_url, insecure)
    logger.info(f"🎉 Successfully registered with remote server! Provider ID: {new_provider_id}")
    return new_provider_id
|
||
|
||
|
||
async def register_with_server(server_url: str, voicebot_url: str, insecure: bool = False) -> str:
    """Register this voicebot instance as a bot provider with the main server.

    Posts ``base_url``, ``name``, ``description`` and ``provider_key`` (taken
    from ``VOICEBOT_PROVIDER_KEY``, default ``default-voicebot``) to
    ``{server_url}/api/bots/providers/register``.

    Args:
        server_url: Base URL of the main server.
        voicebot_url: URL under which this provider is reachable; a trailing
            slash is stripped before sending.
        insecure: When True, TLS certificate verification is disabled.

    Returns:
        The ``provider_id`` from the server's response.

    Raises:
        RuntimeError: On a non-200 response, or a 200 response without a
            ``provider_id``.
        Exception: Any other error is logged and re-raised unchanged.
    """
    logger.info(f"🔗 Attempting to register with remote server at {server_url}")
    logger.info(f"📍 Registration details - Voicebot URL: {voicebot_url}, Insecure: {insecure}")

    try:
        # Import httpx locally to avoid dependency issues
        import httpx

        # Get provider key from environment variable
        provider_key = os.getenv('VOICEBOT_PROVIDER_KEY', 'default-voicebot')

        payload = {
            "base_url": voicebot_url.rstrip('/'),
            "name": "voicebot-provider",
            "description": "AI voicebot provider with speech recognition and synthetic media capabilities",
            "provider_key": provider_key
        }

        # Keep the provider key out of the logs; everything else is logged as sent.
        loggable_payload = {**payload, "provider_key": "***"}
        logger.info(f"📤 Sending registration payload: {loggable_payload}")

        verify = not insecure

        async with httpx.AsyncClient(verify=verify) as client:
            response = await client.post(
                f"{server_url}/api/bots/providers/register",
                json=payload,
                timeout=10.0
            )

            if response.status_code != 200:
                logger.error(f"❌ Failed to register with server: HTTP {response.status_code}: {response.text}")
                raise RuntimeError(f"Registration failed: {response.status_code}")

            provider_id = response.json().get("provider_id")
            if not provider_id:
                # Returning None here would break the ``-> str`` contract and
                # let callers record a registration that has no ID.
                raise RuntimeError("Registration response did not include a provider_id")

            logger.info(f"✅ Successfully registered with server as provider: {provider_id}")
            logger.info(f"🎯 Remote server can now discover bots at: {voicebot_url}")
            return provider_id

    except Exception as e:
        logger.error(f"💥 Error registering with server: {e}")
        raise
|
||
|
||
|
||
def start_bot_provider(
    host: str = "0.0.0.0",
    port: int = 8788,
    server_url: str | None = None,
    insecure: bool = False,
    reload: bool = False
):
    """Start the bot provider API server and optionally register with main server.

    The FastAPI app is served by uvicorn on a daemon thread. When
    ``server_url`` is given, an initial registration is attempted and the
    reconnection monitor is started on a second daemon thread with its own
    event loop. The calling thread then blocks until ``KeyboardInterrupt``,
    at which point the shutdown event is set.

    Args:
        host: Interface for uvicorn to bind to.
        port: Port for uvicorn to bind to.
        server_url: Main server to register with; None means local-only mode.
        insecure: When True, TLS verification is disabled for server calls.
        reload: When True, ``reload=True`` and ``reload_dirs`` are passed to
            ``uvicorn.run``.
    """
    # Set up global variables for reconnection logic
    global _server_url, _voicebot_url, _insecure, _provider_id
    global _provider_registration_status
    _server_url = server_url
    _insecure = insecure
    _voicebot_url = _construct_voicebot_url(host, str(port))

    # Start the FastAPI server in a background thread
    uvicorn_options: dict[str, Any] = {"host": host, "port": port, "log_level": "info"}
    if reload:
        # NOTE(review): uvicorn's docs say reload needs the app passed as an
        # import string; confirm this branch works with the app object when
        # run from a non-main thread.
        uvicorn_options["reload"] = True
        uvicorn_options["reload_dirs"] = ["/voicebot", "/shared"]

    server_thread = threading.Thread(
        target=lambda: uvicorn.run(app, **uvicorn_options),
        daemon=True
    )
    logger.info(f"Starting bot provider API server on {host}:{port}...")
    server_thread.start()

    # If server_url is provided, attempt initial registration
    if server_url:
        logger.info(f"🔄 Server URL provided - will attempt registration with: {server_url}")
        # Give the server a moment to start
        logger.info("⏱️ Waiting 2 seconds for server to fully start...")
        time.sleep(2)

        try:
            _provider_id = _perform_server_registration_sync(server_url, host, str(port), insecure)
            # Keep the status flag in step with the outcome, as lifespan() does,
            # so /provider/status is right before the monitor's first check.
            _provider_registration_status = True
            logger.info(f"🎉 Registration completed successfully! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with server: {e}")
            logger.warning("⚠️ Bot orchestrator will continue running and attempt reconnection")

        # Start a background thread for reconnection monitoring
        def run_reconnection_monitor():
            """Run reconnection monitor in a separate thread with its own event loop."""
            try:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(reconnection_monitor())
            except Exception as e:
                logger.error(f"Reconnection monitor thread failed: {e}")

        reconnect_thread = threading.Thread(target=run_reconnection_monitor, daemon=True)
        reconnect_thread.start()
        logger.info("🔄 Started reconnection monitor in background thread")
    else:
        logger.info("ℹ️ No remote server URL provided - running in local-only mode")

    # Keep the main thread alive; the worker threads are daemons and would
    # otherwise end with it.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down bot provider...")
        _shutdown_event.set()
|