678 lines
27 KiB
Python
678 lines
27 KiB
Python
"""
|
||
Bot orchestrator FastAPI service.
|
||
|
||
This module provides the FastAPI service for bot discovery and orchestration.
|
||
"""
|
||
|
||
import asyncio
|
||
import threading
|
||
import uuid
|
||
import importlib
|
||
import inspect
|
||
import pkgutil
|
||
import sys
|
||
import os
|
||
import time
|
||
from contextlib import asynccontextmanager
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
# Add the parent directory to sys.path to allow absolute imports
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
|
||
import uvicorn
|
||
from fastapi import FastAPI, HTTPException
|
||
|
||
from shared.logger import logger
|
||
from voicebot.models import JoinRequest
|
||
from voicebot.webrtc_signaling import WebRTCSignalingClient
|
||
|
||
# Add shared models import for chat types
|
||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from shared.models import ChatMessageModel, BotInfoModel, BotProviderBotsResponse
|
||
|
||
|
||
# Module-level state shared by the lifespan hook, the reconnection monitor,
# the status endpoint and start_bot_provider().

# Connection settings for the remote server this provider registers with.
_server_url: Optional[str] = None
_voicebot_url: Optional[str] = None
_insecure: bool = False

# Registration outcome: the provider ID in use and whether the last
# registration/check succeeded.
_provider_id: Optional[str] = None
_provider_registration_status: bool = False

# Lifecycle handles: the background monitor task (when started from the
# lifespan hook) and the flag that asks the monitor loop to stop.
_reconnect_task: Optional[asyncio.Task[None]] = None
_shutdown_event = asyncio.Event()
|
||
|
||
|
||
def get_provider_registration_status() -> dict[str, bool | str | float | None]:
    """Return a snapshot of the provider registration state for bot clients.

    The values are read from the module-level registration globals;
    ``last_check`` is the wall-clock time at which this function was called.
    """
    snapshot: dict[str, bool | str | float | None] = {}
    snapshot["is_registered"] = _provider_registration_status
    snapshot["provider_id"] = _provider_id
    snapshot["server_url"] = _server_url
    snapshot["last_check"] = time.time()
    return snapshot
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: discover bots, register with the remote server, clean up.

    Startup:
        * logs the bots found by ``discover_bots()``;
        * when ``VOICEBOT_SERVER_URL`` is set, stores the connection settings
          in the module globals, attempts an initial registration and starts
          ``reconnection_monitor()`` as a background task.

    Shutdown:
        * sets the shutdown event and cancels the monitor task, waiting for
          the cancellation to complete.
    """
    global _reconnect_task, _shutdown_event
    global _server_url, _voicebot_url, _insecure, _provider_id
    # Without this declaration the assignments below would only create a
    # function-local name, leaving the module-level flag (and therefore the
    # status endpoint) unchanged.
    global _provider_registration_status

    # Startup
    logger.info(f"🚀 Voicebot bot orchestrator started successfully at {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Log the discovered bots
    bots = discover_bots()
    if bots:
        bot_names = [bot.name for bot in bots]
        logger.info(f"📋 Discovered {len(bots)} bots: {bot_names}")
    else:
        logger.info("⚠️ No bots discovered")

    # Check for remote server registration
    remote_server_url = os.getenv('VOICEBOT_SERVER_URL')
    if remote_server_url:
        # Set up global variables for reconnection logic
        _server_url = remote_server_url
        _insecure = os.getenv('VOICEBOT_SERVER_INSECURE', 'false').lower() == 'true'

        host = os.getenv('HOST', '0.0.0.0')
        port = os.getenv('PORT', '8788')
        _voicebot_url = _construct_voicebot_url(host, port)

        # Attempt initial registration
        try:
            _provider_id = await _perform_server_registration(remote_server_url, host, port, _insecure)
            _provider_registration_status = True
            logger.info(f"🎉 Successfully registered with remote server! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with remote server: {e}")
            logger.warning("⚠️ Will attempt reconnection in background")

        # Start the reconnection monitoring task
        _reconnect_task = asyncio.create_task(reconnection_monitor())
    else:
        logger.info("ℹ️ No VOICEBOT_SERVER_URL provided - running in local-only mode")

    yield

    # Shutdown
    logger.info("🛑 Voicebot bot orchestrator shutting down")
    _shutdown_event.set()
    if _reconnect_task:
        _reconnect_task.cancel()
        try:
            await _reconnect_task
        except asyncio.CancelledError:
            # Expected: we cancelled the task ourselves.
            pass
|
||
|
||
|
||
async def reconnection_monitor():
    """Background task that monitors server connectivity and re-registers if needed.

    Loops until the module-level shutdown event is set or the task is
    cancelled. On each pass it:

    * waits (long interval) when no server URL / voicebot URL is configured;
    * registers when no provider ID is known yet, e.g. because the initial
      registration failed;
    * re-registers when the server health check fails or the provider is no
      longer listed by the server.

    The module-level ``_provider_id`` and ``_provider_registration_status``
    are updated to reflect the outcome of every pass.
    """
    global _provider_registration_status, _provider_id

    reconnect_interval = 15  # Check every 15 seconds (faster for testing)
    retry_interval = 5  # Retry failed connections every 5 seconds

    logger.info(f"🔄 Starting provider reconnection monitor (check every {reconnect_interval}s)")

    while not _shutdown_event.is_set():
        try:
            if not (_server_url and _voicebot_url):
                # Missing configuration, wait longer
                _provider_registration_status = False
                await asyncio.sleep(reconnect_interval * 2)
                continue

            if not _provider_id:
                # The initial registration never succeeded. Treating this as
                # "missing configuration" would leave the provider
                # unregistered forever, so keep retrying here instead.
                _provider_registration_status = False
                logger.warning("⚠️ Provider not registered yet, attempting registration")
                try:
                    _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                    _provider_registration_status = True
                    logger.info(f"🔄 Successfully registered with server! Provider ID: {_provider_id}")
                    await asyncio.sleep(reconnect_interval)
                except Exception as e:
                    logger.error(f"❌ Registration failed: {e}")
                    await asyncio.sleep(retry_interval)
                continue

            # First check if server is healthy
            is_server_healthy = await check_server_health(_server_url, _insecure)

            if not is_server_healthy:
                logger.warning("⚠️ Server appears to be down or unreachable")
                _provider_registration_status = False
                # Try to re-register
                try:
                    _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                    _provider_registration_status = True
                    logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
                    # Use longer interval after successful reconnection
                    await asyncio.sleep(reconnect_interval)
                except Exception as e:
                    logger.error(f"❌ Re-registration failed: {e}")
                    # Use shorter interval for retry
                    await asyncio.sleep(retry_interval)
                continue

            # Server is healthy, now check if we're still registered
            is_registered = await check_provider_registration(_server_url, _provider_id, _insecure)
            _provider_registration_status = is_registered

            if not is_registered:
                logger.warning("⚠️ Provider registration lost, attempting to re-register")
                try:
                    _provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
                    _provider_registration_status = True
                    logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
                except Exception as e:
                    logger.error(f"❌ Re-registration failed: {e}")
                    await asyncio.sleep(retry_interval)
                    continue

            # All good, check again after normal interval
            await asyncio.sleep(reconnect_interval)

        except asyncio.CancelledError:
            logger.info("🛑 Provider reconnection monitor cancelled")
            break
        except Exception as e:
            logger.error(f"💥 Unexpected error in provider reconnection monitor: {e}")
            await asyncio.sleep(retry_interval)
|
||
|
||
|
||
async def check_server_health(server_url: str, insecure: bool = False) -> bool:
    """Report whether ``GET {server_url}/api/health`` answers with HTTP 200.

    Args:
        server_url: Base URL of the server to probe.
        insecure: When true, TLS certificate verification is disabled.

    Returns:
        ``True`` for a 200 response; ``False`` for any other status or when
        the request (or the ``httpx`` import) fails.
    """
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            # Probe the health endpoint with a short timeout
            reply = await http.get(f"{server_url}/api/health", timeout=5.0)
            healthy = reply.status_code == 200
        return healthy
    except Exception as exc:
        logger.debug(f"Health check failed: {exc}")
        return False
|
||
|
||
|
||
async def check_provider_registration(server_url: str, provider_id: str, insecure: bool = False) -> bool:
    """Report whether ``provider_id`` appears in the server's provider list.

    Queries ``GET {server_url}/api/bots/providers`` and looks for an entry
    whose ``provider_id`` field equals ``provider_id``.

    Args:
        server_url: Base URL of the server to query.
        provider_id: Provider identifier to look for.
        insecure: When true, TLS certificate verification is disabled.

    Returns:
        ``True`` when a matching entry is found; ``False`` when it is not,
        when the server answers with a non-200 status, or when the request
        fails for any reason.
    """
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            reply = await http.get(f"{server_url}/api/bots/providers", timeout=5.0)
            if reply.status_code != 200:
                logger.warning(f"Registration check failed: HTTP {reply.status_code}")
                return False

            # Each entry is accessed with .get(), i.e. it is expected to be a
            # JSON object carrying a "provider_id" field.
            providers = reply.json().get("providers", [])
            is_registered = False
            for entry in providers:
                if entry.get("provider_id") == provider_id:
                    is_registered = True
                    break

            logger.debug(f"Registration check: provider_id={provider_id}, found_providers={len(providers)}, is_registered={is_registered}")
            return is_registered
    except Exception as exc:
        logger.debug(f"Provider registration check failed: {exc}")
        return False
|
||
|
||
# FastAPI application; startup and shutdown work is delegated to ``lifespan``.
app = FastAPI(lifespan=lifespan, title="voicebot-bot-orchestrator")

# Lightweight in-memory registry of running bot clients, keyed by run id.
registry: Dict[str, WebRTCSignalingClient] = {}

# Internal per-bot metadata filled in by discover_bots(), keyed by bot name.
_bot_registry: Dict[str, Dict[str, Any]] = {}

# Logged on every import so that module reloads are visible while debugging.
logger.info("📦 Bot orchestrator module imported/reloaded")
|
||
|
||
def _optional_callable(mod: Any, attr: str) -> Any:
    """Return ``mod.<attr>`` when it exists and is callable, otherwise ``None``."""
    candidate = getattr(mod, attr, None)
    return candidate if callable(candidate) else None


def _coerce_flag(info: Dict[str, Any], key: str) -> None:
    """Convert a string flag (``"true"``/``"1"``/``"yes"``) under ``key`` to bool, in place."""
    value = info.get(key)
    if isinstance(value, str):
        info[key] = value.lower() in ("true", "1", "yes")


def discover_bots() -> "List[BotInfoModel]":
    """Discover bot modules under the voicebot.bots package that expose agent_info().

    This intentionally imports modules under `voicebot.bots` so heavy bot
    implementations can remain in that package and be imported lazily.

    Every call clears and rebuilds the module-level ``_bot_registry``. For
    each discovered bot the registry stores the module name, the validated
    ``BotInfoModel`` and whichever optional hooks the module defines:
    ``create_agent_tracks``, ``handle_chat_message``, a track handler
    (``get_track_handler()`` result, else ``on_track_received``),
    ``bind_send_chat_function``, ``get_config_schema()`` result and
    ``handle_config_update``.

    Returns:
        The list of bots whose metadata could be collected completely. A
        module that fails to import, or whose ``agent_info()`` / metadata
        collection raises, is logged and skipped.
    """
    from shared.models import BotInfoModel

    bots: List[BotInfoModel] = []
    _bot_registry.clear()  # Clear previous discoveries

    try:
        package = importlib.import_module("voicebot.bots")
        package_path = package.__path__
    except Exception:
        logger.exception("Failed to import voicebot.bots package")
        return bots

    for _finder, name, _ispkg in pkgutil.iter_modules(package_path):
        try:
            mod = importlib.import_module(f"voicebot.bots.{name}")
        except Exception:
            logger.exception("Failed to import voicebot.bots.%s", name)
            continue

        agent_info = _optional_callable(mod, "agent_info")
        if agent_info is None:
            continue

        try:
            # Convert string has_media and configurable to boolean for compatibility
            processed_info = dict(agent_info())
            _coerce_flag(processed_info, "has_media")
            _coerce_flag(processed_info, "configurable")

            # Create BotInfoModel using model_validate
            bot_info = BotInfoModel.model_validate(processed_info)

            create_tracks = _optional_callable(mod, "create_agent_tracks")
            chat_handler = _optional_callable(mod, "handle_chat_message")

            # Prefer the factory; fall back to a plain module-level callback.
            get_track_handler = _optional_callable(mod, "get_track_handler")
            if get_track_handler is not None:
                track_handler = get_track_handler()
            else:
                track_handler = _optional_callable(mod, "on_track_received")

            bind_send_chat_function = _optional_callable(mod, "bind_send_chat_function")

            # Check for configuration schema support
            config_schema = None
            get_config_schema = _optional_callable(mod, "get_config_schema")
            if get_config_schema is not None:
                try:
                    config_schema = get_config_schema()
                    logger.info(f"Bot {bot_info.name} supports configuration")
                    # Set the config_schema on the BotInfoModel
                    bot_info.config_schema = config_schema
                except Exception as e:
                    logger.warning(f"Failed to get config schema for {bot_info.name}: {e}")

            config_handler = _optional_callable(mod, "handle_config_update")

            entry = {
                "module": name,
                "info": bot_info,
                "create_tracks": create_tracks,
                "chat_handler": chat_handler,
                "track_handler": track_handler,
                "chat_bind": bind_send_chat_function,
                "config_schema": config_schema,
                "config_handler": config_handler,
            }
        except Exception:
            logger.exception("agent_info() failed for %s", name)
            continue

        # Publish only after all metadata was collected, so the returned list
        # and the registry can never disagree about which bots exist.
        _bot_registry[bot_info.name] = entry
        bots.append(bot_info)

    return bots
|
||
|
||
|
||
@app.get("/bots")
def list_bots() -> "BotProviderBotsResponse":
    """Run bot discovery and return the result wrapped in a BotProviderBotsResponse."""
    from shared.models import BotProviderBotsResponse

    return BotProviderBotsResponse(bots=discover_bots())
|
||
|
||
|
||
@app.post("/bots/{bot_name}/join")
async def bot_join(bot_name: str, req: JoinRequest):
    """Make a bot join a lobby.

    Re-runs bot discovery, optionally applies ``req.config_values`` through
    the bot's config handler, builds a ``WebRTCSignalingClient`` wired to the
    bot's hooks and runs it on a daemon thread with its own event loop.

    Args:
        bot_name: Name of the bot as registered by ``discover_bots()``.
        req: Join parameters (server URL, lobby, session, nick, config).

    Returns:
        ``{"status": "started", "bot": <bot_name>, "run_id": <uuid>}``.

    Raises:
        HTTPException: 404 when ``bot_name`` is not a discovered bot.
    """
    # Ensure bots are discovered and registry is populated
    discover_bots()

    if bot_name not in _bot_registry:
        raise HTTPException(status_code=404, detail="Bot not found")

    bot_data = _bot_registry[bot_name]
    create_tracks = bot_data.get("create_tracks")
    chat_handler = bot_data.get("chat_handler")
    track_handler = bot_data.get("track_handler")
    bind_send_chat_function = bot_data.get("chat_bind")
    config_handler = bot_data.get("config_handler")

    logger.info(f"🤖 Bot {bot_name} joining lobby {req.lobby_id} with nick: '{req.nick}'")
    if req.config_values:
        logger.info(f"🤖 Bot {bot_name} has existing configuration: {req.config_values}")
    if track_handler:
        logger.info(f"🤖 Bot {bot_name} has track handling capabilities")
    if chat_handler:
        logger.info(f"🤖 Bot {bot_name} has chat receiving capabilities")
    if bind_send_chat_function:
        logger.info(f"🤖 Bot {bot_name} has chat sending capabilities")

    # Apply configuration if provided
    if req.config_values and config_handler:
        try:
            logger.info(f"Applying existing configuration to bot {bot_name}")
            if inspect.iscoroutinefunction(config_handler):
                success = await config_handler(req.lobby_id, req.config_values)
            else:
                success = config_handler(req.lobby_id, req.config_values)
            if success:
                logger.info(f"Successfully applied existing configuration to bot {bot_name}")
            else:
                logger.warning(f"Failed to apply existing configuration to bot {bot_name}")
        except Exception as e:
            # Best effort: a configuration failure must not prevent the join.
            logger.error(f"Error applying existing configuration to bot {bot_name}: {e}")

    # Start the WebRTCSignalingClient in a background asyncio task and register it
    client = WebRTCSignalingClient(
        server_url=req.server_url,
        lobby_id=req.lobby_id,
        session_id=req.session_id,
        session_name=req.nick,
        insecure=req.insecure,
        create_tracks=create_tracks,
        bind_send_chat_function=bind_send_chat_function,
    )

    # Set up chat message handler if the bot provides one
    if chat_handler:
        async def bot_chat_handler(chat_message: ChatMessageModel):
            """Wrapper to call the bot's chat handler and optionally send responses"""
            try:
                # Accept both sync and async handlers (as is done for config
                # handlers): only await when the call produced an awaitable.
                outcome = chat_handler(chat_message, client.send_chat_message)
                if inspect.isawaitable(outcome):
                    await outcome
            except Exception as e:
                logger.error(f"Error in bot chat handler for {bot_name}: {e}", exc_info=True)

        client.on_chat_message_received = bot_chat_handler

    # Set up track handler if the bot provides one
    if track_handler:
        client.on_track_received = track_handler

    run_id = str(uuid.uuid4())

    async def run_client():
        try:
            await client.connect()
        except Exception:
            logger.exception("Bot client failed for run %s", run_id)
        finally:
            registry.pop(run_id, None)

    def run_client_in_thread():
        """Run the client in a new event loop in a separate thread."""
        try:
            asyncio.run(run_client())
        except Exception:
            logger.exception("Bot client thread failed for run %s", run_id)

    # Register before the thread starts so the run_id returned below can be
    # listed or stopped immediately, even if the thread has not run yet.
    registry[run_id] = client
    try:
        threading.Thread(target=run_client_in_thread, daemon=True).start()
    except Exception:
        registry.pop(run_id, None)
        raise

    return {"status": "started", "bot": bot_name, "run_id": run_id}
|
||
|
||
|
||
@app.get("/provider/status")
def get_provider_status():
    """Expose the provider registration snapshot over HTTP."""
    status = get_provider_registration_status()
    return status
|
||
|
||
|
||
@app.get("/bots/{bot_name}/config-schema")
def get_bot_config_schema(bot_name: str):
    """Return the configuration schema recorded for ``bot_name``.

    Raises:
        HTTPException: 404 when the bot is not in the registry, or when its
            registry entry holds no (or an empty) configuration schema.
    """
    bot_entry = _bot_registry.get(bot_name)
    if bot_entry is None:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    schema = bot_entry.get("config_schema")
    if schema:
        return schema

    raise HTTPException(
        status_code=404,
        detail=f"Bot '{bot_name}' does not support configuration"
    )
|
||
|
||
|
||
@app.post("/bots/{bot_name}/config")
async def update_bot_config(bot_name: str, config_data: dict[str, Any]) -> dict[str, str | bool]:
    """Update bot configuration for a specific lobby.

    Args:
        bot_name: Name of a bot present in the registry.
        config_data: Request body; ``lobby_id`` is required and
            ``config_values`` defaults to an empty dict.

    Returns:
        ``{"success": bool, "message": str}`` reflecting the truthiness of
        the handler's return value.

    Raises:
        HTTPException: 404 when the bot is unknown or has no config handler,
            400 when ``lobby_id`` is missing, 500 when the handler raises.
    """
    if bot_name not in _bot_registry:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    bot_entry = _bot_registry[bot_name]
    config_handler = bot_entry.get("config_handler")

    if not config_handler:
        raise HTTPException(
            status_code=404,
            detail=f"Bot '{bot_name}' does not support configuration updates"
        )

    # Validate outside the try block: HTTPException is an Exception subclass,
    # so raising the 400 inside it would be caught below and turned into a 500.
    lobby_id = config_data.get("lobby_id")
    config_values = config_data.get("config_values", {})
    if not lobby_id:
        raise HTTPException(status_code=400, detail="lobby_id is required")

    try:
        # Call the bot's configuration handler
        if inspect.iscoroutinefunction(config_handler):
            success = await config_handler(lobby_id, config_values)
        else:
            success = config_handler(lobby_id, config_values)
    except Exception as e:
        logger.error(f"Failed to update config for bot {bot_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e

    if success:
        return {"success": True, "message": "Configuration updated successfully"}
    return {"success": False, "message": "Configuration update failed"}
|
||
|
||
|
||
@app.post("/bots/runs/{run_id}/stop")
async def stop_run(run_id: str):
    """Ask a running bot client to shut down and drop it from the registry.

    Raises:
        HTTPException: 404 when ``run_id`` is not registered, 500 when the
            shutdown request fails (the entry is removed in that case too).
    """
    client = registry.get(run_id)
    if not client:
        raise HTTPException(status_code=404, detail="Run not found")

    try:
        # The client is only signalled here rather than awaited: its
        # disconnect belongs to a different event loop.
        client.request_shutdown()

        # Short grace period for the client to wind down
        await asyncio.sleep(0.5)
    except Exception:
        logger.exception("Failed to stop run %s", run_id)
        # The entry is dropped even though the shutdown request failed
        registry.pop(run_id, None)
        raise HTTPException(status_code=500, detail="Failed to stop run")
    else:
        registry.pop(run_id, None)
        return {"status": "stopped", "run_id": run_id}
|
||
|
||
|
||
@app.get("/bots/runs")
def list_runs() -> Dict[str, Any]:
    """List running bot instances.

    Returns:
        ``{"runs": [...]}`` with one ``run_id`` / ``session_id`` /
        ``session_name`` record per client currently in the registry.
    """
    # Snapshot first: bot worker threads add and remove registry entries on
    # their own, and iterating the live dict while it changes size raises
    # RuntimeError.
    active_runs = list(registry.items())
    return {
        "runs": [
            {"run_id": run_id, "session_id": client.session_id, "session_name": client.session_name}
            for run_id, client in active_runs
        ]
    }
|
||
|
||
|
||
def start_bot_api(host: str = "0.0.0.0", port: int = 8788):
    """Serve the bot orchestration API with uvicorn on ``host``:``port`` (blocking call)."""
    bind_options = {"host": host, "port": port}
    uvicorn.run(app, **bind_options)
|
||
|
||
|
||
def _construct_voicebot_url(host: str, port: str) -> str:
    """Build the ``http://`` URL under which this voicebot is advertised.

    An explicit ``host`` is used as-is. The wildcard address ``0.0.0.0`` is
    replaced by the machine's hostname, falling back to ``localhost`` if the
    hostname lookup (or the logging that follows it) raises.
    """
    if host != "0.0.0.0":
        url = f"http://{host}:{port}"
        logger.info(f"🏠 Using host-based URL: {url}")
        return url

    import socket

    try:
        machine_name = socket.gethostname()
        url = f"http://{machine_name}:{port}"
        logger.info(f"🏠 Using hostname-based URL: {url}")
    except Exception:
        url = f"http://localhost:{port}"
        logger.info(f"🏠 Using localhost URL: {url}")
    return url
|
||
|
||
|
||
def _perform_server_registration_sync(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Run ``_perform_server_registration`` to completion from synchronous code.

    Uses ``asyncio.run``, so it is meant for callers that are not already
    inside a running event loop.
    """
    registration = _perform_server_registration(server_url, host, port, insecure)
    return asyncio.run(registration)
|
||
|
||
|
||
async def _perform_server_registration(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Build the advertised URL, wait briefly, then register with the server.

    Returns:
        The provider ID returned by ``register_with_server``; any exception
        raised by it propagates to the caller.
    """
    advertised_url = _construct_voicebot_url(host, port)

    startup_delay = 2  # Give server time to fully start
    logger.info("⏱️ Waiting 2 seconds before attempting remote registration...")
    await asyncio.sleep(startup_delay)

    registered_id = await register_with_server(server_url, advertised_url, insecure)
    logger.info(f"🎉 Successfully registered with remote server! Provider ID: {registered_id}")
    return registered_id
|
||
|
||
|
||
async def register_with_server(server_url: str, voicebot_url: str, insecure: bool = False) -> str:
    """Register this voicebot instance as a bot provider with the main server.

    Reads ``VOICEBOT_PROVIDER_KEY`` and ``VOICEBOT_PROVIDER_ID`` from the
    environment and POSTs them, together with ``voicebot_url``, to
    ``{server_url}/api/bots/providers/register``.

    Args:
        server_url: Base URL of the main server.
        voicebot_url: URL under which this provider can be reached.
        insecure: When true, TLS certificate verification is disabled.

    Returns:
        The provider ID from the server's response, or the configured
        ``VOICEBOT_PROVIDER_ID`` when the response does not carry one.

    Raises:
        ValueError: When a required environment variable is missing.
        RuntimeError: When the server answers with a non-200 status.
        Exception: Any transport error is logged and re-raised unchanged.
    """
    logger.info(f"🔗 Attempting to register with remote server at {server_url}")
    logger.info(f"📍 Registration details - Voicebot URL: {voicebot_url}, Insecure: {insecure}")

    try:
        # Import httpx locally to avoid dependency issues
        import httpx

        # Get provider key and ID from environment variables
        provider_key = os.getenv('VOICEBOT_PROVIDER_KEY')
        provider_id = os.getenv('VOICEBOT_PROVIDER_ID')

        if not provider_key:
            raise ValueError("VOICEBOT_PROVIDER_KEY environment variable is required")
        if not provider_id:
            raise ValueError("VOICEBOT_PROVIDER_ID environment variable is required")

        payload = {
            "base_url": voicebot_url.rstrip('/'),
            "name": "voicebot-provider",
            "description": "AI voicebot provider with speech recognition and synthetic media capabilities",
            "provider_key": provider_key,
            "provider_id": provider_id
        }

        logger.info(f"📍 Using static provider ID: {provider_id}")
        # The provider key is a credential: log the payload with it masked.
        loggable_payload = {**payload, "provider_key": "***"}
        logger.info(f"📤 Sending registration payload: {loggable_payload}")

        # TLS verification is switched off only when explicitly requested
        verify = not insecure

        async with httpx.AsyncClient(verify=verify) as client:
            response = await client.post(
                f"{server_url}/api/bots/providers/register",
                json=payload,
                timeout=10.0
            )

            if response.status_code == 200:
                result = response.json()
                # Fall back to the ID we sent so the declared ``str`` return
                # type holds even if the response omits "provider_id".
                provider_id = result.get("provider_id") or provider_id
                logger.info(f"✅ Successfully registered with server as provider: {provider_id}")
                logger.info(f"🎯 Remote server can now discover bots at: {voicebot_url}")
                return provider_id
            else:
                logger.error(f"❌ Failed to register with server: HTTP {response.status_code}: {response.text}")
                raise RuntimeError(f"Registration failed: {response.status_code}")

    except Exception as e:
        logger.error(f"💥 Error registering with server: {e}")
        raise
|
||
|
||
|
||
def start_bot_provider(
    host: str = "0.0.0.0",
    port: int = 8788,
    server_url: str | None = None,
    insecure: bool = False,
    reload: bool = False
):
    """Start the bot provider API server and optionally register with main server.

    Runs uvicorn on a daemon thread, then (when ``server_url`` is given)
    performs an initial registration and starts ``reconnection_monitor()`` on
    a second daemon thread with its own event loop. The calling thread then
    blocks until ``KeyboardInterrupt``, at which point the shutdown event is
    set.

    Args:
        host: Interface the API server binds to.
        port: TCP port the API server listens on.
        server_url: Remote server to register with; ``None`` means
            local-only mode.
        insecure: When true, TLS certificate verification is disabled for
            calls to the remote server.
        reload: Pass uvicorn's reload options to the server thread.
    """
    # Set up global variables for reconnection logic
    global _server_url, _voicebot_url, _insecure, _provider_id
    global _provider_registration_status
    _server_url = server_url
    _insecure = insecure
    _voicebot_url = _construct_voicebot_url(host, str(port))

    # Start the FastAPI server in a background thread
    # Add reload functionality for development
    if reload:
        # NOTE(review): uvicorn documents that reload needs the application
        # passed as an import string and is driven from the main thread;
        # confirm this branch actually serves requests.
        server_thread = threading.Thread(
            target=lambda: uvicorn.run(
                app,
                host=host,
                port=port,
                log_level="info",
                reload=True,
                reload_dirs=["/voicebot", "/shared"]
            ),
            daemon=True
        )
    else:
        server_thread = threading.Thread(
            target=lambda: uvicorn.run(app, host=host, port=port, log_level="info"),
            daemon=True
        )
    logger.info(f"Starting bot provider API server on {host}:{port}...")
    server_thread.start()

    # If server_url is provided, attempt initial registration
    if server_url:
        logger.info(f"🔄 Server URL provided - will attempt registration with: {server_url}")
        # Give the server a moment to start
        logger.info("⏱️ Waiting 2 seconds for server to fully start...")
        time.sleep(2)

        try:
            _provider_id = _perform_server_registration_sync(server_url, host, str(port), insecure)
            # Keep the status flag in step with the registration outcome so
            # the status endpoint is correct before the monitor's first pass.
            _provider_registration_status = True
            logger.info(f"🎉 Registration completed successfully! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with server: {e}")
            logger.warning("⚠️ Bot orchestrator will continue running and attempt reconnection")

        # Start a background thread for reconnection monitoring
        def run_reconnection_monitor():
            """Run reconnection monitor in a separate thread with its own event loop."""
            try:
                # asyncio.run creates a fresh loop for this thread and closes
                # it again when the monitor returns.
                asyncio.run(reconnection_monitor())
            except Exception as e:
                logger.error(f"Reconnection monitor thread failed: {e}")

        reconnect_thread = threading.Thread(target=run_reconnection_monitor, daemon=True)
        reconnect_thread.start()
        logger.info("🔄 Started reconnection monitor in background thread")
    else:
        logger.info("ℹ️ No remote server URL provided - running in local-only mode")

    # Keep the main thread alive
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down bot provider...")
        _shutdown_event.set()
|