""" Bot Configuration Manager This module handles per-lobby bot configuration management including: - Configuration schema discovery from bot providers - Configuration storage and retrieval per lobby - Real-time configuration updates - Validation of configuration values """ import json import time import httpx from typing import Dict, List, Optional, Any from pathlib import Path from shared.logger import logger # Import shared models import sys import os sys.path.append( os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) ) from shared.models import BotConfigSchema, BotLobbyConfig class BotConfigManager: """Manages bot configurations for lobbies""" def __init__(self, storage_dir: str = "./bot_configs"): self.storage_dir = Path(storage_dir) self.storage_dir.mkdir(exist_ok=True) # In-memory cache for fast access self.config_cache: Dict[str, Dict[str, BotLobbyConfig]] = {} # lobby_id -> bot_name -> config self.schema_cache: Dict[str, BotConfigSchema] = {} # bot_name -> schema # Load existing configurations self._load_configurations() def _get_config_file(self, lobby_id: str) -> Path: """Get configuration file path for a lobby""" return self.storage_dir / f"lobby_{lobby_id}.json" def _get_schema_file(self, bot_name: str) -> Path: """Get schema file path for a bot""" return self.storage_dir / f"schema_{bot_name}.json" def _load_configurations(self): """Load all configurations from disk""" try: # Load lobby configurations for config_file in self.storage_dir.glob("lobby_*.json"): try: lobby_id = config_file.stem.replace("lobby_", "") with open(config_file, 'r') as f: data = json.load(f) self.config_cache[lobby_id] = {} for bot_name, config_data in data.items(): config = BotLobbyConfig(**config_data) self.config_cache[lobby_id][bot_name] = config except Exception as e: logger.error(f"Failed to load lobby config {config_file}: {e}") # Load bot schemas for schema_file in self.storage_dir.glob("schema_*.json"): try: bot_name = schema_file.stem.replace("schema_", "") with open(schema_file, 'r') as f: schema_data = json.load(f) schema = BotConfigSchema(**schema_data) self.schema_cache[bot_name] = schema except Exception as e: logger.error(f"Failed to load bot schema {schema_file}: {e}") logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies and {len(self.schema_cache)} bot schemas") except Exception as e: logger.error(f"Failed to load configurations: {e}") def _save_lobby_config(self, lobby_id: str): """Save lobby configuration to disk""" try: config_file = self._get_config_file(lobby_id) if lobby_id not in self.config_cache: return # Convert to serializable format data = {} for bot_name, config in self.config_cache[lobby_id].items(): data[bot_name] = config.model_dump() with open(config_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logger.error(f"Failed to save lobby config {lobby_id}: {e}") def _save_bot_schema(self, bot_name: str): """Save bot schema to disk""" try: if bot_name not in self.schema_cache: return schema_file = self._get_schema_file(bot_name) schema_data = self.schema_cache[bot_name].model_dump() with open(schema_file, 'w') as f: json.dump(schema_data, f, indent=2) except Exception as e: logger.error(f"Failed to save bot schema {bot_name}: {e}") async def discover_bot_config_schema( self, bot_name: str, provider_url: str, force_refresh: bool = False ) -> Optional[BotConfigSchema]: """Discover configuration schema from bot provider""" try: # Check if we have a cached schema and it's not forced refresh if not force_refresh and bot_name in self.schema_cache: cached_schema = self.schema_cache[bot_name] # Check if schema is less than 1 hour old schema_file = self._get_schema_file(bot_name) if schema_file.exists(): file_age = time.time() - schema_file.stat().st_mtime if file_age < 3600: # 1 hour logger.debug( f"Using cached schema for bot {bot_name} (age: {file_age:.0f}s)" ) return cached_schema async with httpx.AsyncClient() as client: # Try to get configuration schema from bot provider response = await client.get( f"{provider_url}/bots/{bot_name}/config-schema", timeout=10.0 ) if response.status_code == 200: schema_data = response.json() schema = BotConfigSchema(**schema_data) # Check if schema has actually changed if bot_name in self.schema_cache: old_schema = self.schema_cache[bot_name] if old_schema.model_dump() == schema.model_dump(): logger.debug( f"Schema for bot {bot_name} unchanged, updating timestamp only" ) else: logger.info(f"Schema for bot {bot_name} has been updated") # Cache the schema self.schema_cache[bot_name] = schema self._save_bot_schema(bot_name) logger.info( f"Discovered/refreshed config schema for bot {bot_name}" ) return schema else: logger.warning(f"Bot {bot_name} does not support configuration (HTTP {response.status_code})") except Exception as e: logger.warning(f"Failed to discover config schema for bot {bot_name}: {e}") # Return cached schema if available, even if refresh failed if bot_name in self.schema_cache: logger.info( f"Returning cached schema for bot {bot_name} after refresh failure" ) return self.schema_cache[bot_name] return None def get_bot_config_schema(self, bot_name: str) -> Optional[BotConfigSchema]: """Get cached configuration schema for a bot""" return self.schema_cache.get(bot_name) async def refresh_bot_schema( self, bot_name: str, provider_url: str ) -> Optional[BotConfigSchema]: """Force refresh of bot schema from provider""" return await self.discover_bot_config_schema( bot_name, provider_url, force_refresh=True ) def clear_bot_schema_cache(self, bot_name: str) -> bool: """Clear cached schema for a specific bot""" if bot_name in self.schema_cache: del self.schema_cache[bot_name] # Also remove the cached file schema_file = self._get_schema_file(bot_name) if schema_file.exists(): schema_file.unlink() logger.info(f"Cleared schema cache for bot {bot_name}") return True return False def get_lobby_bot_config(self, lobby_id: str, bot_name: str) -> Optional[BotLobbyConfig]: """Get bot configuration for a specific lobby""" if lobby_id in self.config_cache and bot_name in self.config_cache[lobby_id]: return self.config_cache[lobby_id][bot_name] return None def get_lobby_configs(self, lobby_id: str) -> List[BotLobbyConfig]: """Get all bot configurations for a lobby""" if lobby_id in self.config_cache: return list(self.config_cache[lobby_id].values()) return [] def set_bot_config(self, lobby_id: str, bot_name: str, provider_id: str, config_values: Dict[str, Any], session_id: str) -> BotLobbyConfig: """Set or update bot configuration for a lobby""" # Validate configuration against schema if available schema = self.get_bot_config_schema(bot_name) if schema: validated_values = self._validate_config_values(config_values, schema) else: validated_values = config_values # Create or update configuration now = time.time() existing_config = self.get_lobby_bot_config(lobby_id, bot_name) if existing_config: # Update existing config = BotLobbyConfig( bot_name=bot_name, lobby_id=lobby_id, provider_id=provider_id, config_values=validated_values, created_at=existing_config.created_at, updated_at=now, created_by=existing_config.created_by ) else: # Create new config = BotLobbyConfig( bot_name=bot_name, lobby_id=lobby_id, provider_id=provider_id, config_values=validated_values, created_at=now, updated_at=now, created_by=session_id ) # Store in cache if lobby_id not in self.config_cache: self.config_cache[lobby_id] = {} self.config_cache[lobby_id][bot_name] = config # Save to disk self._save_lobby_config(lobby_id) logger.info(f"Updated config for bot {bot_name} in lobby {lobby_id}") return config def _validate_config_values(self, values: Dict[str, Any], schema: BotConfigSchema) -> Dict[str, Any]: """Validate configuration values against schema""" validated = {} for param in schema.parameters: value = values.get(param.name) # Check required parameters if param.required and value is None: if param.default_value is not None: value = param.default_value else: raise ValueError(f"Required parameter '{param.name}' is missing") # Skip None values for optional parameters if value is None: continue # Type validation if param.type == "string": value = str(value) if param.max_length and len(value) > param.max_length: raise ValueError(f"Parameter '{param.name}' exceeds max length {param.max_length}") elif param.type == "number": value = float(value) if param.min_value is not None and value < param.min_value: raise ValueError(f"Parameter '{param.name}' below minimum {param.min_value}") if param.max_value is not None and value > param.max_value: raise ValueError(f"Parameter '{param.name}' above maximum {param.max_value}") elif param.type == "boolean": value = bool(value) elif param.type == "select": if param.options: valid_values = [opt["value"] for opt in param.options] if value not in valid_values: raise ValueError(f"Parameter '{param.name}' must be one of {valid_values}") elif param.type == "range": value = float(value) if param.min_value is not None and value < param.min_value: value = param.min_value if param.max_value is not None and value > param.max_value: value = param.max_value validated[param.name] = value return validated def delete_bot_config(self, lobby_id: str, bot_name: str) -> bool: """Delete bot configuration for a lobby""" if lobby_id in self.config_cache and bot_name in self.config_cache[lobby_id]: del self.config_cache[lobby_id][bot_name] # Clean up empty lobby configs if not self.config_cache[lobby_id]: del self.config_cache[lobby_id] config_file = self._get_config_file(lobby_id) if config_file.exists(): config_file.unlink() else: self._save_lobby_config(lobby_id) logger.info(f"Deleted config for bot {bot_name} in lobby {lobby_id}") return True return False def delete_lobby_configs(self, lobby_id: str) -> bool: """Delete all bot configurations for a lobby""" if lobby_id in self.config_cache: del self.config_cache[lobby_id] config_file = self._get_config_file(lobby_id) if config_file.exists(): config_file.unlink() logger.info(f"Deleted all configs for lobby {lobby_id}") return True return False async def notify_bot_config_change(self, provider_url: str, bot_name: str, lobby_id: str, config: BotLobbyConfig): """Notify bot provider of configuration change""" try: async with httpx.AsyncClient() as client: # Notify the bot provider of the configuration change response = await client.post( f"{provider_url}/bots/{bot_name}/config", json={ "lobby_id": lobby_id, "config_values": config.config_values }, timeout=10.0 ) if response.status_code == 200: logger.info(f"Successfully notified bot {bot_name} of config change") return True else: logger.warning(f"Failed to notify bot {bot_name} of config change: HTTP {response.status_code}") except Exception as e: logger.error(f"Failed to notify bot {bot_name} of config change: {e}") return False def get_statistics(self) -> Dict[str, Any]: """Get configuration manager statistics""" total_configs = sum(len(lobby_configs) for lobby_configs in self.config_cache.values()) return { "total_lobbies": len(self.config_cache), "total_configs": total_configs, "cached_schemas": len(self.schema_cache), "lobbies": { lobby_id: len(configs) for lobby_id, configs in self.config_cache.items() } }