""" Bot Configuration API This module provides REST API endpoints for managing bot configurations including schema discovery, configuration CRUD operations, and real-time updates. """ from typing import Dict, List, Optional, Any from fastapi import APIRouter, HTTPException, BackgroundTasks, WebSocket from logger import logger from core.bot_config_manager import BotConfigManager # Import WebSocket handler base class try: from websocket.message_handlers import MessageHandler except ImportError: from ..websocket.message_handlers import MessageHandler # Import shared models with fallback handling try: from ...shared.models import ( BotConfigSchema, BotLobbyConfig, BotConfigUpdateRequest, BotConfigUpdateResponse, BotConfigListResponse ) except ImportError: try: from shared.models import ( BotConfigSchema, BotLobbyConfig, BotConfigUpdateRequest, BotConfigUpdateResponse, BotConfigListResponse ) except ImportError: # Create dummy models for standalone testing from pydantic import BaseModel class BotConfigSchema(BaseModel): bot_name: str version: str = "1.0" parameters: List[Dict[str, Any]] class BotLobbyConfig(BaseModel): bot_name: str lobby_id: str provider_id: str config_values: Dict[str, Any] created_at: float updated_at: float created_by: str class BotConfigUpdateRequest(BaseModel): bot_name: str lobby_id: str config_values: Dict[str, Any] class BotConfigUpdateResponse(BaseModel): success: bool message: str updated_config: Optional[BotLobbyConfig] = None class BotConfigListResponse(BaseModel): lobby_id: str configs: List[BotLobbyConfig] def create_bot_config_router(config_manager: BotConfigManager, bot_manager) -> APIRouter: """Create FastAPI router for bot configuration endpoints""" router = APIRouter(prefix="/api/bots/config", tags=["Bot Configuration"]) @router.get("/schema/{bot_name}") async def get_bot_config_schema(bot_name: str) -> BotConfigSchema: """Get configuration schema for a specific bot""" try: # Check if we have cached schema schema = config_manager.get_bot_config_schema(bot_name) if not schema: # Try to discover schema from bot provider providers = bot_manager.get_providers() for provider_id, provider in providers.items(): try: # Check if this provider has the bot provider_bots = await bot_manager.get_provider_bots(provider_id) bot_names = [bot.name for bot in provider_bots.bots] if bot_name in bot_names: schema = await config_manager.discover_bot_config_schema( bot_name, provider.base_url ) if schema: break except Exception as e: logger.warning(f"Failed to check provider {provider_id} for bot {bot_name}: {e}") continue if not schema: raise HTTPException( status_code=404, detail=f"No configuration schema found for bot '{bot_name}'" ) return schema except HTTPException: raise except Exception as e: logger.error(f"Failed to get bot config schema for {bot_name}: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.get("/lobby/{lobby_id}") async def get_lobby_bot_configs(lobby_id: str) -> BotConfigListResponse: """Get all bot configurations for a lobby""" try: configs = config_manager.get_lobby_configs(lobby_id) return BotConfigListResponse(lobby_id=lobby_id, configs=configs) except Exception as e: logger.error(f"Failed to get lobby configs for {lobby_id}: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.get("/lobby/{lobby_id}/bot/{bot_name}") async def get_lobby_bot_config(lobby_id: str, bot_name: str) -> BotLobbyConfig: """Get specific bot configuration for a lobby""" try: config = config_manager.get_lobby_bot_config(lobby_id, bot_name) if not config: raise HTTPException( status_code=404, detail=f"No configuration found for bot '{bot_name}' in lobby '{lobby_id}'" ) return config except HTTPException: raise except Exception as e: logger.error(f"Failed to get config for bot {bot_name} in lobby {lobby_id}: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.post("/update") async def update_bot_config( request: BotConfigUpdateRequest, background_tasks: BackgroundTasks, session_id: str = "unknown" # TODO: Get from auth/session context ) -> BotConfigUpdateResponse: """Update bot configuration for a lobby""" try: # Find the provider for this bot provider_id = None provider_url = None providers = bot_manager.get_providers() for pid, provider in providers.items(): try: provider_bots = await bot_manager.get_provider_bots(pid) bot_names = [bot.name for bot in provider_bots.bots] if request.bot_name in bot_names: provider_id = pid provider_url = provider.base_url break except Exception: continue if not provider_id: raise HTTPException( status_code=404, detail=f"Bot '{request.bot_name}' not found in any provider" ) # Update configuration config = config_manager.set_bot_config( lobby_id=request.lobby_id, bot_name=request.bot_name, provider_id=provider_id, config_values=request.config_values, session_id=session_id ) # Notify bot provider in background background_tasks.add_task( config_manager.notify_bot_config_change, provider_url, request.bot_name, request.lobby_id, config ) return BotConfigUpdateResponse( success=True, message="Configuration updated successfully", updated_config=config ) except ValueError as e: # Validation error return BotConfigUpdateResponse( success=False, message=f"Validation error: {str(e)}" ) except HTTPException: raise except Exception as e: logger.error(f"Failed to update bot config: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.delete("/lobby/{lobby_id}/bot/{bot_name}") async def delete_bot_config(lobby_id: str, bot_name: str) -> Dict[str, Any]: """Delete bot configuration for a lobby""" try: success = config_manager.delete_bot_config(lobby_id, bot_name) if not success: raise HTTPException( status_code=404, detail=f"No configuration found for bot '{bot_name}' in lobby '{lobby_id}'" ) return {"success": True, "message": "Configuration deleted successfully"} except HTTPException: raise except Exception as e: logger.error(f"Failed to delete bot config: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.delete("/lobby/{lobby_id}") async def delete_lobby_configs(lobby_id: str) -> Dict[str, Any]: """Delete all bot configurations for a lobby""" try: success = config_manager.delete_lobby_configs(lobby_id) return { "success": success, "message": "All lobby configurations deleted" if success else "No configurations found" } except Exception as e: logger.error(f"Failed to delete lobby configs: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.get("/statistics") async def get_config_statistics() -> Dict[str, Any]: """Get configuration manager statistics""" try: return config_manager.get_statistics() except Exception as e: logger.error(f"Failed to get config statistics: {e}") raise HTTPException(status_code=500, detail="Internal server error") @router.post("/refresh-schemas") async def refresh_bot_schemas(background_tasks: BackgroundTasks) -> Dict[str, Any]: """Refresh all bot configuration schemas from providers""" try: async def refresh_task(): refreshed = 0 providers = bot_manager.get_providers() for provider_id, provider in providers.items(): try: provider_bots = await bot_manager.get_provider_bots(provider_id) for bot in provider_bots.bots: try: schema = await config_manager.discover_bot_config_schema( bot.name, provider.base_url ) if schema: refreshed += 1 except Exception as e: logger.warning(f"Failed to refresh schema for {bot.name}: {e}") except Exception as e: logger.warning(f"Failed to refresh schemas from provider {provider_id}: {e}") logger.info(f"Refreshed {refreshed} bot configuration schemas") background_tasks.add_task(refresh_task) return { "success": True, "message": "Schema refresh started in background" } except Exception as e: logger.error(f"Failed to start schema refresh: {e}") raise HTTPException(status_code=500, detail="Internal server error") return router class BotConfigUpdateHandler(MessageHandler): """WebSocket handler for real-time bot configuration updates""" def __init__(self, config_manager: BotConfigManager): self.config_manager = config_manager async def handle(self, session, lobby, data: Dict[str, Any], websocket: WebSocket, managers: Dict[str, Any]): """Handle real-time bot configuration updates via WebSocket""" try: # Extract update data lobby_id = lobby.lobby_id if lobby else data.get("lobby_id") bot_name = data.get("bot_name") config_values = data.get("config_values") session_id = session.session_id if session else "unknown" if not all([lobby_id, bot_name, config_values]): await websocket.send_json({ "type": "bot_config_error", "error": "Missing required fields: lobby_id, bot_name, config_values" }) return # Update configuration (this will validate the values) config = self.config_manager.set_bot_config( lobby_id=lobby_id, bot_name=bot_name, provider_id="", # Will be resolved config_values=config_values, session_id=session_id ) # Send success response await websocket.send_json({ "type": "bot_config_updated", "config": { "bot_name": config.bot_name, "lobby_id": config.lobby_id, "config_values": config.config_values, "updated_at": config.updated_at } }) logger.info(f"Bot configuration updated via WebSocket: {bot_name} in lobby {lobby_id}") except Exception as e: error_msg = f"Error updating bot configuration: {str(e)}" logger.error(error_msg) await websocket.send_json({ "type": "bot_config_error", "error": error_msg }) def setup_websocket_config_handlers(websocket_manager, config_manager: BotConfigManager): """Setup WebSocket handlers for real-time configuration updates""" # Register the bot configuration update handler config_handler = BotConfigUpdateHandler(config_manager) websocket_manager.message_router.register("bot_config_update", config_handler) logger.info("Bot configuration WebSocket handlers registered")