"""Enhanced AI Chatbot with Multi-Provider Support and Personality System. This bot demonstrates the advanced capabilities including: - Multi-provider AI integration (OpenAI, Anthropic, Local models) - Personality system with configurable traits - Conversation context and memory management - Enhanced response generation with streaming support """ import os import time import uuid import secrets from typing import Dict, Optional, Callable, Awaitable, Any, Union, AsyncGenerator from aiortc import MediaStreamTrack # Import system modules import sys sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from shared.logger import logger from shared.models import ChatMessageModel # Import advanced bot management modules try: from voicebot.ai_providers import ( AIProviderType, AIProviderConfig, ai_provider_manager, ConversationContext, MessageRole ) from voicebot.personality_system import personality_manager, PersonalityTrait, CommunicationStyle from voicebot.conversation_context import context_manager AI_PROVIDERS_AVAILABLE = True except ImportError as e: logger.warning(f"Advanced AI features not available: {e}") AI_PROVIDERS_AVAILABLE = False AGENT_NAME = "Generative Chat Bot" AGENT_DESCRIPTION = "Advanced AI chatbot with multi-provider support, personality system, and conversation memory" # Bot configuration from environment BOT_PERSONALITY = os.getenv("AI_CHATBOT_PERSONALITY", "helpful_assistant") BOT_AI_PROVIDER = os.getenv("AI_CHATBOT_PROVIDER", "openai") BOT_STREAMING = os.getenv("AI_CHATBOT_STREAMING", "false").lower() == "true" BOT_MEMORY_ENABLED = os.getenv("AI_CHATBOT_MEMORY", "true").lower() == "true" # Fallback responses when AI providers are not available FALLBACK_RESPONSES = { "greeting": [ "Hello! I'm an AI chatbot ready to help you.", "Hi there! How can I assist you today?", "Greetings! I'm here to chat and help with any questions you have." ], "help": [ "I'm an advanced AI chatbot that can help with various topics. Just ask me anything!", "I can assist with questions, have conversations, and provide information on many subjects.", "Feel free to ask me questions or just chat - I'm here to help!" ], "capabilities": [ "I support multiple AI providers, have configurable personalities, and maintain conversation context.", "My capabilities include natural conversation, information retrieval, and adaptive personality responses.", "I can remember our conversation context and adapt my responses based on configured personality traits." ], "default": [ "That's interesting! Tell me more about that.", "I understand. What would you like to discuss next?", "Thanks for sharing! How can I help you further?", "I see. Is there anything specific you'd like to know about?" ], "error": [ "I apologize, but I'm having trouble processing that right now. Could you try rephrasing?", "Something went wrong on my end. Could you ask that again?", "I encountered an issue. Please try your question again." ] } class EnhancedAIChatbot: """Enhanced AI chatbot with advanced features.""" def __init__(self, session_name: str): self.session_name = session_name self.session_id = str(uuid.uuid4()) self.ai_provider = None self.personality = None self.conversation_context = None self.initialized = False # Instance configuration variables self.bot_personality = BOT_PERSONALITY self.bot_ai_provider = BOT_AI_PROVIDER self.bot_streaming = BOT_STREAMING self.bot_memory_enabled = BOT_MEMORY_ENABLED # Per-lobby configurations self.lobby_configs: Dict[str, Dict[str, Any]] = {} # Initialize advanced features if available if AI_PROVIDERS_AVAILABLE: self._initialize_ai_features() else: logger.warning("Running in fallback mode - advanced AI features disabled") def _initialize_ai_features(self): """Initialize AI provider, personality, and context management.""" try: # Initialize personality self.personality = personality_manager.create_personality_from_template(self.bot_personality) if not self.personality: logger.warning(f"Personality template '{self.bot_personality}' not found, using default") self.personality = personality_manager.create_personality_from_template("helpful_assistant") # Initialize AI provider provider_type = AIProviderType(self.bot_ai_provider) self.ai_provider = ai_provider_manager.create_provider(provider_type) ai_provider_manager.register_provider(f"{AGENT_NAME}_{self.session_id}", self.ai_provider) # Initialize conversation context if memory is enabled if self.bot_memory_enabled: self.conversation_context = context_manager.get_or_create_context( session_id=self.session_id, bot_name=AGENT_NAME, conversation_id=f"{AGENT_NAME}_{self.session_id}_{int(time.time())}" ) self.initialized = True logger.info(f"Enhanced AI chatbot initialized: provider={self.bot_ai_provider}, personality={self.bot_personality}, memory={self.bot_memory_enabled}") except Exception as e: logger.error(f"Failed to initialize AI features: {e}") self.initialized = False async def generate_response(self, message: str) -> str: """Generate a response using AI provider with personality and context.""" if not self.initialized or not self.ai_provider: return self._get_fallback_response(message) try: # Prepare conversation context if self.conversation_context: # Create a new AI conversation context with personality ai_context = ConversationContext( session_id=self.session_id, bot_name=AGENT_NAME, personality_prompt=self.personality.generate_system_prompt() if self.personality else None ) # Add personality system message if self.personality: ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) # Add conversation history context context_summary = context_manager.get_context_for_response(self.conversation_context.conversation_id) if context_summary: ai_context.add_message(MessageRole.SYSTEM, f"Conversation context: {context_summary}") else: # Simple context without memory ai_context = ConversationContext( session_id=self.session_id, bot_name=AGENT_NAME ) if self.personality: ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) # Generate response if self.bot_streaming: # For streaming, collect the full response response_parts = [] async for chunk in self.ai_provider.stream_response(ai_context, message): response_parts.append(chunk) response = "".join(response_parts) else: response = await self.ai_provider.generate_response(ai_context, message) # Store conversation turn in context manager if self.conversation_context: context_manager.add_conversation_turn( conversation_id=self.conversation_context.conversation_id, user_message=message, bot_response=response, context_used={"ai_provider": self.bot_ai_provider, "personality": self.bot_personality}, metadata={"timestamp": time.time(), "streaming": self.bot_streaming} ) return response except Exception as e: logger.error(f"AI response generation failed: {e}") return self._get_fallback_response(message, error=True) def _get_fallback_response(self, message: str, error: bool = False) -> str: """Get fallback response when AI providers are unavailable.""" if error: return FALLBACK_RESPONSES["error"][hash(message) % len(FALLBACK_RESPONSES["error"])] message_lower = message.lower() # Simple keyword matching for fallback responses if any(word in message_lower for word in ["hello", "hi", "hey", "greetings"]): return FALLBACK_RESPONSES["greeting"][hash(message) % len(FALLBACK_RESPONSES["greeting"])] elif any(word in message_lower for word in ["help", "what can you do", "capabilities"]): return FALLBACK_RESPONSES["help"][hash(message) % len(FALLBACK_RESPONSES["help"])] elif any(word in message_lower for word in ["features", "abilities", "advanced"]): return FALLBACK_RESPONSES["capabilities"][hash(message) % len(FALLBACK_RESPONSES["capabilities"])] else: return FALLBACK_RESPONSES["default"][hash(message) % len(FALLBACK_RESPONSES["default"])] async def health_check(self) -> Dict[str, Any]: """Perform health check on bot components.""" health = { "bot_name": AGENT_NAME, "session_id": self.session_id, "initialized": self.initialized, "ai_providers_available": AI_PROVIDERS_AVAILABLE, "configuration": { "personality": BOT_PERSONALITY, "ai_provider": BOT_AI_PROVIDER, "streaming": BOT_STREAMING, "memory_enabled": BOT_MEMORY_ENABLED } } if self.initialized and self.ai_provider: try: provider_healthy = await self.ai_provider.health_check() health["ai_provider_status"] = "healthy" if provider_healthy else "unhealthy" except Exception as e: health["ai_provider_status"] = f"error: {e}" if self.personality: health["personality_loaded"] = True health["personality_traits"] = [trait.value for trait in self.personality.traits] if self.conversation_context: health["conversation_turns"] = len(self.conversation_context.turns) health["context_summary"] = self.conversation_context.get_conversation_summary() return health async def generate_streaming_response(self, message: str) -> AsyncGenerator[str, None]: """Generate a streaming response, yielding partial responses as chunks arrive.""" if not self.initialized or not self.ai_provider: yield self._get_fallback_response(message) return try: # Prepare conversation context (same as generate_response) if self.conversation_context: # Create a new AI conversation context with personality ai_context = ConversationContext( session_id=self.session_id, bot_name=AGENT_NAME, personality_prompt=self.personality.generate_system_prompt() if self.personality else None ) # Add personality system message if self.personality: ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) # Add conversation history context context_summary = context_manager.get_context_for_response(self.conversation_context.conversation_id) if context_summary: ai_context.add_message(MessageRole.SYSTEM, f"Conversation context: {context_summary}") else: # Simple context without memory ai_context = ConversationContext( session_id=self.session_id, bot_name=AGENT_NAME ) if self.personality: ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) # Stream the response accumulated_response = "" chunk_count = 0 async for chunk in self.ai_provider.stream_response(ai_context, message): accumulated_response += chunk chunk_count += 1 logger.info(f"AI provider yielded chunk {chunk_count}: '{chunk}' (accumulated: {len(accumulated_response)} chars)") yield accumulated_response # Store conversation turn in context manager after streaming is complete if self.conversation_context: context_manager.add_conversation_turn( conversation_id=self.conversation_context.conversation_id, user_message=message, bot_response=accumulated_response, context_used={"ai_provider": self.bot_ai_provider, "personality": self.bot_personality}, metadata={"timestamp": time.time(), "streaming": True} ) except Exception as e: logger.error(f"AI streaming response generation failed: {e}") yield self._get_fallback_response(message, error=True) # Global bot instance _bot_instance: Optional[EnhancedAIChatbot] = None def agent_info() -> Dict[str, str]: """Return agent information.""" return { "name": AGENT_NAME, "description": AGENT_DESCRIPTION, "has_media": "false", "configurable": "true", # This bot supports per-lobby configuration "features": [ "multi_provider_ai", "personality_system", "conversation_memory", "streaming_responses", "health_monitoring", "per_lobby_config" ] } def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: """AI chatbot doesn't provide media tracks - it's chat-only.""" return {} async def handle_chat_message( chat_message: ChatMessageModel, send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] ) -> Optional[str]: """Handle incoming chat messages and provide AI-powered responses.""" global _bot_instance try: # Initialize bot instance if needed if _bot_instance is None: _bot_instance = EnhancedAIChatbot(chat_message.sender_name) logger.info(f"Initialized enhanced AI chatbot for session: {chat_message.sender_name}") if _bot_instance.bot_streaming: # Handle streaming response logger.info(f"Using streaming response path, bot_streaming={_bot_instance.bot_streaming}") return await _handle_streaming_response(chat_message, send_message_func) else: # Generate non-streaming response logger.info(f"Using non-streaming response path, bot_streaming={_bot_instance.bot_streaming}") response = await _bot_instance.generate_response(chat_message.message) # Send response if response: await send_message_func(response) logger.info(f"AI Chatbot responded to {chat_message.sender_name}: {response[:100]}...") return response except Exception as e: logger.error(f"Error in AI chatbot: {e}") error_response = "I apologize, but I encountered an error. Please try again." await send_message_func(error_response) return error_response async def _handle_streaming_response( chat_message: ChatMessageModel, send_message_func: Callable[[Union[str, ChatMessageModel]], Awaitable[None]] ) -> Optional[str]: """Handle streaming response by sending updates as chunks arrive.""" global _bot_instance logger.info("Starting _handle_streaming_response") message_id = None try: # Generate a unique message ID for this streaming response message_id = secrets.token_hex(8) # Get the client's session_id from the bound method client = getattr(send_message_func, '__self__', None) client_session_id = client.session_id if client else chat_message.sender_session_id # Send initial empty message to establish the message in the chat initial_message = ChatMessageModel( id=str(message_id), message="", sender_name=_bot_instance.session_name if _bot_instance else "AI Chatbot", sender_session_id=client_session_id, timestamp=time.time(), lobby_id=chat_message.lobby_id, ) await send_message_func(initial_message) logger.info(f"Started streaming response with message ID: {message_id}") # Check if bot instance exists if not _bot_instance: error_msg = "Bot instance not available for streaming" update_message = ChatMessageModel( id=str(message_id), message=error_msg, sender_name="AI Chatbot", sender_session_id=client_session_id, timestamp=time.time(), lobby_id=chat_message.lobby_id, ) await send_message_func(update_message) return error_msg # Stream the response final_response = "" chunk_count = 0 async for partial_response in _bot_instance.generate_streaming_response(chat_message.message): final_response = partial_response chunk_count += 1 logger.info(f"Sending streaming chunk {chunk_count}: {partial_response[:50]}...") update_message = ChatMessageModel( id=str(message_id), message=partial_response, sender_name=_bot_instance.session_name, sender_session_id=client_session_id, timestamp=time.time(), lobby_id=chat_message.lobby_id, ) await send_message_func(update_message) logger.info(f"Completed streaming response to {chat_message.sender_name}: {final_response[:100]}...") return final_response except Exception as e: logger.error(f"Error in streaming response: {e}") error_response = "I apologize, but I encountered an error. Please try again." # Try to update the existing message with the error, or send a new one try: client = getattr(send_message_func, '__self__', None) client_session_id = client.session_id if client else chat_message.sender_session_id error_message = ChatMessageModel( id=str(message_id), message=error_response, sender_name=_bot_instance.session_name if _bot_instance else "AI Chatbot", sender_session_id=client_session_id, timestamp=time.time(), lobby_id=chat_message.lobby_id, ) await send_message_func(error_message) except (NameError, TypeError, AttributeError): # If message_id is not defined or other issues, send as string await send_message_func(error_response) return error_response async def get_bot_status() -> Dict[str, Any]: """Get detailed bot status and health information.""" global _bot_instance status = { "agent_name": AGENT_NAME, "agent_description": AGENT_DESCRIPTION, "features_available": AI_PROVIDERS_AVAILABLE, "configuration": { "personality_template": BOT_PERSONALITY, "ai_provider": BOT_AI_PROVIDER, "streaming_enabled": BOT_STREAMING, "memory_enabled": BOT_MEMORY_ENABLED } } if _bot_instance: health_info = await _bot_instance.health_check() status.update(health_info) else: status["instance_status"] = "not_initialized" # Add system-level information if AI_PROVIDERS_AVAILABLE: status["available_personalities"] = [ template.id for template in personality_manager.list_templates() ] status["available_providers"] = ai_provider_manager.list_providers() # Get context manager statistics if BOT_MEMORY_ENABLED: context_stats = context_manager.get_statistics() status["conversation_statistics"] = context_stats return status # Additional helper functions for advanced features async def switch_personality(bot_instance: EnhancedAIChatbot, personality_id: str) -> bool: """Switch bot personality at runtime.""" if not AI_PROVIDERS_AVAILABLE or not bot_instance: return False try: new_personality = personality_manager.create_personality_from_template(personality_id) if new_personality: bot_instance.personality = new_personality bot_instance.bot_personality = personality_id logger.info(f"Switched to personality: {personality_id}") return True except Exception as e: logger.error(f"Failed to switch personality: {e}") return False async def switch_ai_provider(bot_instance: EnhancedAIChatbot, provider_type: str) -> bool: """Switch AI provider at runtime.""" logger.info(f"Switching AI provider to: {provider_type}") if not AI_PROVIDERS_AVAILABLE: logger.error("AI providers not available") return False try: # If instance exists, switch its provider if bot_instance: logger.info("Switching existing bot instance provider") provider_enum = AIProviderType(provider_type) new_provider = ai_provider_manager.create_provider(provider_enum) bot_instance.ai_provider = new_provider bot_instance.bot_ai_provider = provider_type logger.info(f"Switched existing instance to AI provider: {provider_type}") else: logger.info("No existing bot instance to switch") return True except Exception as e: logger.error(f"Failed to switch AI provider to {provider_type}: {e}") return False def get_config_schema() -> Dict[str, Any]: """Get the configuration schema for this bot""" return { "bot_name": AGENT_NAME, "version": "1.0", "parameters": [ { "name": "personality", "type": "select", "label": "Bot Personality", "description": "The personality and communication style of the bot", "default_value": "helpful_assistant", "required": False, "options": [ {"value": "helpful_assistant", "label": "Helpful Assistant"}, {"value": "technical_expert", "label": "Technical Expert"}, {"value": "creative_companion", "label": "Creative Companion"}, {"value": "business_advisor", "label": "Business Advisor"}, {"value": "comedy_bot", "label": "Comedy Bot"}, {"value": "wise_mentor", "label": "Wise Mentor"} ] }, { "name": "ai_provider", "type": "select", "label": "AI Provider", "description": "The AI service to use for generating responses", "default_value": "openai", "required": False, "options": [ {"value": "openai", "label": "OpenAI (GPT)"}, {"value": "anthropic", "label": "Anthropic (Claude)"}, {"value": "local", "label": "Local Model"} ] }, { "name": "streaming", "type": "boolean", "label": "Streaming Responses", "description": "Enable real-time streaming of responses as they are generated", "default_value": False, "required": False }, { "name": "memory_enabled", "type": "boolean", "label": "Conversation Memory", "description": "Remember conversation context and history", "default_value": True, "required": False }, { "name": "response_length", "type": "select", "label": "Response Length", "description": "Preferred length of bot responses", "default_value": "medium", "required": False, "options": [ {"value": "short", "label": "Short & Concise"}, {"value": "medium", "label": "Medium Length"}, {"value": "long", "label": "Detailed & Comprehensive"} ] }, { "name": "creativity_level", "type": "range", "label": "Creativity Level", "description": "How creative and varied the responses should be (0-100)", "default_value": 50, "required": False, "min_value": 0, "max_value": 100, "step": 10 }, { "name": "response_style", "type": "select", "label": "Response Style", "description": "The communication style for responses", "default_value": "conversational", "required": False, "options": [ {"value": "formal", "label": "Formal & Professional"}, {"value": "conversational", "label": "Conversational & Friendly"}, {"value": "casual", "label": "Casual & Relaxed"}, {"value": "academic", "label": "Academic & Technical"} ] } ], "categories": [ {"AI Settings": ["personality", "ai_provider", "streaming"]}, {"Behavior Settings": ["memory_enabled", "response_length", "creativity_level"]}, {"Communication Style": ["response_style"]} ] } async def handle_config_update(lobby_id: str, config_values: Dict[str, Any]) -> bool: """Handle configuration update for a specific lobby""" global _bot_instance try: logger.info(f"Updating config for lobby {lobby_id}: {config_values}") # Get the bot instance (create if doesn't exist) if _bot_instance is None: _bot_instance = EnhancedAIChatbot("AI Chatbot") # Apply configuration changes config_applied = False if "personality" in config_values: success = await switch_personality(_bot_instance, config_values["personality"]) if success: config_applied = True logger.info(f"Applied personality: {config_values['personality']}") if "ai_provider" in config_values: success = await switch_ai_provider(_bot_instance, config_values["ai_provider"]) if success: config_applied = True logger.info(f"Applied AI provider: {config_values['ai_provider']}") else: logger.warning(f"Failed to apply AI provider: {config_values['ai_provider']}") if "streaming" in config_values: _bot_instance.bot_streaming = bool(config_values["streaming"]) config_applied = True logger.info(f"Applied streaming: {_bot_instance.bot_streaming}") if "memory_enabled" in config_values: _bot_instance.bot_memory_enabled = bool(config_values["memory_enabled"]) config_applied = True logger.info(f"Applied memory: {_bot_instance.bot_memory_enabled}") # Store other configuration values for use in response generation if not hasattr(_bot_instance, 'lobby_configs'): _bot_instance.lobby_configs = {} _bot_instance.lobby_configs[lobby_id] = config_values config_applied = True return config_applied except Exception as e: logger.error(f"Failed to apply config update: {e}") return False