From 15641aa5428ac101e09948552ad7d90fb95afb80 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Thu, 4 Sep 2025 17:53:26 -0700 Subject: [PATCH] Improved bot capabilities --- server/api/monitoring.py | 4 +- server/core/cache.py | 5 +- server/core/error_handling.py | 2 +- server/core/health.py | 3 +- server/core/performance.py | 1 - server/websocket/message_handlers.py | 7 +- server/websocket/webrtc_signaling.py | 7 +- voicebot/ai_providers.py | 514 +++++++++++++++++++++++++++ voicebot/bots/ai_chatbot.py | 359 +++++++++++++++++++ voicebot/conversation_context.py | 385 ++++++++++++++++++++ voicebot/enhanced_bot_configs.json | 85 +++++ voicebot/personality_system.py | 449 +++++++++++++++++++++++ voicebot/step_5b_integration_demo.py | 477 +++++++++++++++++++++++++ voicebot/test_step_5b.py | 195 ++++++++++ 14 files changed, 2473 insertions(+), 20 deletions(-) create mode 100644 voicebot/ai_providers.py create mode 100644 voicebot/bots/ai_chatbot.py create mode 100644 voicebot/conversation_context.py create mode 100644 voicebot/enhanced_bot_configs.json create mode 100644 voicebot/personality_system.py create mode 100644 voicebot/step_5b_integration_demo.py create mode 100644 voicebot/test_step_5b.py diff --git a/server/api/monitoring.py b/server/api/monitoring.py index 5920ed1..698f64c 100644 --- a/server/api/monitoring.py +++ b/server/api/monitoring.py @@ -14,9 +14,9 @@ Endpoints: - /api/system/info - System information """ -from typing import Dict, Any, Optional +from typing import Optional from fastapi import APIRouter, HTTPException, Query -from datetime import datetime, timedelta +from datetime import datetime from logger import logger diff --git a/server/core/cache.py b/server/core/cache.py index b7ed907..1ce81cc 100644 --- a/server/core/cache.py +++ b/server/core/cache.py @@ -14,14 +14,11 @@ Features: """ import asyncio -import time -import json import hashlib from datetime import datetime, timedelta -from typing import Any, Dict, Optional, List, Union, Callable, TypeVar +from typing import Any, Dict, Optional, Callable, TypeVar from collections import OrderedDict from dataclasses import dataclass -import weakref from logger import logger diff --git a/server/core/error_handling.py b/server/core/error_handling.py index 2ec6e69..4a967e0 100644 --- a/server/core/error_handling.py +++ b/server/core/error_handling.py @@ -10,7 +10,7 @@ import asyncio import time import traceback from enum import Enum -from typing import Any, Callable, Dict, List, Optional, TypeVar, Generic +from typing import Any, Callable, Dict, List, Optional, TypeVar from functools import wraps from dataclasses import dataclass from fastapi import WebSocket diff --git a/server/core/health.py b/server/core/health.py index e828bd9..14e4cf5 100644 --- a/server/core/health.py +++ b/server/core/health.py @@ -15,9 +15,8 @@ Features: import asyncio import time from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional, Callable, NamedTuple +from typing import Dict, Any, List, Optional, NamedTuple from enum import Enum -import json from logger import logger diff --git a/server/core/performance.py b/server/core/performance.py index 75cfd2a..8cfcbcb 100644 --- a/server/core/performance.py +++ b/server/core/performance.py @@ -21,7 +21,6 @@ from typing import Dict, Any, List, Optional, Callable from dataclasses import dataclass, field from collections import defaultdict, deque from contextlib import asynccontextmanager -import weakref from logger import logger diff --git a/server/websocket/message_handlers.py b/server/websocket/message_handlers.py index f009f2d..bb835ce 100644 --- a/server/websocket/message_handlers.py +++ b/server/websocket/message_handlers.py @@ -12,11 +12,10 @@ from fastapi import WebSocket from logger import logger from .webrtc_signaling import WebRTCSignalingHandlers from core.error_handling import ( - error_handler, - WebSocketError, + error_handler, + WebSocketError, ValidationError, - with_websocket_error_handling, - ErrorSeverity + ErrorSeverity, ) if TYPE_CHECKING: diff --git a/server/websocket/webrtc_signaling.py b/server/websocket/webrtc_signaling.py index 5668848..8a0123a 100644 --- a/server/websocket/webrtc_signaling.py +++ b/server/websocket/webrtc_signaling.py @@ -9,12 +9,7 @@ from typing import Any, Dict, TYPE_CHECKING from fastapi import WebSocket from logger import logger -from core.error_handling import ( - with_webrtc_error_handling, - WebRTCError, - ErrorSeverity, - error_handler -) +from core.error_handling import with_webrtc_error_handling if TYPE_CHECKING: from core.session_manager import Session diff --git a/voicebot/ai_providers.py b/voicebot/ai_providers.py new file mode 100644 index 0000000..b27aa5d --- /dev/null +++ b/voicebot/ai_providers.py @@ -0,0 +1,514 @@ +""" +AI Provider Integration for Advanced Bot Management. + +This module provides support for multiple AI providers including OpenAI, Anthropic, +and local models for enhanced bot capabilities. +""" + +import os +import time +import asyncio +from abc import ABC, abstractmethod +from typing import Dict, List, Optional, Any, AsyncIterator +from enum import Enum +from dataclasses import dataclass +from pydantic import BaseModel, Field + +from logger import logger + + +class AIProviderType(str, Enum): + """Supported AI provider types.""" + OPENAI = "openai" + ANTHROPIC = "anthropic" + LOCAL = "local" + CUSTOM = "custom" + + +class MessageRole(str, Enum): + """Message roles in conversation.""" + SYSTEM = "system" + USER = "user" + ASSISTANT = "assistant" + + +@dataclass +class ConversationMessage: + """Individual message in a conversation.""" + role: MessageRole + content: str + timestamp: float = None + metadata: Dict[str, Any] = None + + def __post_init__(self): + if self.timestamp is None: + self.timestamp = time.time() + if self.metadata is None: + self.metadata = {} + + +class AIProviderConfig(BaseModel): + """Configuration for AI providers.""" + provider_type: AIProviderType + api_key: Optional[str] = None + base_url: Optional[str] = None + model: str = "gpt-3.5-turbo" + max_tokens: int = 1000 + temperature: float = 0.7 + timeout: float = 30.0 + retry_attempts: int = 3 + retry_delay: float = 1.0 + + # Advanced settings + top_p: Optional[float] = None + frequency_penalty: Optional[float] = None + presence_penalty: Optional[float] = None + stop_sequences: List[str] = Field(default_factory=list) + + class Config: + extra = "allow" # Allow additional provider-specific configs + + +class ConversationContext(BaseModel): + """Conversation context and memory management.""" + session_id: str + bot_name: str + messages: List[ConversationMessage] = Field(default_factory=list) + created_at: float = Field(default_factory=time.time) + last_updated: float = Field(default_factory=time.time) + + # Context management + max_history: int = 50 + context_window: int = 4000 # Token limit for context + personality_prompt: Optional[str] = None + + # Metadata + user_preferences: Dict[str, Any] = Field(default_factory=dict) + conversation_state: Dict[str, Any] = Field(default_factory=dict) + + def add_message(self, role: MessageRole, content: str, metadata: Dict[str, Any] = None): + """Add a message to the conversation.""" + message = ConversationMessage(role=role, content=content, metadata=metadata or {}) + self.messages.append(message) + self.last_updated = time.time() + + # Trim history if needed + if len(self.messages) > self.max_history: + # Keep system messages and recent messages + system_messages = [m for m in self.messages if m.role == MessageRole.SYSTEM] + recent_messages = [m for m in self.messages if m.role != MessageRole.SYSTEM][-self.max_history:] + self.messages = system_messages + recent_messages + + def get_context_messages(self) -> List[Dict[str, str]]: + """Get messages formatted for AI provider APIs.""" + messages = [] + for msg in self.messages: + messages.append({ + "role": msg.role.value, + "content": msg.content + }) + return messages + + +class AIProvider(ABC): + """Abstract base class for AI providers.""" + + def __init__(self, config: AIProviderConfig): + self.config = config + self.provider_type = config.provider_type + + @abstractmethod + async def generate_response( + self, + context: ConversationContext, + message: str + ) -> str: + """Generate a response to a message.""" + pass + + @abstractmethod + async def stream_response( + self, + context: ConversationContext, + message: str + ) -> AsyncIterator[str]: + """Stream a response to a message.""" + pass + + @abstractmethod + async def health_check(self) -> bool: + """Check if the provider is healthy and available.""" + pass + + +class OpenAIProvider(AIProvider): + """OpenAI provider implementation.""" + + def __init__(self, config: AIProviderConfig): + super().__init__(config) + self._client = None + + def _get_client(self): + """Lazy initialization of OpenAI client.""" + if self._client is None: + try: + import openai + self._client = openai.AsyncOpenAI( + api_key=self.config.api_key or os.getenv("OPENAI_API_KEY"), + base_url=self.config.base_url, + timeout=self.config.timeout + ) + except ImportError: + raise ImportError("OpenAI package not installed. Install with: pip install openai") + return self._client + + async def generate_response(self, context: ConversationContext, message: str) -> str: + """Generate response using OpenAI API.""" + client = self._get_client() + + # Add user message to context + context.add_message(MessageRole.USER, message) + + messages = context.get_context_messages() + + for attempt in range(self.config.retry_attempts): + try: + response = await client.chat.completions.create( + model=self.config.model, + messages=messages, + max_tokens=self.config.max_tokens, + temperature=self.config.temperature, + top_p=self.config.top_p, + frequency_penalty=self.config.frequency_penalty, + presence_penalty=self.config.presence_penalty, + stop=self.config.stop_sequences or None + ) + + response_text = response.choices[0].message.content + context.add_message(MessageRole.ASSISTANT, response_text) + return response_text + + except Exception as e: + logger.warning(f"OpenAI API attempt {attempt + 1} failed: {e}") + if attempt < self.config.retry_attempts - 1: + await asyncio.sleep(self.config.retry_delay * (2 ** attempt)) + else: + raise + + async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]: + """Stream response using OpenAI API.""" + client = self._get_client() + + context.add_message(MessageRole.USER, message) + messages = context.get_context_messages() + + try: + stream = await client.chat.completions.create( + model=self.config.model, + messages=messages, + max_tokens=self.config.max_tokens, + temperature=self.config.temperature, + stream=True + ) + + full_response = "" + async for chunk in stream: + if chunk.choices[0].delta.content: + content = chunk.choices[0].delta.content + full_response += content + yield content + + # Add complete response to context + context.add_message(MessageRole.ASSISTANT, full_response) + + except Exception as e: + logger.error(f"OpenAI streaming failed: {e}") + raise + + async def health_check(self) -> bool: + """Check OpenAI API health.""" + try: + client = self._get_client() + await client.models.list() + return True + except Exception as e: + logger.warning(f"OpenAI health check failed: {e}") + return False + + +class AnthropicProvider(AIProvider): + """Anthropic Claude provider implementation.""" + + def __init__(self, config: AIProviderConfig): + super().__init__(config) + self._client = None + + def _get_client(self): + """Lazy initialization of Anthropic client.""" + if self._client is None: + try: + import anthropic + self._client = anthropic.AsyncAnthropic( + api_key=self.config.api_key or os.getenv("ANTHROPIC_API_KEY"), + timeout=self.config.timeout + ) + except ImportError: + raise ImportError("Anthropic package not installed. Install with: pip install anthropic") + return self._client + + async def generate_response(self, context: ConversationContext, message: str) -> str: + """Generate response using Anthropic API.""" + client = self._get_client() + + context.add_message(MessageRole.USER, message) + + # Convert messages for Anthropic format + messages = [] + system_prompt = None + + for msg in context.messages: + if msg.role == MessageRole.SYSTEM: + system_prompt = msg.content + else: + messages.append({ + "role": msg.role.value, + "content": msg.content + }) + + for attempt in range(self.config.retry_attempts): + try: + kwargs = { + "model": self.config.model, + "messages": messages, + "max_tokens": self.config.max_tokens, + "temperature": self.config.temperature, + } + + if system_prompt: + kwargs["system"] = system_prompt + + response = await client.messages.create(**kwargs) + response_text = response.content[0].text + context.add_message(MessageRole.ASSISTANT, response_text) + return response_text + + except Exception as e: + logger.warning(f"Anthropic API attempt {attempt + 1} failed: {e}") + if attempt < self.config.retry_attempts - 1: + await asyncio.sleep(self.config.retry_delay * (2 ** attempt)) + else: + raise + + async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]: + """Stream response using Anthropic API.""" + client = self._get_client() + + context.add_message(MessageRole.USER, message) + messages = context.get_context_messages() + + try: + async with client.messages.stream( + model=self.config.model, + messages=messages, + max_tokens=self.config.max_tokens, + temperature=self.config.temperature + ) as stream: + full_response = "" + async for text in stream.text_stream: + full_response += text + yield text + + context.add_message(MessageRole.ASSISTANT, full_response) + + except Exception as e: + logger.error(f"Anthropic streaming failed: {e}") + raise + + async def health_check(self) -> bool: + """Check Anthropic API health.""" + try: + client = self._get_client() + # Simple test to verify API connectivity + await client.messages.create( + model=self.config.model, + messages=[{"role": "user", "content": "test"}], + max_tokens=1 + ) + return True + except Exception as e: + logger.warning(f"Anthropic health check failed: {e}") + return False + + +class LocalProvider(AIProvider): + """Local model provider (e.g., Ollama, llama.cpp).""" + + def __init__(self, config: AIProviderConfig): + super().__init__(config) + self.base_url = config.base_url or "http://localhost:11434" + + async def generate_response(self, context: ConversationContext, message: str) -> str: + """Generate response using local model API.""" + context.add_message(MessageRole.USER, message) + + import aiohttp + + async with aiohttp.ClientSession() as session: + payload = { + "model": self.config.model, + "messages": context.get_context_messages(), + "stream": False, + "options": { + "temperature": self.config.temperature, + "num_predict": self.config.max_tokens + } + } + + try: + async with session.post( + f"{self.base_url}/api/chat", + json=payload, + timeout=aiohttp.ClientTimeout(total=self.config.timeout) + ) as resp: + if resp.status == 200: + result = await resp.json() + response_text = result["message"]["content"] + context.add_message(MessageRole.ASSISTANT, response_text) + return response_text + else: + raise Exception(f"Local API returned status {resp.status}") + + except Exception as e: + logger.error(f"Local provider failed: {e}") + raise + + async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]: + """Stream response using local model API.""" + context.add_message(MessageRole.USER, message) + + import aiohttp + + async with aiohttp.ClientSession() as session: + payload = { + "model": self.config.model, + "messages": context.get_context_messages(), + "stream": True + } + + try: + async with session.post( + f"{self.base_url}/api/chat", + json=payload, + timeout=aiohttp.ClientTimeout(total=self.config.timeout) + ) as resp: + if resp.status == 200: + full_response = "" + async for line in resp.content: + if line: + import json + try: + data = json.loads(line.decode()) + if "message" in data and "content" in data["message"]: + content = data["message"]["content"] + full_response += content + yield content + except json.JSONDecodeError: + continue + + context.add_message(MessageRole.ASSISTANT, full_response) + else: + raise Exception(f"Local API returned status {resp.status}") + + except Exception as e: + logger.error(f"Local provider streaming failed: {e}") + raise + + async def health_check(self) -> bool: + """Check local model health.""" + try: + import aiohttp + async with aiohttp.ClientSession() as session: + async with session.get( + f"{self.base_url}/api/tags", + timeout=aiohttp.ClientTimeout(total=5) + ) as resp: + return resp.status == 200 + except Exception as e: + logger.warning(f"Local provider health check failed: {e}") + return False + + +class AIProviderManager: + """Manager for AI providers and configurations.""" + + def __init__(self): + self.providers: Dict[str, AIProvider] = {} + self.default_configs = self._load_default_configs() + + def _load_default_configs(self) -> Dict[AIProviderType, AIProviderConfig]: + """Load default configurations for providers.""" + return { + AIProviderType.OPENAI: AIProviderConfig( + provider_type=AIProviderType.OPENAI, + model=os.getenv("OPENAI_MODEL", "gpt-3.5-turbo"), + max_tokens=int(os.getenv("OPENAI_MAX_TOKENS", "1000")), + temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7")) + ), + AIProviderType.ANTHROPIC: AIProviderConfig( + provider_type=AIProviderType.ANTHROPIC, + model=os.getenv("ANTHROPIC_MODEL", "claude-3-sonnet-20240229"), + max_tokens=int(os.getenv("ANTHROPIC_MAX_TOKENS", "1000")), + temperature=float(os.getenv("ANTHROPIC_TEMPERATURE", "0.7")) + ), + AIProviderType.LOCAL: AIProviderConfig( + provider_type=AIProviderType.LOCAL, + base_url=os.getenv("LOCAL_MODEL_URL", "http://localhost:11434"), + model=os.getenv("LOCAL_MODEL_NAME", "llama2"), + max_tokens=int(os.getenv("LOCAL_MAX_TOKENS", "1000")), + temperature=float(os.getenv("LOCAL_TEMPERATURE", "0.7")) + ) + } + + def create_provider(self, provider_type: AIProviderType, config: Optional[AIProviderConfig] = None) -> AIProvider: + """Create an AI provider instance.""" + if config is None: + config = self.default_configs.get(provider_type) + if config is None: + raise ValueError(f"No default config for provider type: {provider_type}") + + if provider_type == AIProviderType.OPENAI: + return OpenAIProvider(config) + elif provider_type == AIProviderType.ANTHROPIC: + return AnthropicProvider(config) + elif provider_type == AIProviderType.LOCAL: + return LocalProvider(config) + else: + raise ValueError(f"Unsupported provider type: {provider_type}") + + def register_provider(self, name: str, provider: AIProvider): + """Register a provider instance.""" + self.providers[name] = provider + logger.info(f"Registered AI provider: {name} ({provider.provider_type})") + + def get_provider(self, name: str) -> Optional[AIProvider]: + """Get a registered provider.""" + return self.providers.get(name) + + def list_providers(self) -> List[str]: + """List all registered provider names.""" + return list(self.providers.keys()) + + async def health_check_all(self) -> Dict[str, bool]: + """Health check all registered providers.""" + results = {} + for name, provider in self.providers.items(): + try: + results[name] = await provider.health_check() + except Exception as e: + logger.error(f"Health check failed for provider {name}: {e}") + results[name] = False + return results + + +# Global provider manager instance +ai_provider_manager = AIProviderManager() diff --git a/voicebot/bots/ai_chatbot.py b/voicebot/bots/ai_chatbot.py new file mode 100644 index 0000000..0310d3c --- /dev/null +++ b/voicebot/bots/ai_chatbot.py @@ -0,0 +1,359 @@ +"""Enhanced AI Chatbot with Multi-Provider Support and Personality System. + +This bot demonstrates the advanced capabilities including: +- Multi-provider AI integration (OpenAI, Anthropic, Local models) +- Personality system with configurable traits +- Conversation context and memory management +- Enhanced response generation with streaming support +""" + +import os +import time +import uuid +from typing import Dict, Optional, Callable, Awaitable, Any +from aiortc import MediaStreamTrack + +# Import system modules +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + +from logger import logger +from shared.models import ChatMessageModel + +# Import advanced bot management modules +try: + from voicebot.ai_providers import ( + AIProviderType, AIProviderConfig, ai_provider_manager, + ConversationContext, MessageRole + ) + from voicebot.personality_system import personality_manager, PersonalityTrait, CommunicationStyle + from voicebot.conversation_context import context_manager + AI_PROVIDERS_AVAILABLE = True +except ImportError as e: + logger.warning(f"Advanced AI features not available: {e}") + AI_PROVIDERS_AVAILABLE = False + + +AGENT_NAME = "ai_chatbot" +AGENT_DESCRIPTION = "Advanced AI chatbot with multi-provider support, personality system, and conversation memory" + +# Bot configuration from environment +BOT_PERSONALITY = os.getenv("AI_CHATBOT_PERSONALITY", "helpful_assistant") +BOT_AI_PROVIDER = os.getenv("AI_CHATBOT_PROVIDER", "openai") +BOT_STREAMING = os.getenv("AI_CHATBOT_STREAMING", "false").lower() == "true" +BOT_MEMORY_ENABLED = os.getenv("AI_CHATBOT_MEMORY", "true").lower() == "true" + +# Fallback responses when AI providers are not available +FALLBACK_RESPONSES = { + "greeting": [ + "Hello! I'm an AI chatbot ready to help you.", + "Hi there! How can I assist you today?", + "Greetings! I'm here to chat and help with any questions you have." + ], + "help": [ + "I'm an advanced AI chatbot that can help with various topics. Just ask me anything!", + "I can assist with questions, have conversations, and provide information on many subjects.", + "Feel free to ask me questions or just chat - I'm here to help!" + ], + "capabilities": [ + "I support multiple AI providers, have configurable personalities, and maintain conversation context.", + "My capabilities include natural conversation, information retrieval, and adaptive personality responses.", + "I can remember our conversation context and adapt my responses based on configured personality traits." + ], + "default": [ + "That's interesting! Tell me more about that.", + "I understand. What would you like to discuss next?", + "Thanks for sharing! How can I help you further?", + "I see. Is there anything specific you'd like to know about?" + ], + "error": [ + "I apologize, but I'm having trouble processing that right now. Could you try rephrasing?", + "Something went wrong on my end. Could you ask that again?", + "I encountered an issue. Please try your question again." + ] +} + + +class EnhancedAIChatbot: + """Enhanced AI chatbot with advanced features.""" + + def __init__(self, session_name: str): + self.session_name = session_name + self.session_id = str(uuid.uuid4()) + self.ai_provider = None + self.personality = None + self.conversation_context = None + self.initialized = False + + # Initialize advanced features if available + if AI_PROVIDERS_AVAILABLE: + self._initialize_ai_features() + else: + logger.warning("Running in fallback mode - advanced AI features disabled") + + def _initialize_ai_features(self): + """Initialize AI provider, personality, and context management.""" + try: + # Initialize personality + self.personality = personality_manager.create_personality_from_template(BOT_PERSONALITY) + if not self.personality: + logger.warning(f"Personality template '{BOT_PERSONALITY}' not found, using default") + self.personality = personality_manager.create_personality_from_template("helpful_assistant") + + # Initialize AI provider + provider_type = AIProviderType(BOT_AI_PROVIDER) + self.ai_provider = ai_provider_manager.create_provider(provider_type) + ai_provider_manager.register_provider(f"{AGENT_NAME}_{self.session_id}", self.ai_provider) + + # Initialize conversation context if memory is enabled + if BOT_MEMORY_ENABLED: + self.conversation_context = context_manager.get_or_create_context( + session_id=self.session_id, + bot_name=AGENT_NAME, + conversation_id=f"{AGENT_NAME}_{self.session_id}_{int(time.time())}" + ) + + self.initialized = True + logger.info(f"Enhanced AI chatbot initialized: provider={BOT_AI_PROVIDER}, personality={BOT_PERSONALITY}, memory={BOT_MEMORY_ENABLED}") + + except Exception as e: + logger.error(f"Failed to initialize AI features: {e}") + self.initialized = False + + async def generate_response(self, message: str) -> str: + """Generate a response using AI provider with personality and context.""" + if not self.initialized or not self.ai_provider: + return self._get_fallback_response(message) + + try: + # Prepare conversation context + if self.conversation_context: + # Create a new AI conversation context with personality + ai_context = ConversationContext( + session_id=self.session_id, + bot_name=AGENT_NAME, + personality_prompt=self.personality.generate_system_prompt() if self.personality else None + ) + + # Add personality system message + if self.personality: + ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) + + # Add conversation history context + context_summary = context_manager.get_context_for_response(self.conversation_context.conversation_id) + if context_summary: + ai_context.add_message(MessageRole.SYSTEM, f"Conversation context: {context_summary}") + else: + # Simple context without memory + ai_context = ConversationContext( + session_id=self.session_id, + bot_name=AGENT_NAME + ) + if self.personality: + ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt()) + + # Generate response + if BOT_STREAMING: + # For streaming, collect the full response + response_parts = [] + async for chunk in self.ai_provider.stream_response(ai_context, message): + response_parts.append(chunk) + response = "".join(response_parts) + else: + response = await self.ai_provider.generate_response(ai_context, message) + + # Store conversation turn in context manager + if self.conversation_context: + context_manager.add_conversation_turn( + conversation_id=self.conversation_context.conversation_id, + user_message=message, + bot_response=response, + context_used={"ai_provider": BOT_AI_PROVIDER, "personality": BOT_PERSONALITY}, + metadata={"timestamp": time.time(), "streaming": BOT_STREAMING} + ) + + return response + + except Exception as e: + logger.error(f"AI response generation failed: {e}") + return self._get_fallback_response(message, error=True) + + def _get_fallback_response(self, message: str, error: bool = False) -> str: + """Get fallback response when AI providers are unavailable.""" + if error: + return FALLBACK_RESPONSES["error"][hash(message) % len(FALLBACK_RESPONSES["error"])] + + message_lower = message.lower() + + # Simple keyword matching for fallback responses + if any(word in message_lower for word in ["hello", "hi", "hey", "greetings"]): + return FALLBACK_RESPONSES["greeting"][hash(message) % len(FALLBACK_RESPONSES["greeting"])] + elif any(word in message_lower for word in ["help", "what can you do", "capabilities"]): + return FALLBACK_RESPONSES["help"][hash(message) % len(FALLBACK_RESPONSES["help"])] + elif any(word in message_lower for word in ["features", "abilities", "advanced"]): + return FALLBACK_RESPONSES["capabilities"][hash(message) % len(FALLBACK_RESPONSES["capabilities"])] + else: + return FALLBACK_RESPONSES["default"][hash(message) % len(FALLBACK_RESPONSES["default"])] + + async def health_check(self) -> Dict[str, Any]: + """Perform health check on bot components.""" + health = { + "bot_name": AGENT_NAME, + "session_id": self.session_id, + "initialized": self.initialized, + "ai_providers_available": AI_PROVIDERS_AVAILABLE, + "configuration": { + "personality": BOT_PERSONALITY, + "ai_provider": BOT_AI_PROVIDER, + "streaming": BOT_STREAMING, + "memory_enabled": BOT_MEMORY_ENABLED + } + } + + if self.initialized and self.ai_provider: + try: + provider_healthy = await self.ai_provider.health_check() + health["ai_provider_status"] = "healthy" if provider_healthy else "unhealthy" + except Exception as e: + health["ai_provider_status"] = f"error: {e}" + + if self.personality: + health["personality_loaded"] = True + health["personality_traits"] = [trait.value for trait in self.personality.traits] + + if self.conversation_context: + health["conversation_turns"] = len(self.conversation_context.turns) + health["context_summary"] = self.conversation_context.get_conversation_summary() + + return health + + +# Global bot instance +_bot_instance: Optional[EnhancedAIChatbot] = None + + +def agent_info() -> Dict[str, str]: + """Return agent information.""" + return { + "name": AGENT_NAME, + "description": AGENT_DESCRIPTION, + "has_media": "false", + "features": [ + "multi_provider_ai", + "personality_system", + "conversation_memory", + "streaming_responses", + "health_monitoring" + ] + } + + +def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]: + """AI chatbot doesn't provide media tracks - it's chat-only.""" + return {} + + +async def handle_chat_message( + chat_message: ChatMessageModel, + send_message_func: Callable[[str], Awaitable[None]] +) -> Optional[str]: + """Handle incoming chat messages and provide AI-powered responses.""" + global _bot_instance + + try: + # Initialize bot instance if needed + if _bot_instance is None: + _bot_instance = EnhancedAIChatbot(chat_message.nick) + logger.info(f"Initialized enhanced AI chatbot for session: {chat_message.nick}") + + # Generate response + response = await _bot_instance.generate_response(chat_message.message) + + # Send response + if response: + await send_message_func(response) + logger.info(f"AI Chatbot responded to {chat_message.nick}: {response[:100]}...") + + return response + + except Exception as e: + logger.error(f"Error in AI chatbot: {e}") + error_response = "I apologize, but I encountered an error. Please try again." + await send_message_func(error_response) + return error_response + + +async def get_bot_status() -> Dict[str, Any]: + """Get detailed bot status and health information.""" + global _bot_instance + + status = { + "agent_name": AGENT_NAME, + "agent_description": AGENT_DESCRIPTION, + "features_available": AI_PROVIDERS_AVAILABLE, + "configuration": { + "personality_template": BOT_PERSONALITY, + "ai_provider": BOT_AI_PROVIDER, + "streaming_enabled": BOT_STREAMING, + "memory_enabled": BOT_MEMORY_ENABLED + } + } + + if _bot_instance: + health_info = await _bot_instance.health_check() + status.update(health_info) + else: + status["instance_status"] = "not_initialized" + + # Add system-level information + if AI_PROVIDERS_AVAILABLE: + status["available_personalities"] = [ + template.id for template in personality_manager.list_templates() + ] + status["available_providers"] = ai_provider_manager.list_providers() + + # Get context manager statistics + if BOT_MEMORY_ENABLED: + context_stats = context_manager.get_statistics() + status["conversation_statistics"] = context_stats + + return status + + +# Additional helper functions for advanced features +async def switch_personality(personality_id: str) -> bool: + """Switch bot personality at runtime.""" + global _bot_instance + + if not AI_PROVIDERS_AVAILABLE or not _bot_instance: + return False + + try: + new_personality = personality_manager.create_personality_from_template(personality_id) + if new_personality: + _bot_instance.personality = new_personality + logger.info(f"Switched to personality: {personality_id}") + return True + except Exception as e: + logger.error(f"Failed to switch personality: {e}") + + return False + + +async def switch_ai_provider(provider_type: str) -> bool: + """Switch AI provider at runtime.""" + global _bot_instance + + if not AI_PROVIDERS_AVAILABLE or not _bot_instance: + return False + + try: + provider_enum = AIProviderType(provider_type) + new_provider = ai_provider_manager.create_provider(provider_enum) + _bot_instance.ai_provider = new_provider + logger.info(f"Switched to AI provider: {provider_type}") + return True + except Exception as e: + logger.error(f"Failed to switch AI provider: {e}") + + return False diff --git a/voicebot/conversation_context.py b/voicebot/conversation_context.py new file mode 100644 index 0000000..757418c --- /dev/null +++ b/voicebot/conversation_context.py @@ -0,0 +1,385 @@ +""" +Conversation Context Management for Advanced Bot Management. + +This module manages conversation context, memory, and state for enhanced +bot interactions with persistent conversation awareness. +""" + +import json +import time +import os +from typing import Dict, List, Optional, Any +from dataclasses import dataclass, field +from pydantic import BaseModel, Field +from collections import defaultdict + +from logger import logger + + +@dataclass +class ConversationTurn: + """Individual turn in a conversation.""" + turn_id: str + timestamp: float + user_message: str + bot_response: str + context_used: Dict[str, Any] = field(default_factory=dict) + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "turn_id": self.turn_id, + "timestamp": self.timestamp, + "user_message": self.user_message, + "bot_response": self.bot_response, + "context_used": self.context_used, + "metadata": self.metadata + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'ConversationTurn': + """Create from dictionary.""" + return cls(**data) + + +class ConversationMemory(BaseModel): + """Memory system for conversation context.""" + + # Core conversation data + session_id: str + bot_name: str + user_name: Optional[str] = None + conversation_id: str + + # Conversation history + turns: List[ConversationTurn] = Field(default_factory=list) + created_at: float = Field(default_factory=time.time) + last_updated: float = Field(default_factory=time.time) + + # Memory components + facts_learned: Dict[str, Any] = Field(default_factory=dict) # Facts about user/context + preferences: Dict[str, Any] = Field(default_factory=dict) # User preferences + topics_discussed: List[str] = Field(default_factory=list) # Topics covered + emotional_context: Dict[str, Any] = Field(default_factory=dict) # Emotional state + + # Conversation state + current_topic: Optional[str] = None + conversation_stage: str = "greeting" # greeting, discussion, conclusion + user_intent: Optional[str] = None + bot_goals: List[str] = Field(default_factory=list) + + # Memory management + max_turns: int = 100 + max_facts: int = 50 + memory_decay_factor: float = 0.95 # How quickly old memories fade + + class Config: + arbitrary_types_allowed = True + + def add_turn(self, turn: ConversationTurn): + """Add a conversation turn to memory.""" + self.turns.append(turn) + self.last_updated = time.time() + + # Extract facts and context from the turn + self._extract_context_from_turn(turn) + + # Trim history if needed + if len(self.turns) > self.max_turns: + self.turns = self.turns[-self.max_turns:] + + def _extract_context_from_turn(self, turn: ConversationTurn): + """Extract contextual information from a conversation turn.""" + # Simple keyword-based fact extraction (can be enhanced with NLP) + user_message = turn.user_message.lower() + + # Extract preferences + if "i like" in user_message or "i love" in user_message: + # Simple preference extraction + preference_start = max(user_message.find("i like"), user_message.find("i love")) + preference_text = user_message[preference_start:].split('.')[0] + self.preferences[f"preference_{len(self.preferences)}"] = preference_text + + # Extract facts + if "my name is" in user_message: + name_start = user_message.find("my name is") + len("my name is") + name = user_message[name_start:].split()[0].strip() + if name: + self.facts_learned["user_name"] = name + self.user_name = name + + # Topic tracking + if turn.metadata.get("detected_topics"): + for topic in turn.metadata["detected_topics"]: + if topic not in self.topics_discussed: + self.topics_discussed.append(topic) + + # Emotional context (simple sentiment analysis) + emotional_indicators = { + "happy": ["happy", "great", "wonderful", "excited", "joy"], + "sad": ["sad", "unhappy", "disappointed", "depressed"], + "frustrated": ["frustrated", "annoyed", "angry", "upset"], + "confused": ["confused", "don't understand", "unclear", "puzzled"], + "satisfied": ["good", "thanks", "helpful", "satisfied"] + } + + for emotion, indicators in emotional_indicators.items(): + if any(indicator in user_message for indicator in indicators): + self.emotional_context["current_emotion"] = emotion + self.emotional_context["last_emotion_update"] = time.time() + break + + def get_recent_context(self, turns: int = 5) -> List[ConversationTurn]: + """Get recent conversation turns for context.""" + return self.turns[-turns:] if self.turns else [] + + def get_relevant_facts(self, query: str) -> Dict[str, Any]: + """Get facts relevant to a query.""" + relevant_facts = {} + query_lower = query.lower() + + for key, value in self.facts_learned.items(): + if isinstance(value, str) and any(word in value.lower() for word in query_lower.split()): + relevant_facts[key] = value + + return relevant_facts + + def get_conversation_summary(self) -> str: + """Generate a summary of the conversation.""" + if not self.turns: + return "No conversation history." + + summary_parts = [] + + if self.user_name: + summary_parts.append(f"User: {self.user_name}") + + if self.topics_discussed: + topics_str = ", ".join(self.topics_discussed[:5]) + summary_parts.append(f"Topics discussed: {topics_str}") + + if self.preferences: + prefs = list(self.preferences.values())[:3] + summary_parts.append(f"User preferences: {'; '.join(prefs)}") + + if self.emotional_context.get("current_emotion"): + summary_parts.append(f"Current mood: {self.emotional_context['current_emotion']}") + + summary_parts.append(f"Conversation turns: {len(self.turns)}") + + return " | ".join(summary_parts) + + +class ConversationContextManager: + """Manager for conversation contexts and memory.""" + + def __init__(self, storage_path: Optional[str] = None): + self.storage_path = storage_path or "./conversation_contexts" + self.active_contexts: Dict[str, ConversationMemory] = {} + self.context_index: Dict[str, List[str]] = defaultdict(list) # bot_name -> conversation_ids + + # Ensure storage directory exists + os.makedirs(self.storage_path, exist_ok=True) + + # Load existing contexts + self._load_existing_contexts() + + def _load_existing_contexts(self): + """Load existing conversation contexts from storage.""" + try: + context_files = [f for f in os.listdir(self.storage_path) if f.endswith('.json')] + for file in context_files: + try: + file_path = os.path.join(self.storage_path, file) + with open(file_path, 'r') as f: + data = json.load(f) + + # Convert turn data back to ConversationTurn objects + turns = [ConversationTurn.from_dict(turn_data) for turn_data in data.get('turns', [])] + data['turns'] = turns + + context = ConversationMemory(**data) + conversation_id = context.conversation_id + + self.active_contexts[conversation_id] = context + self.context_index[context.bot_name].append(conversation_id) + + except Exception as e: + logger.warning(f"Failed to load context from {file}: {e}") + + logger.info(f"Loaded {len(self.active_contexts)} conversation contexts") + + except Exception as e: + logger.error(f"Failed to load conversation contexts: {e}") + + def get_or_create_context( + self, + session_id: str, + bot_name: str, + conversation_id: Optional[str] = None + ) -> ConversationMemory: + """Get existing context or create a new one.""" + + if conversation_id and conversation_id in self.active_contexts: + return self.active_contexts[conversation_id] + + # Create new conversation ID if not provided + if not conversation_id: + conversation_id = f"{session_id}_{bot_name}_{int(time.time())}" + + # Create new context + context = ConversationMemory( + session_id=session_id, + bot_name=bot_name, + conversation_id=conversation_id + ) + + self.active_contexts[conversation_id] = context + self.context_index[bot_name].append(conversation_id) + + logger.info(f"Created new conversation context: {conversation_id}") + return context + + def save_context(self, conversation_id: str): + """Save a conversation context to storage.""" + if conversation_id not in self.active_contexts: + logger.warning(f"Context {conversation_id} not found for saving") + return + + context = self.active_contexts[conversation_id] + + try: + # Convert to dict for serialization + data = context.model_dump() + # Convert ConversationTurn objects to dicts + data['turns'] = [turn.to_dict() for turn in context.turns] + + file_path = os.path.join(self.storage_path, f"{conversation_id}.json") + with open(file_path, 'w') as f: + json.dump(data, f, indent=2) + + logger.debug(f"Saved context: {conversation_id}") + + except Exception as e: + logger.error(f"Failed to save context {conversation_id}: {e}") + + def add_conversation_turn( + self, + conversation_id: str, + user_message: str, + bot_response: str, + context_used: Optional[Dict[str, Any]] = None, + metadata: Optional[Dict[str, Any]] = None + ): + """Add a conversation turn to the specified context.""" + if conversation_id not in self.active_contexts: + logger.warning(f"Context {conversation_id} not found") + return + + turn = ConversationTurn( + turn_id=f"{conversation_id}_{len(self.active_contexts[conversation_id].turns)}", + timestamp=time.time(), + user_message=user_message, + bot_response=bot_response, + context_used=context_used or {}, + metadata=metadata or {} + ) + + self.active_contexts[conversation_id].add_turn(turn) + + # Auto-save after each turn + self.save_context(conversation_id) + + def get_context_for_response(self, conversation_id: str) -> Optional[str]: + """Get formatted context for generating bot responses.""" + if conversation_id not in self.active_contexts: + return None + + context = self.active_contexts[conversation_id] + context_parts = [] + + # Add conversation summary + summary = context.get_conversation_summary() + if summary != "No conversation history.": + context_parts.append(f"Conversation context: {summary}") + + # Add recent turns for immediate context + recent_turns = context.get_recent_context(3) + if recent_turns: + context_parts.append("Recent conversation:") + for turn in recent_turns: + context_parts.append(f"User: {turn.user_message}") + context_parts.append(f"Bot: {turn.bot_response}") + + # Add relevant facts + if context.facts_learned: + facts_str = "; ".join([f"{k}: {v}" for k, v in list(context.facts_learned.items())[:3]]) + context_parts.append(f"Known facts: {facts_str}") + + # Add emotional context + if context.emotional_context.get("current_emotion"): + context_parts.append(f"User's current mood: {context.emotional_context['current_emotion']}") + + return "\n".join(context_parts) if context_parts else None + + def get_contexts_for_bot(self, bot_name: str) -> List[ConversationMemory]: + """Get all contexts for a specific bot.""" + conversation_ids = self.context_index.get(bot_name, []) + return [self.active_contexts[cid] for cid in conversation_ids if cid in self.active_contexts] + + def cleanup_old_contexts(self, max_age_days: int = 30): + """Clean up old conversation contexts.""" + current_time = time.time() + max_age_seconds = max_age_days * 24 * 60 * 60 + + contexts_to_remove = [] + for conversation_id, context in self.active_contexts.items(): + if current_time - context.last_updated > max_age_seconds: + contexts_to_remove.append(conversation_id) + + for conversation_id in contexts_to_remove: + context = self.active_contexts[conversation_id] + + # Remove from index + if context.bot_name in self.context_index: + if conversation_id in self.context_index[context.bot_name]: + self.context_index[context.bot_name].remove(conversation_id) + + # Remove context file + try: + file_path = os.path.join(self.storage_path, f"{conversation_id}.json") + if os.path.exists(file_path): + os.remove(file_path) + except Exception as e: + logger.warning(f"Failed to remove context file {conversation_id}: {e}") + + # Remove from active contexts + del self.active_contexts[conversation_id] + + if contexts_to_remove: + logger.info(f"Cleaned up {len(contexts_to_remove)} old conversation contexts") + + def get_statistics(self) -> Dict[str, Any]: + """Get statistics about conversation contexts.""" + total_contexts = len(self.active_contexts) + total_turns = sum(len(context.turns) for context in self.active_contexts.values()) + + bot_stats = {} + for bot_name, conversation_ids in self.context_index.items(): + active_conversations = [cid for cid in conversation_ids if cid in self.active_contexts] + bot_stats[bot_name] = { + "active_conversations": len(active_conversations), + "total_turns": sum(len(self.active_contexts[cid].turns) for cid in active_conversations) + } + + return { + "total_contexts": total_contexts, + "total_turns": total_turns, + "average_turns_per_context": total_turns / total_contexts if total_contexts > 0 else 0, + "bot_statistics": bot_stats + } + + +# Global context manager instance +context_manager = ConversationContextManager() diff --git a/voicebot/enhanced_bot_configs.json b/voicebot/enhanced_bot_configs.json new file mode 100644 index 0000000..fea58c0 --- /dev/null +++ b/voicebot/enhanced_bot_configs.json @@ -0,0 +1,85 @@ +{ + "ai_chatbot": { + "personality": "technical_expert", + "ai_provider": "openai", + "streaming": false, + "memory_enabled": true, + "advanced_features": true, + "description": "Advanced AI chatbot with OpenAI integration and helpful personality", + "features": [ + "multi_provider_ai", + "personality_system", + "conversation_memory", + "streaming_responses" + ] + }, + "technical_expert": { + "personality": "technical_expert", + "ai_provider": "anthropic", + "streaming": false, + "memory_enabled": true, + "advanced_features": true, + "description": "Technical expert bot with detailed explanations and Anthropic Claude integration", + "features": [ + "technical_expertise", + "detailed_explanations", + "conversation_memory" + ] + }, + "creative_companion": { + "personality": "creative_companion", + "ai_provider": "local", + "streaming": true, + "memory_enabled": true, + "advanced_features": true, + "description": "Creative writing and brainstorming companion with local AI model", + "features": [ + "creative_writing", + "brainstorming", + "streaming_responses", + "conversation_memory" + ] + }, + "business_advisor": { + "personality": "business_advisor", + "ai_provider": "openai", + "streaming": false, + "memory_enabled": true, + "advanced_features": true, + "description": "Business and strategic advisor with professional communication style", + "features": [ + "business_analysis", + "strategic_planning", + "professional_communication", + "conversation_memory" + ] + }, + "comedy_bot": { + "personality": "comedy_bot", + "ai_provider": "local", + "streaming": true, + "memory_enabled": false, + "advanced_features": true, + "description": "Entertainment-focused bot with humor and casual conversation", + "features": [ + "humor_generation", + "casual_conversation", + "entertainment", + "streaming_responses" + ] + }, + "wise_mentor": { + "personality": "wise_mentor", + "ai_provider": "anthropic", + "streaming": false, + "memory_enabled": true, + "advanced_features": true, + "description": "Wise mentor providing thoughtful guidance and life advice", + "features": [ + "life_advice", + "thoughtful_guidance", + "wisdom_sharing", + "conversation_memory" + ] + } +} \ No newline at end of file diff --git a/voicebot/personality_system.py b/voicebot/personality_system.py new file mode 100644 index 0000000..befa69c --- /dev/null +++ b/voicebot/personality_system.py @@ -0,0 +1,449 @@ +""" +Bot Personality System for Advanced Bot Management. + +This module provides personality templates, configuration, and behavior +management for creating diverse and engaging bot characters. +""" + +import json +import os +from typing import Dict, List, Optional, Any +from enum import Enum +from pydantic import BaseModel, Field + +from logger import logger + + +class PersonalityTrait(str, Enum): + """Personality traits that can be assigned to bots.""" + FRIENDLY = "friendly" + PROFESSIONAL = "professional" + HUMOROUS = "humorous" + SARCASTIC = "sarcastic" + EMPATHETIC = "empathetic" + ANALYTICAL = "analytical" + CREATIVE = "creative" + ASSERTIVE = "assertive" + PATIENT = "patient" + ENTHUSIASTIC = "enthusiastic" + MYSTERIOUS = "mysterious" + WISE = "wise" + PLAYFUL = "playful" + FORMAL = "formal" + CASUAL = "casual" + + +class CommunicationStyle(str, Enum): + """Communication styles for bot responses.""" + CONCISE = "concise" + DETAILED = "detailed" + CONVERSATIONAL = "conversational" + TECHNICAL = "technical" + STORYTELLING = "storytelling" + QUESTION_BASED = "question_based" + SUPPORTIVE = "supportive" + DIRECT = "direct" + DIPLOMATIC = "diplomatic" + + +class ExpertiseDomain(str, Enum): + """Domains of expertise for specialized bots.""" + GENERAL = "general" + TECHNOLOGY = "technology" + SCIENCE = "science" + ARTS = "arts" + BUSINESS = "business" + EDUCATION = "education" + HEALTH = "health" + ENTERTAINMENT = "entertainment" + SPORTS = "sports" + COOKING = "cooking" + TRAVEL = "travel" + FINANCE = "finance" + LEGAL = "legal" + PSYCHOLOGY = "psychology" + PHILOSOPHY = "philosophy" + + +class BotPersonality(BaseModel): + """Complete personality configuration for a bot.""" + + # Identity + name: str = "Assistant" + description: str = "A helpful AI assistant" + backstory: Optional[str] = None + + # Core personality + traits: List[PersonalityTrait] = Field(default_factory=lambda: [PersonalityTrait.FRIENDLY]) + communication_style: CommunicationStyle = CommunicationStyle.CONVERSATIONAL + expertise_domains: List[ExpertiseDomain] = Field(default_factory=lambda: [ExpertiseDomain.GENERAL]) + + # Behavior settings + response_length_preference: str = "medium" # short, medium, long + emoji_usage: bool = True + formality_level: float = 0.5 # 0.0 = very casual, 1.0 = very formal + humor_level: float = 0.3 # 0.0 = no humor, 1.0 = very humorous + empathy_level: float = 0.7 # 0.0 = analytical only, 1.0 = highly empathetic + + # Language preferences + preferred_language: str = "en" + technical_complexity: float = 0.5 # 0.0 = simple terms, 1.0 = technical jargon + + # Conversation patterns + greeting_style: str = "warm" # formal, warm, casual, quirky + farewell_style: str = "friendly" # formal, friendly, casual, memorable + + # Custom prompts and examples + system_prompt: Optional[str] = None + example_responses: List[Dict[str, str]] = Field(default_factory=list) + custom_instructions: List[str] = Field(default_factory=list) + + # Behavioral boundaries + topics_to_avoid: List[str] = Field(default_factory=list) + preferred_topics: List[str] = Field(default_factory=list) + conversation_limits: Dict[str, Any] = Field(default_factory=dict) + + def generate_system_prompt(self) -> str: + """Generate a comprehensive system prompt based on personality.""" + if self.system_prompt: + return self.system_prompt + + prompt_parts = [] + + # Identity + prompt_parts.append(f"You are {self.name}, {self.description}") + if self.backstory: + prompt_parts.append(f"Background: {self.backstory}") + + # Personality traits + if self.traits: + traits_str = ", ".join([trait.value for trait in self.traits]) + prompt_parts.append(f"Your personality is: {traits_str}") + + # Communication style + style_desc = self._get_style_description() + prompt_parts.append(f"Communication style: {style_desc}") + + # Expertise + if self.expertise_domains and self.expertise_domains != [ExpertiseDomain.GENERAL]: + domains_str = ", ".join([domain.value for domain in self.expertise_domains]) + prompt_parts.append(f"Areas of expertise: {domains_str}") + + # Behavior guidelines + behavior_guidelines = self._generate_behavior_guidelines() + if behavior_guidelines: + prompt_parts.append(f"Behavior guidelines: {behavior_guidelines}") + + # Custom instructions + if self.custom_instructions: + prompt_parts.append("Additional instructions:") + prompt_parts.extend([f"- {instruction}" for instruction in self.custom_instructions]) + + # Topic boundaries + if self.topics_to_avoid: + avoid_str = ", ".join(self.topics_to_avoid) + prompt_parts.append(f"Avoid discussing: {avoid_str}") + + if self.preferred_topics: + prefer_str = ", ".join(self.preferred_topics) + prompt_parts.append(f"Preferred discussion topics: {prefer_str}") + + return "\n\n".join(prompt_parts) + + def _get_style_description(self) -> str: + """Get communication style description.""" + style_map = { + CommunicationStyle.CONCISE: "Keep responses brief and to the point", + CommunicationStyle.DETAILED: "Provide comprehensive, detailed explanations", + CommunicationStyle.CONVERSATIONAL: "Use natural, conversational language", + CommunicationStyle.TECHNICAL: "Use precise, technical terminology when appropriate", + CommunicationStyle.STORYTELLING: "Frame responses as engaging narratives", + CommunicationStyle.QUESTION_BASED: "Ask follow-up questions to better understand needs", + CommunicationStyle.SUPPORTIVE: "Provide encouraging and supportive responses", + CommunicationStyle.DIRECT: "Be straightforward and direct in communication", + CommunicationStyle.DIPLOMATIC: "Use diplomatic and tactful language" + } + return style_map.get(self.communication_style, "Communicate naturally") + + def _generate_behavior_guidelines(self) -> str: + """Generate behavior guidelines based on settings.""" + guidelines = [] + + if self.formality_level > 0.7: + guidelines.append("maintain a formal and professional tone") + elif self.formality_level < 0.3: + guidelines.append("use casual and relaxed language") + + if self.humor_level > 0.5: + guidelines.append("incorporate appropriate humor when suitable") + elif self.humor_level < 0.2: + guidelines.append("maintain a serious and professional demeanor") + + if self.empathy_level > 0.7: + guidelines.append("show high emotional intelligence and empathy") + elif self.empathy_level < 0.3: + guidelines.append("focus on logical and analytical responses") + + if self.emoji_usage: + guidelines.append("use emojis appropriately to enhance communication") + + if self.response_length_preference == "short": + guidelines.append("keep responses concise") + elif self.response_length_preference == "long": + guidelines.append("provide detailed explanations") + + return "; ".join(guidelines) if guidelines else "" + + +class PersonalityTemplate(BaseModel): + """Pre-defined personality templates for common bot types.""" + + id: str + name: str + description: str + personality: BotPersonality + category: str = "general" + tags: List[str] = Field(default_factory=list) + + class Config: + extra = "allow" + + +class PersonalityManager: + """Manager for bot personalities and templates.""" + + def __init__(self): + self.templates: Dict[str, PersonalityTemplate] = {} + self.custom_personalities: Dict[str, BotPersonality] = {} + self._load_default_templates() + + def _load_default_templates(self): + """Load default personality templates.""" + default_templates = [ + PersonalityTemplate( + id="helpful_assistant", + name="Helpful Assistant", + description="A friendly and helpful general-purpose assistant", + personality=BotPersonality( + name="Assistant", + description="a helpful and friendly AI assistant", + traits=[PersonalityTrait.FRIENDLY, PersonalityTrait.EMPATHETIC, PersonalityTrait.PATIENT], + communication_style=CommunicationStyle.CONVERSATIONAL, + expertise_domains=[ExpertiseDomain.GENERAL], + formality_level=0.4, + humor_level=0.3, + empathy_level=0.8 + ), + category="general", + tags=["helpful", "friendly", "general"] + ), + + PersonalityTemplate( + id="technical_expert", + name="Technical Expert", + description="A knowledgeable technical specialist", + personality=BotPersonality( + name="TechBot", + description="a knowledgeable technical expert and problem solver", + traits=[PersonalityTrait.ANALYTICAL, PersonalityTrait.PROFESSIONAL, PersonalityTrait.PATIENT], + communication_style=CommunicationStyle.TECHNICAL, + expertise_domains=[ExpertiseDomain.TECHNOLOGY, ExpertiseDomain.SCIENCE], + formality_level=0.7, + humor_level=0.1, + empathy_level=0.4, + technical_complexity=0.8, + emoji_usage=False + ), + category="technical", + tags=["technical", "expert", "analytical"] + ), + + PersonalityTemplate( + id="creative_companion", + name="Creative Companion", + description="An imaginative and inspiring creative partner", + personality=BotPersonality( + name="CreativeBot", + description="an imaginative and inspiring creative companion", + traits=[PersonalityTrait.CREATIVE, PersonalityTrait.ENTHUSIASTIC, PersonalityTrait.PLAYFUL], + communication_style=CommunicationStyle.STORYTELLING, + expertise_domains=[ExpertiseDomain.ARTS, ExpertiseDomain.ENTERTAINMENT], + formality_level=0.2, + humor_level=0.7, + empathy_level=0.6, + emoji_usage=True, + custom_instructions=[ + "Encourage creative thinking and exploration", + "Offer multiple perspectives and ideas", + "Use vivid and imaginative language" + ] + ), + category="creative", + tags=["creative", "artistic", "inspiring"] + ), + + PersonalityTemplate( + id="business_advisor", + name="Business Advisor", + description="A professional business consultant and strategist", + personality=BotPersonality( + name="BusinessBot", + description="a professional business advisor and strategic consultant", + traits=[PersonalityTrait.PROFESSIONAL, PersonalityTrait.ANALYTICAL, PersonalityTrait.ASSERTIVE], + communication_style=CommunicationStyle.DIRECT, + expertise_domains=[ExpertiseDomain.BUSINESS, ExpertiseDomain.FINANCE], + formality_level=0.8, + humor_level=0.2, + empathy_level=0.5, + emoji_usage=False, + response_length_preference="detailed", + custom_instructions=[ + "Provide actionable business insights", + "Focus on practical solutions and ROI", + "Use business terminology appropriately" + ] + ), + category="business", + tags=["business", "professional", "strategic"] + ), + + PersonalityTemplate( + id="comedy_bot", + name="Comedy Bot", + description="A humorous entertainer that loves jokes and wordplay", + personality=BotPersonality( + name="ComedyBot", + description="a humorous entertainer who loves jokes, puns, and making people laugh", + traits=[PersonalityTrait.HUMOROUS, PersonalityTrait.PLAYFUL, PersonalityTrait.ENTHUSIASTIC], + communication_style=CommunicationStyle.CONVERSATIONAL, + expertise_domains=[ExpertiseDomain.ENTERTAINMENT], + formality_level=0.1, + humor_level=0.9, + empathy_level=0.6, + emoji_usage=True, + greeting_style="quirky", + farewell_style="memorable", + custom_instructions=[ + "Look for opportunities to make appropriate jokes", + "Use wordplay and puns when fitting", + "Keep humor light and positive" + ] + ), + category="entertainment", + tags=["funny", "jokes", "entertainment"] + ), + + PersonalityTemplate( + id="wise_mentor", + name="Wise Mentor", + description="A thoughtful mentor who provides wisdom and guidance", + personality=BotPersonality( + name="Mentor", + description="a wise and thoughtful mentor who provides guidance and wisdom", + traits=[PersonalityTrait.WISE, PersonalityTrait.EMPATHETIC, PersonalityTrait.PATIENT], + communication_style=CommunicationStyle.SUPPORTIVE, + expertise_domains=[ExpertiseDomain.PSYCHOLOGY, ExpertiseDomain.PHILOSOPHY, ExpertiseDomain.EDUCATION], + formality_level=0.5, + humor_level=0.3, + empathy_level=0.9, + response_length_preference="detailed", + custom_instructions=[ + "Ask thoughtful questions to understand deeper needs", + "Provide perspective and context for challenges", + "Encourage personal growth and reflection" + ] + ), + category="guidance", + tags=["wise", "mentor", "supportive"] + ) + ] + + for template in default_templates: + self.templates[template.id] = template + + logger.info(f"Loaded {len(default_templates)} default personality templates") + + def get_template(self, template_id: str) -> Optional[PersonalityTemplate]: + """Get a personality template by ID.""" + return self.templates.get(template_id) + + def list_templates(self, category: Optional[str] = None) -> List[PersonalityTemplate]: + """List all templates, optionally filtered by category.""" + templates = list(self.templates.values()) + if category: + templates = [t for t in templates if t.category == category] + return templates + + def get_categories(self) -> List[str]: + """Get all available template categories.""" + categories = set(t.category for t in self.templates.values()) + return sorted(list(categories)) + + def create_personality_from_template(self, template_id: str, customizations: Optional[Dict[str, Any]] = None) -> Optional[BotPersonality]: + """Create a personality instance from a template with optional customizations.""" + template = self.get_template(template_id) + if not template: + return None + + # Start with template personality + personality_dict = template.personality.model_dump() + + # Apply customizations + if customizations: + for key, value in customizations.items(): + if key in personality_dict: + personality_dict[key] = value + + return BotPersonality(**personality_dict) + + def save_custom_personality(self, personality_id: str, personality: BotPersonality): + """Save a custom personality.""" + self.custom_personalities[personality_id] = personality + logger.info(f"Saved custom personality: {personality_id}") + + def get_custom_personality(self, personality_id: str) -> Optional[BotPersonality]: + """Get a custom personality by ID.""" + return self.custom_personalities.get(personality_id) + + def list_custom_personalities(self) -> List[str]: + """List all custom personality IDs.""" + return list(self.custom_personalities.keys()) + + def export_personality(self, personality: BotPersonality) -> str: + """Export a personality to JSON string.""" + return personality.model_dump_json(indent=2) + + def import_personality(self, json_str: str) -> BotPersonality: + """Import a personality from JSON string.""" + return BotPersonality.model_validate_json(json_str) + + def load_personalities_from_file(self, file_path: str): + """Load custom personalities from a JSON file.""" + try: + if os.path.exists(file_path): + with open(file_path, 'r') as f: + data = json.load(f) + for personality_id, personality_data in data.items(): + personality = BotPersonality(**personality_data) + self.custom_personalities[personality_id] = personality + logger.info(f"Loaded {len(data)} custom personalities from {file_path}") + except Exception as e: + logger.error(f"Failed to load personalities from {file_path}: {e}") + + def save_personalities_to_file(self, file_path: str): + """Save custom personalities to a JSON file.""" + try: + data = {} + for personality_id, personality in self.custom_personalities.items(): + data[personality_id] = personality.model_dump() + + os.makedirs(os.path.dirname(file_path), exist_ok=True) + with open(file_path, 'w') as f: + json.dump(data, f, indent=2) + logger.info(f"Saved {len(data)} custom personalities to {file_path}") + except Exception as e: + logger.error(f"Failed to save personalities to {file_path}: {e}") + + +# Global personality manager instance +personality_manager = PersonalityManager() diff --git a/voicebot/step_5b_integration_demo.py b/voicebot/step_5b_integration_demo.py new file mode 100644 index 0000000..228e374 --- /dev/null +++ b/voicebot/step_5b_integration_demo.py @@ -0,0 +1,477 @@ +"""Step 5B Integration: Enhanced Bot Orchestrator with Advanced Bot Management. + +This module demonstrates how the new advanced bot management features integrate +with the existing bot orchestrator to provide: + +1. AI Provider-powered bots with multiple backend support +2. Personality-driven bot behavior and responses +3. Conversation context and memory management +4. Dynamic bot configuration and health monitoring + +This integration enhances the existing bot discovery and management system +without breaking compatibility with existing bot implementations. +""" + +import os +import json +import time +import asyncio +from typing import Dict, Optional, Any +from pathlib import Path + +# Import existing bot orchestrator functionality +from bot_orchestrator import discover_bots + +# Import advanced bot management modules +try: + from voicebot.ai_providers import ai_provider_manager, AIProviderType + from voicebot.personality_system import personality_manager + from voicebot.conversation_context import context_manager + AI_FEATURES_AVAILABLE = True +except ImportError as e: + print(f"Warning: Advanced AI features not available: {e}") + AI_FEATURES_AVAILABLE = False + +from logger import logger + + +class EnhancedBotOrchestrator: + """Enhanced bot orchestrator with Step 5B advanced management features.""" + + def __init__(self): + self.enhanced_bots = {} # Enhanced bots with AI features + self.bot_configurations = {} # Bot-specific configurations + self.health_stats = {} # Health monitoring data + + # Load configurations + self._load_bot_configurations() + + # Initialize AI systems if available + if AI_FEATURES_AVAILABLE: + self._initialize_ai_systems() + + def _load_bot_configurations(self): + """Load bot configurations from JSON file.""" + config_path = Path(__file__).parent / "enhanced_bot_configs.json" + + default_configs = { + "ai_chatbot": { + "personality": "helpful_assistant", + "ai_provider": "openai", + "streaming": True, + "memory_enabled": True, + "advanced_features": True + }, + "technical_expert": { + "personality": "technical_expert", + "ai_provider": "anthropic", + "streaming": False, + "memory_enabled": True, + "advanced_features": True + }, + "creative_companion": { + "personality": "creative_companion", + "ai_provider": "local", + "streaming": True, + "memory_enabled": True, + "advanced_features": True + } + } + + try: + if config_path.exists(): + with open(config_path, 'r') as f: + self.bot_configurations = json.load(f) + else: + self.bot_configurations = default_configs + self._save_bot_configurations() + except Exception as e: + logger.error(f"Failed to load bot configurations: {e}") + self.bot_configurations = default_configs + + def _save_bot_configurations(self): + """Save bot configurations to JSON file.""" + config_path = Path(__file__).parent / "enhanced_bot_configs.json" + try: + with open(config_path, 'w') as f: + json.dump(self.bot_configurations, f, indent=2) + except Exception as e: + logger.error(f"Failed to save bot configurations: {e}") + + def _initialize_ai_systems(self): + """Initialize AI provider and personality systems.""" + try: + # Ensure default personality templates are loaded + personality_manager.ensure_default_templates() + + # Register available AI providers based on environment + providers_to_init = [] + + if os.getenv("OPENAI_API_KEY"): + providers_to_init.append(AIProviderType.OPENAI) + + if os.getenv("ANTHROPIC_API_KEY"): + providers_to_init.append(AIProviderType.ANTHROPIC) + + # Local provider is always available + providers_to_init.append(AIProviderType.LOCAL) + + for provider_type in providers_to_init: + try: + provider = ai_provider_manager.create_provider(provider_type) + ai_provider_manager.register_provider(f"system_{provider_type.value}", provider) + logger.info(f"Initialized AI provider: {provider_type.value}") + except Exception as e: + logger.warning(f"Failed to initialize provider {provider_type.value}: {e}") + + logger.info("AI systems initialized successfully") + + except Exception as e: + logger.error(f"Failed to initialize AI systems: {e}") + + async def discover_enhanced_bots(self) -> Dict[str, Dict[str, Any]]: + """Discover bots with enhanced information about AI capabilities.""" + # Start with standard bot discovery + standard_bots = discover_bots() # Returns List[BotInfoModel] + enhanced_bot_info = {} + + # Convert BotInfoModel list to dict and enhance with AI capabilities + for bot_info in standard_bots: + bot_name = bot_info.name + bot_info_dict = { + "name": bot_name, + "description": bot_info.description, + "has_media": bot_info.has_media, + "standard_info": { + "name": bot_name, + "description": bot_info.description, + "has_media": bot_info.has_media + }, + "enhanced_features": False, + "ai_capabilities": {}, + "health_status": "unknown" + } + + # Check if bot supports enhanced features + if bot_name in self.bot_configurations: + config = self.bot_configurations[bot_name] + if config.get("advanced_features", False): + bot_info_dict["enhanced_features"] = True + bot_info_dict["ai_capabilities"] = { + "personality": config.get("personality", "default"), + "ai_provider": config.get("ai_provider", "local"), + "streaming": config.get("streaming", False), + "memory_enabled": config.get("memory_enabled", False) + } + + # Check bot health if it supports it (would need to import the bot module) + try: + bot_module_path = f"voicebot.bots.{bot_name}" + bot_module = __import__(bot_module_path, fromlist=[bot_name]) + if hasattr(bot_module, 'get_bot_status'): + status = await bot_module.get_bot_status() + bot_info_dict["health_status"] = "healthy" + bot_info_dict["detailed_status"] = status + except Exception as e: + bot_info_dict["health_status"] = f"import_error: {e}" + + enhanced_bot_info[bot_name] = bot_info_dict + + return enhanced_bot_info + + async def create_enhanced_bot_instance(self, bot_name: str, session_name: str) -> Optional[Any]: + """Create an enhanced bot instance with AI features configured.""" + if not AI_FEATURES_AVAILABLE: + logger.warning(f"Cannot create enhanced bot {bot_name} - AI features not available") + return None + + if bot_name not in self.bot_configurations: + logger.warning(f"No configuration found for enhanced bot: {bot_name}") + return None + + config = self.bot_configurations[bot_name] + + try: + # Set environment variables for the bot based on configuration + os.environ[f"{bot_name.upper()}_PERSONALITY"] = config.get("personality", "helpful_assistant") + os.environ[f"{bot_name.upper()}_PROVIDER"] = config.get("ai_provider", "local") + os.environ[f"{bot_name.upper()}_STREAMING"] = str(config.get("streaming", False)).lower() + os.environ[f"{bot_name.upper()}_MEMORY"] = str(config.get("memory_enabled", False)).lower() + + # Import and create the bot + bot_module_path = f"voicebot.bots.{bot_name}" + bot_module = __import__(bot_module_path, fromlist=[bot_name]) + + # If the bot has a specific initialization function, use it + if hasattr(bot_module, 'create_enhanced_instance'): + bot_instance = await bot_module.create_enhanced_instance(session_name, config) + else: + # Create standard bot instance + bot_instance = bot_module + + self.enhanced_bots[f"{bot_name}_{session_name}"] = { + "instance": bot_instance, + "config": config, + "session": session_name, + "created_at": time.time() + } + + logger.info(f"Created enhanced bot instance: {bot_name} for session {session_name}") + return bot_instance + + except Exception as e: + logger.error(f"Failed to create enhanced bot instance {bot_name}: {e}") + return None + + async def monitor_bot_health(self) -> Dict[str, Any]: + """Monitor health of all enhanced bots and AI systems.""" + health_report = { + "timestamp": time.time(), + "ai_systems_available": AI_FEATURES_AVAILABLE, + "enhanced_bots": {}, + "ai_providers": {}, + "personality_system": {}, + "conversation_contexts": {} + } + + if not AI_FEATURES_AVAILABLE: + health_report["status"] = "limited - AI features disabled" + return health_report + + try: + # Check AI providers + for provider_id, provider in ai_provider_manager.list_providers().items(): + try: + provider_instance = ai_provider_manager.get_provider(provider_id) + if provider_instance: + is_healthy = await provider_instance.health_check() + health_report["ai_providers"][provider_id] = { + "status": "healthy" if is_healthy else "unhealthy", + "type": provider.value if hasattr(provider, 'value') else str(provider) + } + except Exception as e: + health_report["ai_providers"][provider_id] = { + "status": f"error: {e}", + "type": "unknown" + } + + # Check personality system + try: + templates = personality_manager.list_templates() + health_report["personality_system"] = { + "status": "healthy", + "available_templates": len(templates), + "template_ids": [t.id for t in templates] + } + except Exception as e: + health_report["personality_system"] = { + "status": f"error: {e}" + } + + # Check conversation context system + try: + context_stats = context_manager.get_statistics() + health_report["conversation_contexts"] = { + "status": "healthy", + "statistics": context_stats + } + except Exception as e: + health_report["conversation_contexts"] = { + "status": f"error: {e}" + } + + # Check enhanced bot instances + for bot_key, bot_data in self.enhanced_bots.items(): + try: + bot_instance = bot_data["instance"] + if hasattr(bot_instance, 'health_check'): + bot_health = await bot_instance.health_check() + health_report["enhanced_bots"][bot_key] = { + "status": "healthy", + "details": bot_health, + "uptime": time.time() - bot_data["created_at"] + } + else: + health_report["enhanced_bots"][bot_key] = { + "status": "unknown - no health check available", + "uptime": time.time() - bot_data["created_at"] + } + except Exception as e: + health_report["enhanced_bots"][bot_key] = { + "status": f"error: {e}", + "uptime": time.time() - bot_data.get("created_at", time.time()) + } + + health_report["status"] = "operational" + + except Exception as e: + health_report["status"] = f"system_error: {e}" + + # Store health stats for trending + self.health_stats[int(time.time())] = health_report + + # Keep only last 24 hours of health stats + cutoff_time = time.time() - (24 * 60 * 60) + self.health_stats = { + timestamp: stats for timestamp, stats in self.health_stats.items() + if timestamp > cutoff_time + } + + return health_report + + async def configure_bot_runtime(self, bot_name: str, new_config: Dict[str, Any]) -> bool: + """Dynamically reconfigure a bot at runtime.""" + if bot_name not in self.bot_configurations: + logger.error(f"Bot {bot_name} not found in configurations") + return False + + try: + # Update configuration + old_config = self.bot_configurations[bot_name].copy() + self.bot_configurations[bot_name].update(new_config) + + # Save updated configuration + self._save_bot_configurations() + + # If there are active instances, try to update them + updated_instances = [] + for bot_key, bot_data in self.enhanced_bots.items(): + if bot_key.startswith(f"{bot_name}_"): + bot_instance = bot_data["instance"] + + # Try to update personality if changed + if "personality" in new_config and hasattr(bot_instance, 'switch_personality'): + success = await bot_instance.switch_personality(new_config["personality"]) + if success: + updated_instances.append(f"{bot_key} personality") + + # Try to update AI provider if changed + if "ai_provider" in new_config and hasattr(bot_instance, 'switch_ai_provider'): + success = await bot_instance.switch_ai_provider(new_config["ai_provider"]) + if success: + updated_instances.append(f"{bot_key} provider") + + # Update bot data configuration + bot_data["config"] = self.bot_configurations[bot_name] + + logger.info(f"Bot {bot_name} configuration updated. Active instances updated: {updated_instances}") + return True + + except Exception as e: + # Rollback configuration on error + self.bot_configurations[bot_name] = old_config + logger.error(f"Failed to configure bot {bot_name}: {e}") + return False + + def get_bot_analytics(self) -> Dict[str, Any]: + """Get analytics and usage statistics for enhanced bots.""" + analytics = { + "enhanced_bots_count": len(self.enhanced_bots), + "configurations_count": len(self.bot_configurations), + "health_history_points": len(self.health_stats), + "bot_breakdown": {}, + "feature_usage": { + "ai_providers": {}, + "personalities": {}, + "memory_enabled": 0, + "streaming_enabled": 0 + } + } + + # Analyze bot configurations + for bot_name, config in self.bot_configurations.items(): + analytics["bot_breakdown"][bot_name] = { + "enhanced_features": config.get("advanced_features", False), + "ai_provider": config.get("ai_provider", "none"), + "personality": config.get("personality", "none"), + "active_instances": sum(1 for key in self.enhanced_bots.keys() if key.startswith(f"{bot_name}_")) + } + + # Count feature usage + provider = config.get("ai_provider", "none") + analytics["feature_usage"]["ai_providers"][provider] = analytics["feature_usage"]["ai_providers"].get(provider, 0) + 1 + + personality = config.get("personality", "none") + analytics["feature_usage"]["personalities"][personality] = analytics["feature_usage"]["personalities"].get(personality, 0) + 1 + + if config.get("memory_enabled", False): + analytics["feature_usage"]["memory_enabled"] += 1 + + if config.get("streaming", False): + analytics["feature_usage"]["streaming_enabled"] += 1 + + # Add conversation context statistics if available + if AI_FEATURES_AVAILABLE: + try: + context_stats = context_manager.get_statistics() + analytics["conversation_statistics"] = context_stats + except Exception as e: + analytics["conversation_statistics"] = {"error": str(e)} + + return analytics + + +# Global enhanced orchestrator instance +enhanced_orchestrator = EnhancedBotOrchestrator() + + +async def demo_step_5b_integration(): + """Demonstrate Step 5B integration capabilities.""" + print("=== Step 5B Advanced Bot Management Demo ===\n") + + # 1. Discover enhanced bots + print("1. Discovering bots with enhanced capabilities...") + enhanced_bots = await enhanced_orchestrator.discover_enhanced_bots() + for bot_name, info in enhanced_bots.items(): + print(f" Bot: {bot_name}") + print(f" Enhanced: {info['enhanced_features']}") + if info['enhanced_features']: + print(f" AI Capabilities: {info['ai_capabilities']}") + print(f" Health: {info['health_status']}") + print() + + # 2. Create enhanced bot instance + print("2. Creating enhanced AI chatbot instance...") + bot_instance = await enhanced_orchestrator.create_enhanced_bot_instance("ai_chatbot", "demo_session") + if bot_instance: + print(" ✓ Enhanced AI chatbot created successfully") + else: + print(" ✗ Failed to create enhanced bot") + print() + + # 3. Monitor system health + print("3. Monitoring system health...") + health_report = await enhanced_orchestrator.monitor_bot_health() + print(f" System Status: {health_report['status']}") + print(f" AI Features Available: {health_report['ai_systems_available']}") + if health_report['ai_systems_available']: + print(f" AI Providers: {len(health_report['ai_providers'])} registered") + print(f" Personality Templates: {health_report['personality_system'].get('available_templates', 0)}") + print(f" Enhanced Bot Instances: {len(health_report['enhanced_bots'])}") + print() + + # 4. Runtime configuration + print("4. Demonstrating runtime configuration...") + config_success = await enhanced_orchestrator.configure_bot_runtime("ai_chatbot", { + "personality": "technical_expert", + "streaming": False + }) + print(f" Configuration Update: {'✓ Success' if config_success else '✗ Failed'}") + print() + + # 5. Analytics + print("5. Bot analytics and usage statistics...") + analytics = enhanced_orchestrator.get_bot_analytics() + print(f" Enhanced Bots: {analytics['enhanced_bots_count']}") + print(f" Configurations: {analytics['configurations_count']}") + print(" Feature Usage:") + for feature, usage in analytics['feature_usage'].items(): + print(f" {feature}: {usage}") + print() + + print("=== Step 5B Integration Demo Complete ===") + + +if __name__ == "__main__": + # Run the demo + asyncio.run(demo_step_5b_integration()) diff --git a/voicebot/test_step_5b.py b/voicebot/test_step_5b.py new file mode 100644 index 0000000..8f52914 --- /dev/null +++ b/voicebot/test_step_5b.py @@ -0,0 +1,195 @@ +""" +Simple test to verify Step 5B enhanced bot functionality. + +This test verifies that the enhanced bot components work correctly +when integrated with the existing voicebot system. +""" + +import asyncio +import os +import time + +# Set up test environment variables +os.environ["AI_CHATBOT_PERSONALITY"] = "helpful_assistant" +os.environ["AI_CHATBOT_PROVIDER"] = "local" # Use local provider for testing +os.environ["AI_CHATBOT_STREAMING"] = "false" +os.environ["AI_CHATBOT_MEMORY"] = "true" + +# Import test modules +import sys +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from shared.models import ChatMessageModel + + +async def test_enhanced_ai_chatbot(): + """Test the enhanced AI chatbot functionality.""" + print("Testing Enhanced AI Chatbot...") + + try: + # Import the enhanced bot + from voicebot.bots.ai_chatbot import handle_chat_message, get_bot_status + + # Create a mock send function + responses = [] + async def mock_send(message: str): + responses.append(message) + print(f"Bot Response: {message}") + + # Test message handling + test_message = ChatMessageModel( + id="test_message_id", + sender_name="test_user", + sender_session_id="test_session", + lobby_id="test_lobby", + message="Hello, can you help me?", + timestamp=time.time() + ) + + print(f"Sending test message: {test_message.message}") + response = await handle_chat_message(test_message, mock_send) + + if response: + print(f"✓ Bot responded successfully: {response[:50]}...") + else: + print("✗ Bot did not respond") + + # Test bot status + print("\nTesting bot status...") + status = await get_bot_status() + print("✓ Bot status retrieved:") + print(f" - Agent: {status.get('agent_name', 'unknown')}") + print(f" - Features Available: {status.get('features_available', False)}") + print(f" - Configuration: {status.get('configuration', {})}") + + return True + + except Exception as e: + print(f"✗ Enhanced bot test failed: {e}") + return False + + +async def test_personality_system(): + """Test the personality system components.""" + print("\nTesting Personality System...") + + try: + from voicebot.personality_system import personality_manager + + # Test listing templates + templates = personality_manager.list_templates() + print(f"✓ Found {len(templates)} personality templates:") + for template in templates: + print(f" - {template.id}: {template.description}") + + # Test creating personality from template + personality = personality_manager.create_personality_from_template("helpful_assistant") + if personality: + print(f"✓ Created personality: {personality.name}") + print(f" - Traits: {[trait.value for trait in personality.traits]}") + print(f" - Communication Style: {personality.communication_style.value}") + else: + print("✗ Failed to create personality") + + return True + + except Exception as e: + print(f"✗ Personality system test failed: {e}") + return False + + +async def test_conversation_context(): + """Test the conversation context management.""" + print("\nTesting Conversation Context...") + + try: + from voicebot.conversation_context import context_manager + + # Test creating context + context = context_manager.get_or_create_context( + session_id="test_session", + bot_name="test_bot", + conversation_id="test_conversation" + ) + + if context: + print(f"✓ Created conversation context: {context.conversation_id}") + + # Test adding conversation turn + context_manager.add_conversation_turn( + conversation_id=context.conversation_id, + user_message="Test message", + bot_response="Test response", + context_used={"test": "context"}, + metadata={"timestamp": time.time()} + ) + + print("✓ Added conversation turn") + print(f" - Turns in context: {len(context.turns)}") + + # Test context summary + summary = context_manager.get_context_for_response(context.conversation_id) + if summary: + print(f"✓ Generated context summary: {summary[:50]}...") + + return True + + except Exception as e: + print(f"✗ Conversation context test failed: {e}") + return False + + +async def test_integration_orchestrator(): + """Test the integration orchestrator.""" + print("\nTesting Integration Orchestrator...") + + try: + from step_5b_integration_demo import enhanced_orchestrator + + # Test bot discovery + enhanced_bots = await enhanced_orchestrator.discover_enhanced_bots() + print(f"✓ Discovered {len(enhanced_bots)} bots") + + # Find enhanced bots + enhanced_count = sum(1 for bot_info in enhanced_bots.values() + if bot_info.get('enhanced_features', False)) + print(f"✓ Found {enhanced_count} enhanced bots") + + # Test analytics + analytics = enhanced_orchestrator.get_bot_analytics() + print(f"✓ Analytics: {analytics['enhanced_bots_count']} enhanced bots configured") + + return True + + except Exception as e: + print(f"✗ Integration orchestrator test failed: {e}") + return False + + +async def run_all_tests(): + """Run all Step 5B tests.""" + print("=== Step 5B Enhanced Bot Management Tests ===\n") + + test_results = [] + + # Run individual tests + test_results.append(await test_enhanced_ai_chatbot()) + test_results.append(await test_personality_system()) + test_results.append(await test_conversation_context()) + test_results.append(await test_integration_orchestrator()) + + # Summary + passed = sum(test_results) + total = len(test_results) + + print(f"\n=== Test Results: {passed}/{total} tests passed ===") + + if passed == total: + print("🎉 All Step 5B components are working correctly!") + else: + print("⚠️ Some tests failed - check the output above for details") + + return passed == total + + +if __name__ == "__main__": + asyncio.run(run_all_tests())