ai-voicebot/voicebot/bot_orchestrator.py
2025-09-15 14:30:16 -07:00

678 lines
27 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Bot orchestrator FastAPI service.
This module provides the FastAPI service for bot discovery and orchestration.
"""
import asyncio
import threading
import uuid
import importlib
import inspect
import pkgutil
import sys
import os
import time
from contextlib import asynccontextmanager
from typing import Dict, Any, List, Optional
# Add the parent directory to sys.path to allow absolute imports
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import uvicorn
from fastapi import FastAPI, HTTPException
from shared.logger import logger
from voicebot.models import JoinRequest
from voicebot.webrtc_signaling import WebRTCSignalingClient
# Add shared models import for chat types
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from shared.models import ChatMessageModel, BotInfoModel, BotProviderBotsResponse
# Global variables for reconnection logic
_server_url: Optional[str] = None
_voicebot_url: Optional[str] = None
_insecure: bool = False
_provider_id: Optional[str] = None
_reconnect_task: Optional[asyncio.Task[None]] = None
_shutdown_event = asyncio.Event()
_provider_registration_status: bool = False
def get_provider_registration_status() -> dict[str, bool | str | float | None]:
"""Get the current provider registration status for use by bot clients."""
return {
"is_registered": _provider_registration_status,
"provider_id": _provider_id,
"server_url": _server_url,
"last_check": time.time()
}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: bot discovery and server registration on startup.

    Startup:
        Logs the discovered bots. If ``VOICEBOT_SERVER_URL`` is set, stores the
        connection settings in the module globals, attempts an initial
        registration, and starts :func:`reconnection_monitor` as an asyncio task
        whether or not that attempt succeeded.

    Shutdown:
        Sets ``_shutdown_event`` and cancels and awaits the monitor task.
    """
    # Declare every module-level name this hook rebinds. _provider_registration_status
    # must be listed too, otherwise the assignments below only create a local
    # and /provider/status never reflects the initial registration result.
    global _reconnect_task, _shutdown_event
    global _server_url, _voicebot_url, _insecure, _provider_id, _provider_registration_status

    # Startup
    logger.info(f"🚀 Voicebot bot orchestrator started successfully at {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Log the discovered bots
    bots = discover_bots()
    if bots:
        bot_names = [bot.name for bot in bots]
        logger.info(f"📋 Discovered {len(bots)} bots: {bot_names}")
    else:
        logger.info("⚠️ No bots discovered")

    # Check for remote server registration
    remote_server_url = os.getenv('VOICEBOT_SERVER_URL')
    if remote_server_url:
        # Set up global variables for reconnection logic
        _server_url = remote_server_url
        _insecure = os.getenv('VOICEBOT_SERVER_INSECURE', 'false').lower() == 'true'
        host = os.getenv('HOST', '0.0.0.0')
        port = os.getenv('PORT', '8788')
        _voicebot_url = _construct_voicebot_url(host, port)

        # Attempt initial registration
        try:
            _provider_id = await _perform_server_registration(remote_server_url, host, port, _insecure)
            _provider_registration_status = True
            logger.info(f"🎉 Successfully registered with remote server! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with remote server: {e}")
            logger.warning("⚠️ Will attempt reconnection in background")

        # Start the reconnection monitoring task
        _reconnect_task = asyncio.create_task(reconnection_monitor())
    else:
        logger.info(" No VOICEBOT_SERVER_URL provided - running in local-only mode")

    yield

    # Shutdown
    logger.info("🛑 Voicebot bot orchestrator shutting down")
    _shutdown_event.set()
    if _reconnect_task:
        _reconnect_task.cancel()
        try:
            await _reconnect_task
        except asyncio.CancelledError:
            pass
async def _reregister_provider() -> bool:
    """Try to (re-)register with the main server using the module-level settings.

    On success, stores the returned id in ``_provider_id``, marks the provider as
    registered and returns True. On failure, logs the error and returns False
    without touching the module state.
    """
    global _provider_registration_status, _provider_id
    try:
        new_provider_id = await register_with_server(_server_url, _voicebot_url, _insecure)
    except Exception as e:
        logger.error(f"❌ Re-registration failed: {e}")
        return False
    _provider_id = new_provider_id
    _provider_registration_status = True
    logger.info(f"🔄 Successfully re-registered with server! Provider ID: {_provider_id}")
    return True


async def reconnection_monitor():
    """Background task that monitors server connectivity and re-registers if needed.

    Runs until ``_shutdown_event`` is set or the task is cancelled. Each cycle:

    - server/voicebot URL not configured: mark unregistered, wait twice the check interval;
    - no provider id yet (initial registration failed): try to register;
    - server health check fails: mark unregistered and try to re-register;
    - server healthy but this provider is missing from its list: re-register.

    Successful cycles wait ``reconnect_interval`` seconds before the next check;
    failed attempts wait ``retry_interval`` seconds. Unexpected errors are logged
    and the loop continues.
    """
    global _provider_registration_status
    reconnect_interval = 15  # Check every 15 seconds (faster for testing)
    retry_interval = 5  # Retry failed connections every 5 seconds

    logger.info(f"🔄 Starting provider reconnection monitor (check every {reconnect_interval}s)")

    while not _shutdown_event.is_set():
        try:
            if not (_server_url and _voicebot_url):
                # Missing configuration, wait longer
                _provider_registration_status = False
                await asyncio.sleep(reconnect_interval * 2)
                continue

            if not _provider_id:
                # The initial registration failed or never ran. Retry it instead of
                # idling: register_with_server reads the provider id from the
                # environment, so no previous id is needed.
                logger.info("🔄 No provider ID yet - attempting registration with server")
                registered = await _reregister_provider()
                await asyncio.sleep(reconnect_interval if registered else retry_interval)
                continue

            # First check if server is healthy
            if not await check_server_health(_server_url, _insecure):
                logger.warning("⚠️ Server appears to be down or unreachable")
                _provider_registration_status = False
                registered = await _reregister_provider()
                # Longer interval after a successful reconnection, shorter to retry
                await asyncio.sleep(reconnect_interval if registered else retry_interval)
                continue

            # Server is healthy, now check if we're still registered
            is_registered = await check_provider_registration(_server_url, _provider_id, _insecure)
            _provider_registration_status = is_registered
            if not is_registered:
                logger.warning("⚠️ Provider registration lost, attempting to re-register")
                if not await _reregister_provider():
                    await asyncio.sleep(retry_interval)
                    continue

            # All good, check again after normal interval
            await asyncio.sleep(reconnect_interval)
        except asyncio.CancelledError:
            logger.info("🛑 Provider reconnection monitor cancelled")
            break
        except Exception as e:
            logger.error(f"💥 Unexpected error in provider reconnection monitor: {e}")
            await asyncio.sleep(retry_interval)
async def check_server_health(server_url: str, insecure: bool = False) -> bool:
    """Probe the main server's health endpoint.

    Sends ``GET {server_url}/api/health`` with a 5 s timeout (TLS verification
    disabled when ``insecure``) and reports whether it answered HTTP 200. Any
    failure, including httpx not being importable, is logged at debug level and
    reported as False.
    """
    health_url = f"{server_url}/api/health"
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            reply = await http.get(health_url, timeout=5.0)
            healthy = reply.status_code == 200
        return healthy
    except Exception as exc:
        logger.debug(f"Health check failed: {exc}")
        return False
async def check_provider_registration(server_url: str, provider_id: str, insecure: bool = False) -> bool:
    """Report whether ``provider_id`` appears in the server's provider list.

    Fetches ``GET {server_url}/api/bots/providers`` (5 s timeout, TLS verification
    disabled when ``insecure``) and looks for an entry of the response's
    ``"providers"`` list whose ``"provider_id"`` matches. A non-200 status is
    logged as a warning; any other error is logged at debug level. Both count as
    "not registered".
    """
    try:
        import httpx

        async with httpx.AsyncClient(verify=not insecure) as http:
            reply = await http.get(f"{server_url}/api/bots/providers", timeout=5.0)
            if reply.status_code != 200:
                logger.warning(f"Registration check failed: HTTP {reply.status_code}")
                return False

            # Each entry is a serialized BotProviderModel
            providers = reply.json().get("providers", [])
            found = any(entry.get("provider_id") == provider_id for entry in providers)
            logger.debug(
                f"Registration check: provider_id={provider_id}, found_providers={len(providers)}, is_registered={found}"
            )
            return found
    except Exception as exc:
        logger.debug(f"Provider registration check failed: {exc}")
        return False
app = FastAPI(title="voicebot-bot-orchestrator", lifespan=lifespan)

# Log module import for debugging reloads
logger.info("📦 Bot orchestrator module imported/reloaded")

# Running bot clients keyed by run_id. Entries are added and removed from the
# per-run worker threads started by bot_join, and removed by stop_run.
registry: Dict[str, WebRTCSignalingClient] = {}

# Discovered bot metadata (info model and module hooks) keyed by bot name;
# rebuilt by discover_bots().
_bot_registry: Dict[str, Dict[str, Any]] = {}
def _optional_callable(mod: Any, attr: str) -> Any:
    """Return ``mod.<attr>`` when it exists and is callable, otherwise None."""
    candidate = getattr(mod, attr, None)
    return candidate if callable(candidate) else None


def discover_bots() -> "List[BotInfoModel]":
    """Discover bot modules under the voicebot.bots package that expose ``agent_info()``.

    This intentionally imports modules under `voicebot.bots` so heavy bot
    implementations can remain in that package and be imported lazily.

    For every importable submodule with a callable ``agent_info``, the mapping it
    returns is normalised (string ``has_media``/``configurable`` values become
    booleans) and validated into a ``BotInfoModel``. The module's optional hooks
    are recorded in ``_bot_registry`` under the bot's name:

    - ``create_agent_tracks`` -> "create_tracks"
    - ``handle_chat_message`` -> "chat_handler"
    - ``get_track_handler()`` (called), else ``on_track_received`` -> "track_handler"
    - ``bind_send_chat_function`` -> "chat_bind"
    - ``get_config_schema()`` (called) -> "config_schema", also set on the model
    - ``handle_config_update`` -> "config_handler"

    Failures are logged and the affected module is skipped. A failing
    ``get_config_schema()`` only drops the schema, not the bot.

    Returns:
        The ``BotInfoModel`` of every bot that was also recorded in ``_bot_registry``.
    """
    from shared.models import BotInfoModel

    bots: List[BotInfoModel] = []
    # Built locally and published at the end of the scan (see below)
    discovered: Dict[str, Dict[str, Any]] = {}

    try:
        package = importlib.import_module("voicebot.bots")
        package_path = package.__path__
    except Exception:
        logger.exception("Failed to import voicebot.bots package")
        _bot_registry.clear()  # nothing is discoverable any more
        return bots

    for _finder, name, _ispkg in pkgutil.iter_modules(package_path):
        try:
            mod = importlib.import_module(f"voicebot.bots.{name}")
        except Exception:
            logger.exception("Failed to import voicebot.bots.%s", name)
            continue

        agent_info = _optional_callable(mod, "agent_info")
        if agent_info is None:
            continue

        try:
            processed_info = dict(agent_info())
            # Bot modules may report these flags as strings; coerce them to bool
            for flag, default in (("has_media", True), ("configurable", False)):
                value = processed_info.get(flag, default)
                if isinstance(value, str):
                    processed_info[flag] = value.lower() in ("true", "1", "yes")
            bot_info = BotInfoModel.model_validate(processed_info)

            create_tracks = _optional_callable(mod, "create_agent_tracks")
            chat_handler = _optional_callable(mod, "handle_chat_message")
            get_track_handler = _optional_callable(mod, "get_track_handler")
            if get_track_handler is not None:
                track_handler = get_track_handler()
            else:
                track_handler = _optional_callable(mod, "on_track_received")
            bind_send_chat_function = _optional_callable(mod, "bind_send_chat_function")

            # Check for configuration schema support
            config_schema = None
            get_config_schema = _optional_callable(mod, "get_config_schema")
            if get_config_schema is not None:
                try:
                    config_schema = get_config_schema()
                    logger.info(f"Bot {bot_info.name} supports configuration")
                    bot_info.config_schema = config_schema
                except Exception as e:
                    logger.warning(f"Failed to get config schema for {bot_info.name}: {e}")
            config_handler = _optional_callable(mod, "handle_config_update")

            discovered[bot_info.name] = {
                "module": name,
                "info": bot_info,
                "create_tracks": create_tracks,
                "chat_handler": chat_handler,
                "track_handler": track_handler,
                "chat_bind": bind_send_chat_function,
                "config_schema": config_schema,
                "config_handler": config_handler,
            }
            # Append only once the bot is recorded, so every listed bot can be joined
            bots.append(bot_info)
        except Exception:
            logger.exception("agent_info() failed for %s", name)

    # Publish in one step instead of clearing up front and filling during the
    # (slow) module imports. Other requests read _bot_registry concurrently (sync
    # endpoints run in FastAPI's threadpool) and should not see it emptied.
    _bot_registry.clear()
    _bot_registry.update(discovered)
    return bots
@app.get("/bots")
def list_bots() -> "BotProviderBotsResponse":
    """Return every bot currently discoverable under voicebot.bots.

    Discovery is re-run on each call, which also refreshes the internal registry.
    """
    from shared.models import BotProviderBotsResponse

    return BotProviderBotsResponse(bots=discover_bots())
@app.post("/bots/{bot_name}/join")
async def bot_join(bot_name: str, req: JoinRequest):
    """Make a bot join a lobby.

    Re-runs bot discovery and applies ``req.config_values`` through the bot's
    config handler when both are present. It then starts a WebRTCSignalingClient
    for the lobby on a dedicated daemon thread with its own event loop. The bot's
    chat and track hooks, if any, are attached to the client.

    Returns:
        ``{"status": "started", "bot": bot_name, "run_id": run_id}``. ``run_id``
        identifies the run in ``/bots/runs`` and ``/bots/runs/{run_id}/stop``.

    Raises:
        HTTPException: 404 if no discovered bot has this name.
    """
    # Ensure bots are discovered and registry is populated
    discover_bots()
    # Single lookup: another request may rebuild the registry concurrently, so a
    # membership test followed by indexing could raise KeyError.
    bot_data = _bot_registry.get(bot_name)
    if bot_data is None:
        raise HTTPException(status_code=404, detail="Bot not found")

    create_tracks = bot_data.get("create_tracks")
    chat_handler = bot_data.get("chat_handler")
    track_handler = bot_data.get("track_handler")
    bind_send_chat_function = bot_data.get("chat_bind")
    config_handler = bot_data.get("config_handler")

    logger.info(f"🤖 Bot {bot_name} joining lobby {req.lobby_id} with nick: '{req.nick}'")
    if req.config_values:
        logger.info(f"🤖 Bot {bot_name} has existing configuration: {req.config_values}")
    if track_handler:
        logger.info(f"🤖 Bot {bot_name} has track handling capabilities")
    if chat_handler:
        logger.info(f"🤖 Bot {bot_name} has chat receiving capabilities")
    if bind_send_chat_function:
        logger.info(f"🤖 Bot {bot_name} has chat sending capabilities")

    # Apply configuration if provided; failures are logged and the join proceeds
    if req.config_values and config_handler:
        try:
            logger.info(f"Applying existing configuration to bot {bot_name}")
            if inspect.iscoroutinefunction(config_handler):
                success = await config_handler(req.lobby_id, req.config_values)
            else:
                success = config_handler(req.lobby_id, req.config_values)
            if success:
                logger.info(f"Successfully applied existing configuration to bot {bot_name}")
            else:
                logger.warning(f"Failed to apply existing configuration to bot {bot_name}")
        except Exception as e:
            logger.error(f"Error applying existing configuration to bot {bot_name}: {e}")

    client = WebRTCSignalingClient(
        server_url=req.server_url,
        lobby_id=req.lobby_id,
        session_id=req.session_id,
        session_name=req.nick,
        insecure=req.insecure,
        create_tracks=create_tracks,
        bind_send_chat_function=bind_send_chat_function,
    )

    # Set up chat message handler if the bot provides one
    if chat_handler:
        async def bot_chat_handler(chat_message: ChatMessageModel):
            """Forward an incoming chat message to the bot's handler.

            The handler receives the message and ``client.send_chat_message`` so
            it can reply. Coroutine and plain-function handlers are both accepted,
            matching how the config handler is invoked. Errors are logged.
            """
            try:
                result = chat_handler(chat_message, client.send_chat_message)
                if inspect.isawaitable(result):
                    await result
            except Exception as e:
                logger.error(f"Error in bot chat handler for {bot_name}: {e}", exc_info=True)

        client.on_chat_message_received = bot_chat_handler

    # Set up track handler if the bot provides one
    if track_handler:
        client.on_track_received = track_handler

    run_id = str(uuid.uuid4())
    # Register before the worker thread starts, so the run_id returned below is
    # immediately visible to /bots/runs and /bots/runs/{run_id}/stop.
    # NOTE(review): a stop can now arrive before connect() begins; confirm
    # WebRTCSignalingClient.request_shutdown() handles that case.
    registry[run_id] = client

    async def run_client():
        """Await the client's connect() coroutine, logging any failure."""
        try:
            await client.connect()
        except Exception:
            logger.exception("Bot client failed for run %s", run_id)

    def run_client_in_thread():
        """Run the client in a new event loop in a separate thread."""
        try:
            asyncio.run(run_client())
        except Exception:
            logger.exception("Bot client thread failed for run %s", run_id)
        finally:
            # Remove the run no matter how it ended (including asyncio.run failing)
            registry.pop(run_id, None)

    worker = threading.Thread(target=run_client_in_thread, daemon=True)
    try:
        worker.start()
    except Exception:
        registry.pop(run_id, None)  # the run never started
        raise
    return {"status": "started", "bot": bot_name, "run_id": run_id}
@app.get("/provider/status")
def get_provider_status():
    """Expose this provider's registration state (see get_provider_registration_status)."""
    status = get_provider_registration_status()
    return status
@app.get("/bots/{bot_name}/config-schema")
def get_bot_config_schema(bot_name: str):
    """Get the configuration schema a bot exposed via ``get_config_schema()``.

    Raises:
        HTTPException: 404 if the bot is unknown, or if it has no (or an empty) schema.
    """
    # One lookup instead of a membership test plus indexing: discover_bots(), run
    # by other requests in FastAPI's threadpool, can clear the registry between the two.
    bot_entry = _bot_registry.get(bot_name)
    if bot_entry is None:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    config_schema = bot_entry.get("config_schema")
    if not config_schema:
        raise HTTPException(
            status_code=404,
            detail=f"Bot '{bot_name}' does not support configuration"
        )
    return config_schema
@app.post("/bots/{bot_name}/config")
async def update_bot_config(bot_name: str, config_data: dict[str, Any]) -> dict[str, str | bool]:
    """Update bot configuration for a specific lobby.

    ``config_data`` must contain ``lobby_id``. Its ``config_values`` (default
    ``{}``) are passed to the bot's config handler, which may be a coroutine
    function or a plain function.

    Returns:
        ``{"success": bool, "message": str}`` reflecting the handler's truthiness.

    Raises:
        HTTPException: 404 if the bot is unknown or has no config handler; 400 if
            ``lobby_id`` is missing; 500 if the handler raises.
    """
    bot_entry = _bot_registry.get(bot_name)
    if bot_entry is None:
        raise HTTPException(status_code=404, detail=f"Bot '{bot_name}' not found")

    config_handler = bot_entry.get("config_handler")
    if not config_handler:
        raise HTTPException(
            status_code=404,
            detail=f"Bot '{bot_name}' does not support configuration updates"
        )

    lobby_id = config_data.get("lobby_id")
    config_values = config_data.get("config_values", {})
    # Validate outside the try below: inside it, the broad except turned this 400 into a 500.
    if not lobby_id:
        raise HTTPException(status_code=400, detail="lobby_id is required")

    try:
        # Call the bot's configuration handler
        if inspect.iscoroutinefunction(config_handler):
            success = await config_handler(lobby_id, config_values)
        else:
            success = config_handler(lobby_id, config_values)
    except Exception as e:
        logger.exception(f"Failed to update config for bot {bot_name}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e

    if success:
        return {"success": True, "message": "Configuration updated successfully"}
    return {"success": False, "message": "Configuration update failed"}
@app.post("/bots/runs/{run_id}/stop")
async def stop_run(run_id: str):
    """Ask a running bot client to shut down, then forget its run.

    Raises:
        HTTPException: 404 for an unknown run_id; 500 if requesting the shutdown
            fails (the run is removed from the registry either way).
    """
    client = registry.get(run_id)
    if not client:
        raise HTTPException(status_code=404, detail="Run not found")

    try:
        # The client runs on its own thread and event loop (see bot_join), so
        # signal it instead of awaiting its disconnect from this loop.
        client.request_shutdown()
        # Grace period for the client to wind down
        await asyncio.sleep(0.5)
    except Exception:
        logger.exception("Failed to stop run %s", run_id)
        # Forget the run even though the shutdown request failed
        registry.pop(run_id, None)
        raise HTTPException(status_code=500, detail="Failed to stop run")

    registry.pop(run_id, None)
    return {"status": "stopped", "run_id": run_id}
@app.get("/bots/runs")
def list_runs() -> Dict[str, Any]:
    """List running bot instances.

    Returns:
        ``{"runs": [{"run_id", "session_id", "session_name"}, ...]}``.
    """
    # Snapshot first: worker threads started by bot_join add and remove entries
    # concurrently, and iterating the live dict could raise
    # "dictionary changed size during iteration".
    runs = list(registry.items())
    return {
        "runs": [
            {"run_id": run_id, "session_id": client.session_id, "session_name": client.session_name}
            for run_id, client in runs
        ]
    }
def start_bot_api(host: str = "0.0.0.0", port: int = 8788):
    """Serve the bot orchestration FastAPI app with uvicorn on ``host``:``port``."""
    uvicorn.run(app, port=port, host=host)
def _construct_voicebot_url(host: str, port: str) -> str:
    """Build the base URL the main server should use to reach this voicebot.

    For the wildcard bind address ``0.0.0.0``, the machine's hostname is used,
    falling back to ``localhost`` if the lookup raises. Any other host is used
    as-is. The chosen URL is logged.
    """
    if host != "0.0.0.0":
        url = f"http://{host}:{port}"
        logger.info(f"🏠 Using host-based URL: {url}")
        return url

    import socket

    try:
        url = f"http://{socket.gethostname()}:{port}"
        logger.info(f"🏠 Using hostname-based URL: {url}")
    except Exception:
        url = f"http://localhost:{port}"
        logger.info(f"🏠 Using localhost URL: {url}")
    return url
def _perform_server_registration_sync(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Blocking counterpart of ``_perform_server_registration``.

    Runs the coroutine to completion on a fresh event loop via ``asyncio.run``,
    so it cannot be called from a thread that already has a running loop.
    """
    registration = _perform_server_registration(server_url, host, port, insecure)
    return asyncio.run(registration)
async def _perform_server_registration(server_url: str, host: str, port: str, insecure: bool) -> str:
    """Register with the main server after a short start-up grace period.

    Builds the advertised URL from ``host``/``port``, waits 2 seconds so the local
    server can finish starting, then delegates to ``register_with_server`` and
    returns the provider id it produced.
    """
    target_url = _construct_voicebot_url(host, port)
    grace_seconds = 2
    logger.info("⏱️ Waiting 2 seconds before attempting remote registration...")
    await asyncio.sleep(grace_seconds)
    new_provider_id = await register_with_server(server_url, target_url, insecure)
    logger.info(f"🎉 Successfully registered with remote server! Provider ID: {new_provider_id}")
    return new_provider_id
async def register_with_server(server_url: str, voicebot_url: str, insecure: bool = False) -> str:
    """Register this voicebot instance as a bot provider with the main server.

    Reads ``VOICEBOT_PROVIDER_KEY`` and ``VOICEBOT_PROVIDER_ID`` from the
    environment. It POSTs them, together with ``voicebot_url`` as ``base_url``,
    to ``{server_url}/api/bots/providers/register`` with a 10 s timeout (TLS
    verification disabled when ``insecure``).

    Returns:
        The provider id from the server's response, or the configured
        ``VOICEBOT_PROVIDER_ID`` if the response does not include one.

    Raises:
        ValueError: if either environment variable is missing.
        RuntimeError: if the server answers with a non-200 status.
        Errors from httpx or JSON decoding propagate unchanged. Every error is
        logged before being re-raised.
    """
    logger.info(f"🔗 Attempting to register with remote server at {server_url}")
    logger.info(f"📍 Registration details - Voicebot URL: {voicebot_url}, Insecure: {insecure}")

    try:
        # Import httpx locally to avoid dependency issues
        import httpx

        # Get provider key and ID from environment variables
        provider_key = os.getenv('VOICEBOT_PROVIDER_KEY')
        configured_id = os.getenv('VOICEBOT_PROVIDER_ID')
        if not provider_key:
            raise ValueError("VOICEBOT_PROVIDER_KEY environment variable is required")
        if not configured_id:
            raise ValueError("VOICEBOT_PROVIDER_ID environment variable is required")

        payload = {
            "base_url": voicebot_url.rstrip('/'),
            "name": "voicebot-provider",
            "description": "AI voicebot provider with speech recognition and synthetic media capabilities",
            "provider_key": provider_key,
            "provider_id": configured_id
        }
        logger.info(f"📍 Using static provider ID: {configured_id}")
        # The provider key is a credential: never write it to the logs
        redacted_payload = {**payload, "provider_key": "***"}
        logger.info(f"📤 Sending registration payload: {redacted_payload}")

        async with httpx.AsyncClient(verify=not insecure) as client:
            response = await client.post(
                f"{server_url}/api/bots/providers/register",
                json=payload,
                timeout=10.0
            )
            if response.status_code == 200:
                result = response.json()
                # Fall back to the id we sent so callers never receive None
                provider_id = result.get("provider_id") or configured_id
                logger.info(f"✅ Successfully registered with server as provider: {provider_id}")
                logger.info(f"🎯 Remote server can now discover bots at: {voicebot_url}")
                return provider_id

            logger.error(f"❌ Failed to register with server: HTTP {response.status_code}: {response.text}")
            raise RuntimeError(f"Registration failed: {response.status_code}")
    except Exception as e:
        logger.error(f"💥 Error registering with server: {e}")
        raise
def start_bot_provider(
    host: str = "0.0.0.0",
    port: int = 8788,
    server_url: str | None = None,
    insecure: bool = False,
    reload: bool = False
):
    """Start the bot provider API server and optionally register with the main server.

    Serves ``app`` with uvicorn on a daemon thread. When ``server_url`` is given,
    it performs an initial registration and then starts ``reconnection_monitor``
    on a second daemon thread with its own event loop, whether or not that
    registration succeeded. The calling thread then blocks until
    KeyboardInterrupt, which sets the shared shutdown event.

    Args:
        host: Interface to bind; also used to build the URL advertised to the server.
        port: Port to bind.
        server_url: Base URL of the main server; None runs in local-only mode.
        insecure: Disable TLS certificate verification when talking to the server.
        reload: Auto-reload request. It cannot be honoured here (see below), so a
            warning is logged and the server starts without it.
    """
    # Set up global variables for reconnection logic
    global _server_url, _voicebot_url, _insecure, _provider_id, _provider_registration_status
    _server_url = server_url
    _insecure = insecure
    _voicebot_url = _construct_voicebot_url(host, str(port))

    if reload:
        # uvicorn only supports reload when the app is passed as an import string,
        # and its reloader installs signal handlers, which Python only allows on
        # the main thread. Passing the app object with reload=True (as this
        # background-thread setup must) makes uvicorn log a warning and
        # sys.exit(1), which silently ends the thread and leaves no server
        # running. Start a normal server instead.
        logger.warning(
            "⚠️ Auto-reload is not supported when the API server runs in a background thread; starting without reload"
        )

    server_thread = threading.Thread(
        target=lambda: uvicorn.run(app, host=host, port=port, log_level="info"),
        daemon=True,
    )
    logger.info(f"Starting bot provider API server on {host}:{port}...")
    server_thread.start()

    if server_url:
        logger.info(f"🔄 Server URL provided - will attempt registration with: {server_url}")
        # No extra sleep here: _perform_server_registration already waits 2 seconds
        # for the server to come up before registering.
        try:
            _provider_id = _perform_server_registration_sync(server_url, host, str(port), insecure)
            _provider_registration_status = True
            logger.info(f"🎉 Registration completed successfully! Provider ID: {_provider_id}")
        except Exception as e:
            _provider_registration_status = False
            logger.error(f"❌ Failed initial registration with server: {e}")
            logger.warning("⚠️ Bot orchestrator will continue running and attempt reconnection")

        def run_reconnection_monitor():
            """Run reconnection monitor in a separate thread with its own event loop."""
            try:
                # asyncio.run creates the loop and also closes it when the monitor exits
                asyncio.run(reconnection_monitor())
            except Exception as e:
                logger.error(f"Reconnection monitor thread failed: {e}")

        reconnect_thread = threading.Thread(target=run_reconnection_monitor, daemon=True)
        reconnect_thread.start()
        logger.info("🔄 Started reconnection monitor in background thread")
    else:
        logger.info(" No remote server URL provided - running in local-only mode")

    # Keep the main thread alive; the worker threads are daemons
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down bot provider...")
        _shutdown_event.set()