344 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			344 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """
 | |
| Bot Configuration Manager
 | |
| 
 | |
| This module handles per-lobby bot configuration management including:
 | |
| - Configuration schema discovery from bot providers
 | |
| - Configuration storage and retrieval per lobby
 | |
| - Real-time configuration updates
 | |
| - Validation of configuration values
 | |
| """
 | |
| 
 | |
| import json
 | |
| import time
 | |
| import httpx
 | |
| from typing import Dict, List, Optional, Any
 | |
| from pathlib import Path
 | |
| 
 | |
| from shared.logger import logger
 | |
| 
 | |
| # Import shared models
 | |
| import sys
 | |
| import os
 | |
| 
 | |
| sys.path.append(
 | |
|     os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 | |
| )
 | |
| from shared.models import BotConfigSchema, BotLobbyConfig
 | |
| 
 | |
class BotConfigManager:
    """Manage per-lobby bot configurations.

    Configurations live in an in-memory cache (``lobby_id -> bot_name ->
    BotLobbyConfig``) and are mirrored to one JSON file per lobby inside
    ``storage_dir``. Bot configuration schemas are never cached: they are
    fetched live from the bot provider on every request.
    """

    # Filename prefix of the per-lobby JSON files inside ``storage_dir``.
    _LOBBY_FILE_PREFIX = "lobby_"

    # String spellings accepted for boolean parameters (compared after
    # stripping whitespace and lower-casing).
    _TRUE_STRINGS = frozenset({"true", "1", "yes", "on"})
    _FALSE_STRINGS = frozenset({"false", "0", "no", "off", ""})

    def __init__(self, storage_dir: str = "./bot_configs"):
        """Create the storage directory if needed and load saved configs.

        Args:
            storage_dir: Directory holding the per-lobby JSON files.
        """
        self.storage_dir = Path(storage_dir)
        # parents=True so a nested storage path works on first start.
        self.storage_dir.mkdir(parents=True, exist_ok=True)
        # In-memory cache for fast access: lobby_id -> bot_name -> config
        self.config_cache: Dict[str, Dict[str, BotLobbyConfig]] = {}

        # Load existing configurations
        self._load_configurations()

    def _get_config_file(self, lobby_id: str) -> Path:
        """Return the path of the JSON file that stores a lobby's configs."""
        # NOTE(review): lobby_id is interpolated into the filename as-is;
        # presumably callers only pass server-generated ids - confirm.
        return self.storage_dir / f"{self._LOBBY_FILE_PREFIX}{lobby_id}.json"

    def _get_schema_file(self, bot_name: str) -> Path:
        """Return the path a bot's schema file would have in ``storage_dir``."""
        return self.storage_dir / f"schema_{bot_name}.json"

    @classmethod
    def _lobby_id_from_path(cls, config_file: Path) -> str:
        """Recover the lobby id from a ``lobby_<id>.json`` path.

        Only the leading prefix is removed, so ids that themselves contain
        ``lobby_`` (e.g. ``my_lobby_1``) survive a save/load round trip.
        """
        stem = config_file.stem
        if stem.startswith(cls._LOBBY_FILE_PREFIX):
            return stem[len(cls._LOBBY_FILE_PREFIX):]
        return stem

    def _load_configurations(self):
        """Load every ``lobby_*.json`` file from disk into the cache.

        A file that cannot be read or parsed is logged and skipped; it never
        prevents the remaining files from loading.
        """
        try:
            # Load lobby configurations
            for config_file in self.storage_dir.glob("lobby_*.json"):
                try:
                    lobby_id = self._lobby_id_from_path(config_file)
                    with open(config_file, "r", encoding="utf-8") as f:
                        data = json.load(f)

                    # Build the whole lobby first and publish it only when
                    # every entry parsed, so a bad entry cannot leave a
                    # partial lobby in the cache (which a later save would
                    # then write back over the original file).
                    lobby_configs = {
                        bot_name: BotLobbyConfig(**config_data)
                        for bot_name, config_data in data.items()
                    }
                    self.config_cache[lobby_id] = lobby_configs

                except Exception as e:
                    logger.error(f"Failed to load lobby config {config_file}: {e}")

            logger.info(f"Loaded configurations for {len(self.config_cache)} lobbies")

        except Exception as e:
            logger.error(f"Failed to load configurations: {e}")

    def _save_lobby_config(self, lobby_id: str):
        """Persist the cached configs of one lobby to its JSON file.

        Best effort: failures are logged, not raised. Does nothing when the
        lobby has no cache entry.
        """
        lobby_configs = self.config_cache.get(lobby_id)
        if lobby_configs is None:
            return

        try:
            config_file = self._get_config_file(lobby_id)

            # Serialize fully before touching the disk so a serialization
            # error cannot leave a truncated file behind.
            payload = json.dumps(
                {
                    bot_name: config.model_dump()
                    for bot_name, config in lobby_configs.items()
                },
                indent=2,
            )

            # Write to a sibling temp file and rename it over the target so
            # an interrupted write does not corrupt the existing file. The
            # ".tmp" suffix keeps it out of the "lobby_*.json" load glob.
            tmp_file = config_file.with_name(config_file.name + ".tmp")
            tmp_file.write_text(payload, encoding="utf-8")
            tmp_file.replace(config_file)

        except Exception as e:
            logger.error(f"Failed to save lobby config {lobby_id}: {e}")

    def _remove_config_file(self, lobby_id: str) -> None:
        """Delete a lobby's JSON file; a file that is already gone is fine."""
        try:
            self._get_config_file(lobby_id).unlink()
        except FileNotFoundError:
            # Nothing to remove (never saved, or removed concurrently).
            pass

    def _save_bot_schema(self, bot_name: str):
        """No-op kept for compatibility: schemas are not persisted to disk."""
        # Schema file persistence disabled: server no longer caches provider schemas to disk.
        logger.debug(
            f"_save_bot_schema called for {bot_name}, but schema persistence is disabled"
        )

    async def discover_bot_config_schema(
        self, bot_name: str, provider_url: str, force_refresh: bool = False
    ) -> Optional[BotConfigSchema]:
        """Fetch a bot's configuration schema live from its provider.

        Args:
            bot_name: Name of the bot whose schema is requested.
            provider_url: Base URL of the bot provider.
            force_refresh: Unused; accepted for backward compatibility. The
                schema is always fetched from the provider.

        Returns:
            The parsed schema, or None when the provider does not answer
            with HTTP 200 or the request/parsing fails (both are logged).
        """
        try:
            # Always fetch schema directly from the provider; do not use or
            # update any server-side caches or files. This ensures callers
            # receive the live schema from the provider on each request.
            base_url = provider_url.rstrip("/")  # avoid "//bots/..." URLs
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    f"{base_url}/bots/{bot_name}/config-schema",
                    timeout=10.0,
                )

                if response.status_code == 200:
                    schema = BotConfigSchema(**response.json())
                    logger.info(f"Fetched live config schema for bot {bot_name}")
                    return schema

                logger.warning(
                    f"Bot {bot_name} does not support configuration (HTTP {response.status_code})"
                )

        except Exception as e:
            logger.warning(f"Failed to discover config schema for bot {bot_name}: {e}")

        return None

    def get_bot_config_schema(self, bot_name: str) -> Optional[BotConfigSchema]:
        """Deprecated: server no longer maintains a cached schema.

        Return None to indicate no server-side cached schema is available.
        """
        logger.debug(
            "get_bot_config_schema called but server-side schema cache is disabled"
        )
        return None

    async def refresh_bot_schema(
        self, bot_name: str, provider_url: str
    ) -> Optional[BotConfigSchema]:
        """Fetch the bot schema from the provider (alias of discovery)."""
        return await self.discover_bot_config_schema(
            bot_name, provider_url, force_refresh=True
        )

    def clear_bot_schema_cache(self, bot_name: str) -> bool:
        """No-op kept for compatibility; always returns False."""
        # No-op: server-side schema caching has been disabled. Return False to
        # indicate there was no cached schema to clear.
        logger.info(
            f"clear_bot_schema_cache called for {bot_name} but caching is disabled"
        )
        return False

    def get_lobby_bot_config(self, lobby_id: str, bot_name: str) -> Optional[BotLobbyConfig]:
        """Return the cached config of one bot in one lobby, or None."""
        return self.config_cache.get(lobby_id, {}).get(bot_name)

    def get_lobby_configs(self, lobby_id: str) -> List[BotLobbyConfig]:
        """Return all cached bot configs of a lobby (empty list if none)."""
        return list(self.config_cache.get(lobby_id, {}).values())

    def set_bot_config(self,
                       lobby_id: str,
                       bot_name: str,
                       provider_id: str,
                       config_values: Dict[str, Any],
                       session_id: str) -> BotLobbyConfig:
        """Create or update a bot's configuration for a lobby and persist it.

        Args:
            lobby_id: Lobby the configuration belongs to.
            bot_name: Bot being configured.
            provider_id: Identifier of the provider hosting the bot.
            config_values: Configuration values, stored as given.
            session_id: Recorded as ``created_by`` when the config is new.

        Returns:
            The stored configuration. On update, ``created_at`` and
            ``created_by`` are carried over from the existing entry.
        """
        # Schema validation against a server-side cache is disabled. If
        # callers want strict validation, they should fetch the provider's
        # live schema and validate prior to calling set_bot_config.
        now = time.time()
        existing = self.get_lobby_bot_config(lobby_id, bot_name)

        config = BotLobbyConfig(
            bot_name=bot_name,
            lobby_id=lobby_id,
            provider_id=provider_id,
            config_values=config_values,
            # Creation metadata is immutable once the entry exists.
            created_at=existing.created_at if existing is not None else now,
            updated_at=now,
            created_by=existing.created_by if existing is not None else session_id,
        )

        # Store in cache, then mirror to disk
        self.config_cache.setdefault(lobby_id, {})[bot_name] = config
        self._save_lobby_config(lobby_id)

        logger.info(f"Updated config for bot {bot_name} in lobby {lobby_id}")
        return config

    @classmethod
    def _coerce_bool(cls, name: str, value: Any) -> bool:
        """Convert a value to bool, parsing common string spellings.

        Plain ``bool("false")`` is True, so strings are matched against the
        known true/false spellings instead of relying on truthiness.

        Raises:
            ValueError: If a string is not a recognized boolean spelling.
        """
        if isinstance(value, str):
            token = value.strip().lower()
            if token in cls._TRUE_STRINGS:
                return True
            if token in cls._FALSE_STRINGS:
                return False
            raise ValueError(f"Parameter '{name}' must be a boolean")
        return bool(value)

    @staticmethod
    def _coerce_float(name: str, value: Any) -> float:
        """Convert a value to float, reporting failures as a named ValueError."""
        try:
            return float(value)
        except (TypeError, ValueError) as exc:
            raise ValueError(f"Parameter '{name}' must be a number") from exc

    def _validate_config_values(self, values: Dict[str, Any], schema: BotConfigSchema) -> Dict[str, Any]:
        """Validate and normalize configuration values against a schema.

        Only parameters declared in ``schema.parameters`` are kept. Strings
        are length-checked, numbers are range-checked, booleans are parsed,
        select values must be among the declared options, and range values
        are clamped to their bounds instead of being rejected.

        Args:
            values: Raw values keyed by parameter name.
            schema: Schema whose ``parameters`` describe the accepted values.

        Returns:
            A new dict with the validated values; optional parameters that
            were not supplied are omitted.

        Raises:
            ValueError: If a required parameter without default is missing
                or a value violates its parameter's constraints.
        """
        validated = {}

        for param in schema.parameters:
            value = values.get(param.name)

            # Check required parameters
            if param.required and value is None:
                if param.default_value is None:
                    raise ValueError(f"Required parameter '{param.name}' is missing")
                value = param.default_value

            # Skip None values for optional parameters
            if value is None:
                continue

            # Type validation
            if param.type == "string":
                value = str(value)
                if param.max_length and len(value) > param.max_length:
                    raise ValueError(f"Parameter '{param.name}' exceeds max length {param.max_length}")

            elif param.type == "number":
                value = self._coerce_float(param.name, value)
                if param.min_value is not None and value < param.min_value:
                    raise ValueError(f"Parameter '{param.name}' below minimum {param.min_value}")
                if param.max_value is not None and value > param.max_value:
                    raise ValueError(f"Parameter '{param.name}' above maximum {param.max_value}")

            elif param.type == "boolean":
                value = self._coerce_bool(param.name, value)

            elif param.type == "select":
                if param.options:
                    valid_values = [opt["value"] for opt in param.options]
                    if value not in valid_values:
                        raise ValueError(f"Parameter '{param.name}' must be one of {valid_values}")

            elif param.type == "range":
                # Out-of-range values are clamped rather than rejected.
                value = self._coerce_float(param.name, value)
                if param.min_value is not None and value < param.min_value:
                    value = param.min_value
                if param.max_value is not None and value > param.max_value:
                    value = param.max_value

            validated[param.name] = value

        return validated

    def delete_bot_config(self, lobby_id: str, bot_name: str) -> bool:
        """Delete one bot's configuration from a lobby.

        Returns:
            True if a configuration was removed, False if none existed.
        """
        lobby_configs = self.config_cache.get(lobby_id)
        if lobby_configs is None or bot_name not in lobby_configs:
            return False

        del lobby_configs[bot_name]

        if lobby_configs:
            self._save_lobby_config(lobby_id)
        else:
            # Clean up empty lobby configs: drop the entry and its file.
            del self.config_cache[lobby_id]
            self._remove_config_file(lobby_id)

        logger.info(f"Deleted config for bot {bot_name} in lobby {lobby_id}")
        return True

    def delete_lobby_configs(self, lobby_id: str) -> bool:
        """Delete every bot configuration of a lobby, including its file.

        Returns:
            True if the lobby had cached configurations, False otherwise.
        """
        if lobby_id not in self.config_cache:
            return False

        del self.config_cache[lobby_id]
        self._remove_config_file(lobby_id)

        logger.info(f"Deleted all configs for lobby {lobby_id}")
        return True

    async def notify_bot_config_change(self,
                                       provider_url: str,
                                       bot_name: str,
                                       lobby_id: str,
                                       config: BotLobbyConfig) -> bool:
        """Tell the bot provider that a bot's lobby configuration changed.

        Posts ``lobby_id`` and ``config.config_values`` to the provider.

        Returns:
            True if the provider answered HTTP 200; False on any other
            status or on a request failure (both are logged).
        """
        try:
            base_url = provider_url.rstrip("/")  # avoid "//bots/..." URLs
            async with httpx.AsyncClient() as client:
                # Notify the bot provider of the configuration change
                response = await client.post(
                    f"{base_url}/bots/{bot_name}/config",
                    json={
                        "lobby_id": lobby_id,
                        "config_values": config.config_values,
                    },
                    timeout=10.0,
                )

                if response.status_code == 200:
                    logger.info(f"Successfully notified bot {bot_name} of config change")
                    return True

                logger.warning(f"Failed to notify bot {bot_name} of config change: HTTP {response.status_code}")

        except Exception as e:
            logger.error(f"Failed to notify bot {bot_name} of config change: {e}")

        return False

    def get_statistics(self) -> Dict[str, Any]:
        """Return lobby and configuration counts for the current cache."""
        per_lobby = {
            lobby_id: len(configs)
            for lobby_id, configs in self.config_cache.items()
        }

        return {
            "total_lobbies": len(self.config_cache),
            "total_configs": sum(per_lobby.values()),
            # server-side schema caching is disabled; omit cached_schemas
            "lobbies": per_lobby,
        }
 |