ai-voicebot/server/api/bot_config.py

359 lines
14 KiB
Python

"""
Bot Configuration API
This module provides REST API endpoints for managing bot configurations
including schema discovery, configuration CRUD operations, and real-time updates.
"""
from typing import Dict, List, Optional, Any
from fastapi import APIRouter, HTTPException, BackgroundTasks, WebSocket
from logger import logger
from core.bot_config_manager import BotConfigManager
# Import WebSocket handler base class
try:
from websocket.message_handlers import MessageHandler
except ImportError:
from ..websocket.message_handlers import MessageHandler
# Import shared models with fallback handling
try:
    # Preferred: package-relative import when running inside the server package.
    from ...shared.models import (
        BotConfigListResponse,
        BotConfigSchema,
        BotConfigUpdateRequest,
        BotConfigUpdateResponse,
        BotLobbyConfig,
    )
except ImportError:
    try:
        # Second choice: absolute import when `shared` is on sys.path.
        from shared.models import (
            BotConfigListResponse,
            BotConfigSchema,
            BotConfigUpdateRequest,
            BotConfigUpdateResponse,
            BotLobbyConfig,
        )
    except ImportError:
        # Create dummy models for standalone testing
        from pydantic import BaseModel

        # NOTE: these stand-ins are documented with comments rather than
        # docstrings on purpose; pydantic publishes model docstrings as schema
        # descriptions, which would change the generated API schema.

        # Configuration schema advertised for a single bot.
        class BotConfigSchema(BaseModel):
            bot_name: str
            version: str = "1.0"
            parameters: List[Dict[str, Any]]

        # Configuration values stored for one bot in one lobby.
        class BotLobbyConfig(BaseModel):
            bot_name: str
            lobby_id: str
            provider_id: str
            config_values: Dict[str, Any]
            created_at: float
            updated_at: float
            created_by: str

        # Request body for updating a bot's configuration in a lobby.
        class BotConfigUpdateRequest(BaseModel):
            bot_name: str
            lobby_id: str
            config_values: Dict[str, Any]

        # Result of an update attempt; `updated_config` is optional.
        class BotConfigUpdateResponse(BaseModel):
            success: bool
            message: str
            updated_config: Optional[BotLobbyConfig] = None

        # All bot configurations belonging to one lobby.
        class BotConfigListResponse(BaseModel):
            lobby_id: str
            configs: List[BotLobbyConfig]
def create_bot_config_router(config_manager: BotConfigManager, bot_manager) -> APIRouter:
    """Create FastAPI router for bot configuration endpoints.

    Args:
        config_manager: Manager used to read, write and delete per-lobby bot
            configurations and to look up / discover bot config schemas.
        bot_manager: Object exposing ``get_providers()`` (mapping of provider
            id to provider, each with a ``base_url``) and the awaitable
            ``get_provider_bots(provider_id)`` (result has a ``bots`` list
            whose items have a ``name``).

    Returns:
        An ``APIRouter`` with prefix ``/api/bots/config`` carrying the
        schema, CRUD, statistics and schema-refresh endpoints.
    """
    router = APIRouter(prefix="/api/bots/config", tags=["Bot Configuration"])

    async def _provider_hosts_bot(provider_id: str, bot_name: str) -> bool:
        """Return True if the given provider lists a bot named ``bot_name``.

        A provider that fails to answer is logged and treated as "does not
        host the bot" so that one broken provider cannot block the lookup.
        """
        try:
            provider_bots = await bot_manager.get_provider_bots(provider_id)
            return any(bot.name == bot_name for bot in provider_bots.bots)
        except Exception as e:
            logger.warning(f"Failed to check provider {provider_id} for bot {bot_name}: {e}")
            return False

    @router.get("/schema/{bot_name}")
    async def get_bot_config_schema(bot_name: str) -> BotConfigSchema:
        """Get configuration schema for a specific bot"""
        try:
            # Check if we have cached schema
            schema = config_manager.get_bot_config_schema(bot_name)
            if not schema:
                # Try to discover schema from every provider hosting the bot,
                # stopping at the first provider that yields one.
                for provider_id, provider in bot_manager.get_providers().items():
                    if not await _provider_hosts_bot(provider_id, bot_name):
                        continue
                    try:
                        schema = await config_manager.discover_bot_config_schema(
                            bot_name, provider.base_url
                        )
                    except Exception as e:
                        logger.warning(f"Failed to check provider {provider_id} for bot {bot_name}: {e}")
                        continue
                    if schema:
                        break
            if not schema:
                raise HTTPException(
                    status_code=404,
                    detail=f"No configuration schema found for bot '{bot_name}'"
                )
            return schema
        except HTTPException:
            # Deliberate HTTP errors (the 404 above) pass through untouched.
            raise
        except Exception as e:
            logger.error(f"Failed to get bot config schema for {bot_name}: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.get("/lobby/{lobby_id}")
    async def get_lobby_bot_configs(lobby_id: str) -> BotConfigListResponse:
        """Get all bot configurations for a lobby"""
        try:
            configs = config_manager.get_lobby_configs(lobby_id)
            return BotConfigListResponse(lobby_id=lobby_id, configs=configs)
        except Exception as e:
            logger.error(f"Failed to get lobby configs for {lobby_id}: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.get("/lobby/{lobby_id}/bot/{bot_name}")
    async def get_lobby_bot_config(lobby_id: str, bot_name: str) -> BotLobbyConfig:
        """Get specific bot configuration for a lobby"""
        try:
            config = config_manager.get_lobby_bot_config(lobby_id, bot_name)
            if not config:
                raise HTTPException(
                    status_code=404,
                    detail=f"No configuration found for bot '{bot_name}' in lobby '{lobby_id}'"
                )
            return config
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to get config for bot {bot_name} in lobby {lobby_id}: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.post("/update")
    async def update_bot_config(
        request: BotConfigUpdateRequest,
        background_tasks: BackgroundTasks,
        session_id: str = "unknown"  # TODO: Get from auth/session context
    ) -> BotConfigUpdateResponse:
        """Update bot configuration for a lobby"""
        try:
            # Find the first provider that hosts this bot
            provider_id = None
            provider_url = None
            for pid, provider in bot_manager.get_providers().items():
                if await _provider_hosts_bot(pid, request.bot_name):
                    provider_id = pid
                    provider_url = provider.base_url
                    break
            # Compare against None so a falsy-but-valid provider key is not
            # mistaken for "no provider found".
            if provider_id is None:
                raise HTTPException(
                    status_code=404,
                    detail=f"Bot '{request.bot_name}' not found in any provider"
                )

            # Update configuration
            config = config_manager.set_bot_config(
                lobby_id=request.lobby_id,
                bot_name=request.bot_name,
                provider_id=provider_id,
                config_values=request.config_values,
                session_id=session_id
            )

            # Notify bot provider in background so the response is not delayed
            background_tasks.add_task(
                config_manager.notify_bot_config_change,
                provider_url,
                request.bot_name,
                request.lobby_id,
                config
            )
            return BotConfigUpdateResponse(
                success=True,
                message="Configuration updated successfully",
                updated_config=config
            )
        except ValueError as e:
            # Validation error: reported in the response body, not as an HTTP error
            return BotConfigUpdateResponse(
                success=False,
                message=f"Validation error: {str(e)}"
            )
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to update bot config: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.delete("/lobby/{lobby_id}/bot/{bot_name}")
    async def delete_bot_config(lobby_id: str, bot_name: str) -> Dict[str, Any]:
        """Delete bot configuration for a lobby"""
        try:
            success = config_manager.delete_bot_config(lobby_id, bot_name)
            if not success:
                raise HTTPException(
                    status_code=404,
                    detail=f"No configuration found for bot '{bot_name}' in lobby '{lobby_id}'"
                )
            return {"success": True, "message": "Configuration deleted successfully"}
        except HTTPException:
            raise
        except Exception as e:
            logger.error(f"Failed to delete bot config: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.delete("/lobby/{lobby_id}")
    async def delete_lobby_configs(lobby_id: str) -> Dict[str, Any]:
        """Delete all bot configurations for a lobby"""
        try:
            success = config_manager.delete_lobby_configs(lobby_id)
            return {
                "success": success,
                "message": "All lobby configurations deleted" if success else "No configurations found"
            }
        except Exception as e:
            logger.error(f"Failed to delete lobby configs: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.get("/statistics")
    async def get_config_statistics() -> Dict[str, Any]:
        """Get configuration manager statistics"""
        try:
            return config_manager.get_statistics()
        except Exception as e:
            logger.error(f"Failed to get config statistics: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    @router.post("/refresh-schemas")
    async def refresh_bot_schemas(background_tasks: BackgroundTasks) -> Dict[str, Any]:
        """Refresh all bot configuration schemas from providers"""
        try:
            async def refresh_task():
                # Walk every bot of every provider; a failure on one bot or
                # provider is logged and does not stop the rest.
                refreshed = 0
                for provider_id, provider in bot_manager.get_providers().items():
                    try:
                        provider_bots = await bot_manager.get_provider_bots(provider_id)
                        for bot in provider_bots.bots:
                            try:
                                schema = await config_manager.discover_bot_config_schema(
                                    bot.name, provider.base_url
                                )
                                if schema:
                                    refreshed += 1
                            except Exception as e:
                                logger.warning(f"Failed to refresh schema for {bot.name}: {e}")
                    except Exception as e:
                        logger.warning(f"Failed to refresh schemas from provider {provider_id}: {e}")
                logger.info(f"Refreshed {refreshed} bot configuration schemas")

            background_tasks.add_task(refresh_task)
            return {
                "success": True,
                "message": "Schema refresh started in background"
            }
        except Exception as e:
            logger.error(f"Failed to start schema refresh: {e}")
            raise HTTPException(status_code=500, detail="Internal server error") from e

    return router
class BotConfigUpdateHandler(MessageHandler):
    """WebSocket handler for real-time bot configuration updates"""

    def __init__(self, config_manager: BotConfigManager):
        """Keep a reference to the manager used to store configurations."""
        self.config_manager = config_manager

    async def handle(self, session, lobby, data: Dict[str, Any], websocket: WebSocket, managers: Dict[str, Any]):
        """Handle real-time bot configuration updates via WebSocket.

        Reads ``bot_name`` and ``config_values`` (and ``lobby_id`` when no
        lobby object is supplied) from ``data``, stores the configuration
        through the config manager and replies on ``websocket`` with either
        a ``bot_config_updated`` or a ``bot_config_error`` message.

        Args:
            session: Current session, or a falsy value; its ``session_id``
                is recorded as the author of the change ("unknown" if absent).
            lobby: Current lobby, or a falsy value; its ``lobby_id`` takes
                precedence over ``data["lobby_id"]``.
            data: Incoming message payload.
            websocket: Connection the reply is sent on.
            managers: Unused by this handler.
        """
        try:
            # Extract update data
            lobby_id = lobby.lobby_id if lobby else data.get("lobby_id")
            bot_name = data.get("bot_name")
            config_values = data.get("config_values")
            session_id = session.session_id if session else "unknown"

            # An empty dict is a legitimate value for config_values (the REST
            # endpoint accepts it), so only its absence counts as missing.
            if not lobby_id or not bot_name or config_values is None:
                await websocket.send_json({
                    "type": "bot_config_error",
                    "error": "Missing required fields: lobby_id, bot_name, config_values"
                })
                return

            # Reuse the provider id of an already-stored configuration rather
            # than overwriting it with an empty string.
            # NOTE(review): for a brand-new configuration the provider is still
            # unknown here and the bot provider is not notified of the change,
            # unlike the REST update endpoint - confirm whether that is intended.
            existing = self.config_manager.get_lobby_bot_config(lobby_id, bot_name)
            provider_id = (getattr(existing, "provider_id", "") or "") if existing else ""

            # Update configuration (this will validate the values)
            config = self.config_manager.set_bot_config(
                lobby_id=lobby_id,
                bot_name=bot_name,
                provider_id=provider_id,
                config_values=config_values,
                session_id=session_id
            )

            # Send success response
            await websocket.send_json({
                "type": "bot_config_updated",
                "config": {
                    "bot_name": config.bot_name,
                    "lobby_id": config.lobby_id,
                    "config_values": config.config_values,
                    "updated_at": config.updated_at
                }
            })
            logger.info(f"Bot configuration updated via WebSocket: {bot_name} in lobby {lobby_id}")
        except ValueError as e:
            # Validation problems are caused by the caller's input, so the
            # detail is safe and useful to send back.
            error_msg = f"Error updating bot configuration: {str(e)}"
            logger.error(error_msg)
            await websocket.send_json({
                "type": "bot_config_error",
                "error": error_msg
            })
        except Exception as e:
            # Unexpected failures are logged in full but not echoed to the
            # client, matching the REST endpoints' generic 500 response.
            logger.error(f"Error updating bot configuration: {str(e)}")
            await websocket.send_json({
                "type": "bot_config_error",
                "error": "Error updating bot configuration: internal server error"
            })
def setup_websocket_config_handlers(websocket_manager, config_manager: BotConfigManager):
    """Wire the real-time configuration update handler into the WebSocket layer.

    Builds a ``BotConfigUpdateHandler`` around ``config_manager`` and
    registers it on ``websocket_manager.message_router`` for the
    ``bot_config_update`` message type, then logs that registration is done.
    """
    update_handler = BotConfigUpdateHandler(config_manager)
    message_router = websocket_manager.message_router
    message_router.register("bot_config_update", update_handler)
    logger.info("Bot configuration WebSocket handlers registered")