Improved bot capabilities

James Ketr 2025-09-04 17:53:26 -07:00
parent 3a0d54ce0f
commit 15641aa542
14 changed files with 2473 additions and 20 deletions


@@ -14,9 +14,9 @@ Endpoints:
 - /api/system/info - System information
 """
-from typing import Dict, Any, Optional
+from typing import Optional
 from fastapi import APIRouter, HTTPException, Query
-from datetime import datetime, timedelta
+from datetime import datetime
 from logger import logger


@@ -14,14 +14,11 @@ Features:
 """
 import asyncio
-import time
-import json
 import hashlib
 from datetime import datetime, timedelta
-from typing import Any, Dict, Optional, List, Union, Callable, TypeVar
+from typing import Any, Dict, Optional, Callable, TypeVar
 from collections import OrderedDict
 from dataclasses import dataclass
-import weakref
 from logger import logger


@@ -10,7 +10,7 @@ import asyncio
 import time
 import traceback
 from enum import Enum
-from typing import Any, Callable, Dict, List, Optional, TypeVar, Generic
+from typing import Any, Callable, Dict, List, Optional, TypeVar
 from functools import wraps
 from dataclasses import dataclass
 from fastapi import WebSocket


@@ -15,9 +15,8 @@ Features:
 import asyncio
 import time
 from datetime import datetime, timedelta
-from typing import Dict, Any, List, Optional, Callable, NamedTuple
+from typing import Dict, Any, List, Optional, NamedTuple
 from enum import Enum
-import json
 from logger import logger


@@ -21,7 +21,6 @@ from typing import Dict, Any, List, Optional, Callable
 from dataclasses import dataclass, field
 from collections import defaultdict, deque
 from contextlib import asynccontextmanager
-import weakref
 from logger import logger


@@ -12,11 +12,10 @@ from fastapi import WebSocket
 from logger import logger
 from .webrtc_signaling import WebRTCSignalingHandlers
 from core.error_handling import (
     error_handler,
     WebSocketError,
     ValidationError,
-    with_websocket_error_handling,
-    ErrorSeverity
+    ErrorSeverity,
 )
 if TYPE_CHECKING:


@@ -9,12 +9,7 @@ from typing import Any, Dict, TYPE_CHECKING
 from fastapi import WebSocket
 from logger import logger
-from core.error_handling import (
-    with_webrtc_error_handling,
-    WebRTCError,
-    ErrorSeverity,
-    error_handler
-)
+from core.error_handling import with_webrtc_error_handling
 if TYPE_CHECKING:
     from core.session_manager import Session

voicebot/ai_providers.py (new file, 514 lines)

@@ -0,0 +1,514 @@
"""
AI Provider Integration for Advanced Bot Management.
This module provides support for multiple AI providers including OpenAI, Anthropic,
and local models for enhanced bot capabilities.
"""
import os
import time
import asyncio
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any, AsyncIterator
from enum import Enum
from dataclasses import dataclass
from pydantic import BaseModel, Field
from logger import logger
class AIProviderType(str, Enum):
"""Supported AI provider types."""
OPENAI = "openai"
ANTHROPIC = "anthropic"
LOCAL = "local"
CUSTOM = "custom"
class MessageRole(str, Enum):
"""Message roles in conversation."""
SYSTEM = "system"
USER = "user"
ASSISTANT = "assistant"
@dataclass
class ConversationMessage:
"""Individual message in a conversation."""
role: MessageRole
content: str
timestamp: float = None
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.metadata is None:
self.metadata = {}
class AIProviderConfig(BaseModel):
"""Configuration for AI providers."""
provider_type: AIProviderType
api_key: Optional[str] = None
base_url: Optional[str] = None
model: str = "gpt-3.5-turbo"
max_tokens: int = 1000
temperature: float = 0.7
timeout: float = 30.0
retry_attempts: int = 3
retry_delay: float = 1.0
# Advanced settings
top_p: Optional[float] = None
frequency_penalty: Optional[float] = None
presence_penalty: Optional[float] = None
stop_sequences: List[str] = Field(default_factory=list)
class Config:
extra = "allow" # Allow additional provider-specific configs
class ConversationContext(BaseModel):
"""Conversation context and memory management."""
session_id: str
bot_name: str
messages: List[ConversationMessage] = Field(default_factory=list)
created_at: float = Field(default_factory=time.time)
last_updated: float = Field(default_factory=time.time)
# Context management
max_history: int = 50
context_window: int = 4000 # Token limit for context
personality_prompt: Optional[str] = None
# Metadata
user_preferences: Dict[str, Any] = Field(default_factory=dict)
conversation_state: Dict[str, Any] = Field(default_factory=dict)
def add_message(self, role: MessageRole, content: str, metadata: Dict[str, Any] = None):
"""Add a message to the conversation."""
message = ConversationMessage(role=role, content=content, metadata=metadata or {})
self.messages.append(message)
self.last_updated = time.time()
# Trim history if needed
if len(self.messages) > self.max_history:
# Keep system messages and recent messages
system_messages = [m for m in self.messages if m.role == MessageRole.SYSTEM]
recent_messages = [m for m in self.messages if m.role != MessageRole.SYSTEM][-self.max_history:]
self.messages = system_messages + recent_messages
def get_context_messages(self) -> List[Dict[str, str]]:
"""Get messages formatted for AI provider APIs."""
messages = []
for msg in self.messages:
messages.append({
"role": msg.role.value,
"content": msg.content
})
return messages
class AIProvider(ABC):
"""Abstract base class for AI providers."""
def __init__(self, config: AIProviderConfig):
self.config = config
self.provider_type = config.provider_type
@abstractmethod
async def generate_response(
self,
context: ConversationContext,
message: str
) -> str:
"""Generate a response to a message."""
pass
@abstractmethod
async def stream_response(
self,
context: ConversationContext,
message: str
) -> AsyncIterator[str]:
"""Stream a response to a message."""
pass
@abstractmethod
async def health_check(self) -> bool:
"""Check if the provider is healthy and available."""
pass
class OpenAIProvider(AIProvider):
"""OpenAI provider implementation."""
def __init__(self, config: AIProviderConfig):
super().__init__(config)
self._client = None
def _get_client(self):
"""Lazy initialization of OpenAI client."""
if self._client is None:
try:
import openai
self._client = openai.AsyncOpenAI(
api_key=self.config.api_key or os.getenv("OPENAI_API_KEY"),
base_url=self.config.base_url,
timeout=self.config.timeout
)
except ImportError:
raise ImportError("OpenAI package not installed. Install with: pip install openai")
return self._client
async def generate_response(self, context: ConversationContext, message: str) -> str:
"""Generate response using OpenAI API."""
client = self._get_client()
# Add user message to context
context.add_message(MessageRole.USER, message)
messages = context.get_context_messages()
for attempt in range(self.config.retry_attempts):
try:
response = await client.chat.completions.create(
model=self.config.model,
messages=messages,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature,
top_p=self.config.top_p,
frequency_penalty=self.config.frequency_penalty,
presence_penalty=self.config.presence_penalty,
stop=self.config.stop_sequences or None
)
response_text = response.choices[0].message.content
context.add_message(MessageRole.ASSISTANT, response_text)
return response_text
except Exception as e:
logger.warning(f"OpenAI API attempt {attempt + 1} failed: {e}")
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
else:
raise
async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]:
"""Stream response using OpenAI API."""
client = self._get_client()
context.add_message(MessageRole.USER, message)
messages = context.get_context_messages()
try:
stream = await client.chat.completions.create(
model=self.config.model,
messages=messages,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature,
stream=True
)
full_response = ""
async for chunk in stream:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
full_response += content
yield content
# Add complete response to context
context.add_message(MessageRole.ASSISTANT, full_response)
except Exception as e:
logger.error(f"OpenAI streaming failed: {e}")
raise
async def health_check(self) -> bool:
"""Check OpenAI API health."""
try:
client = self._get_client()
await client.models.list()
return True
except Exception as e:
logger.warning(f"OpenAI health check failed: {e}")
return False
class AnthropicProvider(AIProvider):
"""Anthropic Claude provider implementation."""
def __init__(self, config: AIProviderConfig):
super().__init__(config)
self._client = None
def _get_client(self):
"""Lazy initialization of Anthropic client."""
if self._client is None:
try:
import anthropic
self._client = anthropic.AsyncAnthropic(
api_key=self.config.api_key or os.getenv("ANTHROPIC_API_KEY"),
timeout=self.config.timeout
)
except ImportError:
raise ImportError("Anthropic package not installed. Install with: pip install anthropic")
return self._client
async def generate_response(self, context: ConversationContext, message: str) -> str:
"""Generate response using Anthropic API."""
client = self._get_client()
context.add_message(MessageRole.USER, message)
# Convert messages for Anthropic format
messages = []
system_prompt = None
for msg in context.messages:
if msg.role == MessageRole.SYSTEM:
system_prompt = msg.content
else:
messages.append({
"role": msg.role.value,
"content": msg.content
})
for attempt in range(self.config.retry_attempts):
try:
kwargs = {
"model": self.config.model,
"messages": messages,
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
}
if system_prompt:
kwargs["system"] = system_prompt
response = await client.messages.create(**kwargs)
response_text = response.content[0].text
context.add_message(MessageRole.ASSISTANT, response_text)
return response_text
except Exception as e:
logger.warning(f"Anthropic API attempt {attempt + 1} failed: {e}")
if attempt < self.config.retry_attempts - 1:
await asyncio.sleep(self.config.retry_delay * (2 ** attempt))
else:
raise
async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]:
"""Stream response using Anthropic API."""
client = self._get_client()
context.add_message(MessageRole.USER, message)
messages = context.get_context_messages()
try:
async with client.messages.stream(
model=self.config.model,
messages=messages,
max_tokens=self.config.max_tokens,
temperature=self.config.temperature
) as stream:
full_response = ""
async for text in stream.text_stream:
full_response += text
yield text
context.add_message(MessageRole.ASSISTANT, full_response)
except Exception as e:
logger.error(f"Anthropic streaming failed: {e}")
raise
async def health_check(self) -> bool:
"""Check Anthropic API health."""
try:
client = self._get_client()
# Simple test to verify API connectivity
await client.messages.create(
model=self.config.model,
messages=[{"role": "user", "content": "test"}],
max_tokens=1
)
return True
except Exception as e:
logger.warning(f"Anthropic health check failed: {e}")
return False
class LocalProvider(AIProvider):
"""Local model provider (e.g., Ollama, llama.cpp)."""
def __init__(self, config: AIProviderConfig):
super().__init__(config)
self.base_url = config.base_url or "http://localhost:11434"
async def generate_response(self, context: ConversationContext, message: str) -> str:
"""Generate response using local model API."""
context.add_message(MessageRole.USER, message)
import aiohttp
async with aiohttp.ClientSession() as session:
payload = {
"model": self.config.model,
"messages": context.get_context_messages(),
"stream": False,
"options": {
"temperature": self.config.temperature,
"num_predict": self.config.max_tokens
}
}
try:
async with session.post(
f"{self.base_url}/api/chat",
json=payload,
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
) as resp:
if resp.status == 200:
result = await resp.json()
response_text = result["message"]["content"]
context.add_message(MessageRole.ASSISTANT, response_text)
return response_text
else:
raise Exception(f"Local API returned status {resp.status}")
except Exception as e:
logger.error(f"Local provider failed: {e}")
raise
async def stream_response(self, context: ConversationContext, message: str) -> AsyncIterator[str]:
"""Stream response using local model API."""
context.add_message(MessageRole.USER, message)
import aiohttp
async with aiohttp.ClientSession() as session:
payload = {
"model": self.config.model,
"messages": context.get_context_messages(),
"stream": True
}
try:
async with session.post(
f"{self.base_url}/api/chat",
json=payload,
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
) as resp:
if resp.status == 200:
full_response = ""
async for line in resp.content:
if line:
import json
try:
data = json.loads(line.decode())
if "message" in data and "content" in data["message"]:
content = data["message"]["content"]
full_response += content
yield content
except json.JSONDecodeError:
continue
context.add_message(MessageRole.ASSISTANT, full_response)
else:
raise Exception(f"Local API returned status {resp.status}")
except Exception as e:
logger.error(f"Local provider streaming failed: {e}")
raise
async def health_check(self) -> bool:
"""Check local model health."""
try:
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(
f"{self.base_url}/api/tags",
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
return resp.status == 200
except Exception as e:
logger.warning(f"Local provider health check failed: {e}")
return False
class AIProviderManager:
"""Manager for AI providers and configurations."""
def __init__(self):
self.providers: Dict[str, AIProvider] = {}
self.default_configs = self._load_default_configs()
def _load_default_configs(self) -> Dict[AIProviderType, AIProviderConfig]:
"""Load default configurations for providers."""
return {
AIProviderType.OPENAI: AIProviderConfig(
provider_type=AIProviderType.OPENAI,
model=os.getenv("OPENAI_MODEL", "gpt-3.5-turbo"),
max_tokens=int(os.getenv("OPENAI_MAX_TOKENS", "1000")),
temperature=float(os.getenv("OPENAI_TEMPERATURE", "0.7"))
),
AIProviderType.ANTHROPIC: AIProviderConfig(
provider_type=AIProviderType.ANTHROPIC,
model=os.getenv("ANTHROPIC_MODEL", "claude-3-sonnet-20240229"),
max_tokens=int(os.getenv("ANTHROPIC_MAX_TOKENS", "1000")),
temperature=float(os.getenv("ANTHROPIC_TEMPERATURE", "0.7"))
),
AIProviderType.LOCAL: AIProviderConfig(
provider_type=AIProviderType.LOCAL,
base_url=os.getenv("LOCAL_MODEL_URL", "http://localhost:11434"),
model=os.getenv("LOCAL_MODEL_NAME", "llama2"),
max_tokens=int(os.getenv("LOCAL_MAX_TOKENS", "1000")),
temperature=float(os.getenv("LOCAL_TEMPERATURE", "0.7"))
)
}
def create_provider(self, provider_type: AIProviderType, config: Optional[AIProviderConfig] = None) -> AIProvider:
"""Create an AI provider instance."""
if config is None:
config = self.default_configs.get(provider_type)
if config is None:
raise ValueError(f"No default config for provider type: {provider_type}")
if provider_type == AIProviderType.OPENAI:
return OpenAIProvider(config)
elif provider_type == AIProviderType.ANTHROPIC:
return AnthropicProvider(config)
elif provider_type == AIProviderType.LOCAL:
return LocalProvider(config)
else:
raise ValueError(f"Unsupported provider type: {provider_type}")
def register_provider(self, name: str, provider: AIProvider):
"""Register a provider instance."""
self.providers[name] = provider
logger.info(f"Registered AI provider: {name} ({provider.provider_type})")
def get_provider(self, name: str) -> Optional[AIProvider]:
"""Get a registered provider."""
return self.providers.get(name)
def list_providers(self) -> List[str]:
"""List all registered provider names."""
return list(self.providers.keys())
async def health_check_all(self) -> Dict[str, bool]:
"""Health check all registered providers."""
results = {}
for name, provider in self.providers.items():
try:
results[name] = await provider.health_check()
except Exception as e:
logger.error(f"Health check failed for provider {name}: {e}")
results[name] = False
return results
# Global provider manager instance
ai_provider_manager = AIProviderManager()
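
A minimal usage sketch for the module above (not part of the committed file): create a provider from the built-in defaults, register it, and generate one reply inside a ConversationContext. It assumes the openai package is installed and OPENAI_API_KEY is set in the environment.

import asyncio

from voicebot.ai_providers import (
    AIProviderType,
    ConversationContext,
    MessageRole,
    ai_provider_manager,
)


async def demo() -> None:
    # Build a provider from the built-in defaults (reads OPENAI_* env vars),
    # then register it so health_check_all() can see it.
    provider = ai_provider_manager.create_provider(AIProviderType.OPENAI)
    ai_provider_manager.register_provider("demo_openai", provider)

    # Seed a conversation with a system prompt and ask one question.
    context = ConversationContext(session_id="demo-session", bot_name="demo_bot")
    context.add_message(MessageRole.SYSTEM, "You are a concise assistant.")
    print(await provider.generate_response(context, "What can you do?"))

    # Health-check everything registered so far.
    print(await ai_provider_manager.health_check_all())


asyncio.run(demo())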

voicebot/bots/ai_chatbot.py (new file, 359 lines)

@@ -0,0 +1,359 @@
"""Enhanced AI Chatbot with Multi-Provider Support and Personality System.
This bot demonstrates the advanced capabilities including:
- Multi-provider AI integration (OpenAI, Anthropic, Local models)
- Personality system with configurable traits
- Conversation context and memory management
- Enhanced response generation with streaming support
"""
import os
import time
import uuid
from typing import Dict, Optional, Callable, Awaitable, Any
from aiortc import MediaStreamTrack
# Import system modules
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from logger import logger
from shared.models import ChatMessageModel
# Import advanced bot management modules
try:
from voicebot.ai_providers import (
AIProviderType, AIProviderConfig, ai_provider_manager,
ConversationContext, MessageRole
)
from voicebot.personality_system import personality_manager, PersonalityTrait, CommunicationStyle
from voicebot.conversation_context import context_manager
AI_PROVIDERS_AVAILABLE = True
except ImportError as e:
logger.warning(f"Advanced AI features not available: {e}")
AI_PROVIDERS_AVAILABLE = False
AGENT_NAME = "ai_chatbot"
AGENT_DESCRIPTION = "Advanced AI chatbot with multi-provider support, personality system, and conversation memory"
# Bot configuration from environment
BOT_PERSONALITY = os.getenv("AI_CHATBOT_PERSONALITY", "helpful_assistant")
BOT_AI_PROVIDER = os.getenv("AI_CHATBOT_PROVIDER", "openai")
BOT_STREAMING = os.getenv("AI_CHATBOT_STREAMING", "false").lower() == "true"
BOT_MEMORY_ENABLED = os.getenv("AI_CHATBOT_MEMORY", "true").lower() == "true"
# Fallback responses when AI providers are not available
FALLBACK_RESPONSES = {
"greeting": [
"Hello! I'm an AI chatbot ready to help you.",
"Hi there! How can I assist you today?",
"Greetings! I'm here to chat and help with any questions you have."
],
"help": [
"I'm an advanced AI chatbot that can help with various topics. Just ask me anything!",
"I can assist with questions, have conversations, and provide information on many subjects.",
"Feel free to ask me questions or just chat - I'm here to help!"
],
"capabilities": [
"I support multiple AI providers, have configurable personalities, and maintain conversation context.",
"My capabilities include natural conversation, information retrieval, and adaptive personality responses.",
"I can remember our conversation context and adapt my responses based on configured personality traits."
],
"default": [
"That's interesting! Tell me more about that.",
"I understand. What would you like to discuss next?",
"Thanks for sharing! How can I help you further?",
"I see. Is there anything specific you'd like to know about?"
],
"error": [
"I apologize, but I'm having trouble processing that right now. Could you try rephrasing?",
"Something went wrong on my end. Could you ask that again?",
"I encountered an issue. Please try your question again."
]
}
class EnhancedAIChatbot:
"""Enhanced AI chatbot with advanced features."""
def __init__(self, session_name: str):
self.session_name = session_name
self.session_id = str(uuid.uuid4())
self.ai_provider = None
self.personality = None
self.conversation_context = None
self.initialized = False
# Initialize advanced features if available
if AI_PROVIDERS_AVAILABLE:
self._initialize_ai_features()
else:
logger.warning("Running in fallback mode - advanced AI features disabled")
def _initialize_ai_features(self):
"""Initialize AI provider, personality, and context management."""
try:
# Initialize personality
self.personality = personality_manager.create_personality_from_template(BOT_PERSONALITY)
if not self.personality:
logger.warning(f"Personality template '{BOT_PERSONALITY}' not found, using default")
self.personality = personality_manager.create_personality_from_template("helpful_assistant")
# Initialize AI provider
provider_type = AIProviderType(BOT_AI_PROVIDER)
self.ai_provider = ai_provider_manager.create_provider(provider_type)
ai_provider_manager.register_provider(f"{AGENT_NAME}_{self.session_id}", self.ai_provider)
# Initialize conversation context if memory is enabled
if BOT_MEMORY_ENABLED:
self.conversation_context = context_manager.get_or_create_context(
session_id=self.session_id,
bot_name=AGENT_NAME,
conversation_id=f"{AGENT_NAME}_{self.session_id}_{int(time.time())}"
)
self.initialized = True
logger.info(f"Enhanced AI chatbot initialized: provider={BOT_AI_PROVIDER}, personality={BOT_PERSONALITY}, memory={BOT_MEMORY_ENABLED}")
except Exception as e:
logger.error(f"Failed to initialize AI features: {e}")
self.initialized = False
async def generate_response(self, message: str) -> str:
"""Generate a response using AI provider with personality and context."""
if not self.initialized or not self.ai_provider:
return self._get_fallback_response(message)
try:
# Prepare conversation context
if self.conversation_context:
# Create a new AI conversation context with personality
ai_context = ConversationContext(
session_id=self.session_id,
bot_name=AGENT_NAME,
personality_prompt=self.personality.generate_system_prompt() if self.personality else None
)
# Add personality system message
if self.personality:
ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt())
# Add conversation history context
context_summary = context_manager.get_context_for_response(self.conversation_context.conversation_id)
if context_summary:
ai_context.add_message(MessageRole.SYSTEM, f"Conversation context: {context_summary}")
else:
# Simple context without memory
ai_context = ConversationContext(
session_id=self.session_id,
bot_name=AGENT_NAME
)
if self.personality:
ai_context.add_message(MessageRole.SYSTEM, self.personality.generate_system_prompt())
# Generate response
if BOT_STREAMING:
# For streaming, collect the full response
response_parts = []
async for chunk in self.ai_provider.stream_response(ai_context, message):
response_parts.append(chunk)
response = "".join(response_parts)
else:
response = await self.ai_provider.generate_response(ai_context, message)
# Store conversation turn in context manager
if self.conversation_context:
context_manager.add_conversation_turn(
conversation_id=self.conversation_context.conversation_id,
user_message=message,
bot_response=response,
context_used={"ai_provider": BOT_AI_PROVIDER, "personality": BOT_PERSONALITY},
metadata={"timestamp": time.time(), "streaming": BOT_STREAMING}
)
return response
except Exception as e:
logger.error(f"AI response generation failed: {e}")
return self._get_fallback_response(message, error=True)
def _get_fallback_response(self, message: str, error: bool = False) -> str:
"""Get fallback response when AI providers are unavailable."""
if error:
return FALLBACK_RESPONSES["error"][hash(message) % len(FALLBACK_RESPONSES["error"])]
message_lower = message.lower()
# Simple keyword matching for fallback responses
if any(word in message_lower for word in ["hello", "hi", "hey", "greetings"]):
return FALLBACK_RESPONSES["greeting"][hash(message) % len(FALLBACK_RESPONSES["greeting"])]
elif any(word in message_lower for word in ["help", "what can you do", "capabilities"]):
return FALLBACK_RESPONSES["help"][hash(message) % len(FALLBACK_RESPONSES["help"])]
elif any(word in message_lower for word in ["features", "abilities", "advanced"]):
return FALLBACK_RESPONSES["capabilities"][hash(message) % len(FALLBACK_RESPONSES["capabilities"])]
else:
return FALLBACK_RESPONSES["default"][hash(message) % len(FALLBACK_RESPONSES["default"])]
async def health_check(self) -> Dict[str, Any]:
"""Perform health check on bot components."""
health = {
"bot_name": AGENT_NAME,
"session_id": self.session_id,
"initialized": self.initialized,
"ai_providers_available": AI_PROVIDERS_AVAILABLE,
"configuration": {
"personality": BOT_PERSONALITY,
"ai_provider": BOT_AI_PROVIDER,
"streaming": BOT_STREAMING,
"memory_enabled": BOT_MEMORY_ENABLED
}
}
if self.initialized and self.ai_provider:
try:
provider_healthy = await self.ai_provider.health_check()
health["ai_provider_status"] = "healthy" if provider_healthy else "unhealthy"
except Exception as e:
health["ai_provider_status"] = f"error: {e}"
if self.personality:
health["personality_loaded"] = True
health["personality_traits"] = [trait.value for trait in self.personality.traits]
if self.conversation_context:
health["conversation_turns"] = len(self.conversation_context.turns)
health["context_summary"] = self.conversation_context.get_conversation_summary()
return health
# Global bot instance
_bot_instance: Optional[EnhancedAIChatbot] = None
def agent_info() -> Dict[str, Any]:
"""Return agent information."""
return {
"name": AGENT_NAME,
"description": AGENT_DESCRIPTION,
"has_media": "false",
"features": [
"multi_provider_ai",
"personality_system",
"conversation_memory",
"streaming_responses",
"health_monitoring"
]
}
def create_agent_tracks(session_name: str) -> Dict[str, MediaStreamTrack]:
"""AI chatbot doesn't provide media tracks - it's chat-only."""
return {}
async def handle_chat_message(
chat_message: ChatMessageModel,
send_message_func: Callable[[str], Awaitable[None]]
) -> Optional[str]:
"""Handle incoming chat messages and provide AI-powered responses."""
global _bot_instance
try:
# Initialize bot instance if needed
if _bot_instance is None:
_bot_instance = EnhancedAIChatbot(chat_message.nick)
logger.info(f"Initialized enhanced AI chatbot for session: {chat_message.nick}")
# Generate response
response = await _bot_instance.generate_response(chat_message.message)
# Send response
if response:
await send_message_func(response)
logger.info(f"AI Chatbot responded to {chat_message.nick}: {response[:100]}...")
return response
except Exception as e:
logger.error(f"Error in AI chatbot: {e}")
error_response = "I apologize, but I encountered an error. Please try again."
await send_message_func(error_response)
return error_response
async def get_bot_status() -> Dict[str, Any]:
"""Get detailed bot status and health information."""
global _bot_instance
status = {
"agent_name": AGENT_NAME,
"agent_description": AGENT_DESCRIPTION,
"features_available": AI_PROVIDERS_AVAILABLE,
"configuration": {
"personality_template": BOT_PERSONALITY,
"ai_provider": BOT_AI_PROVIDER,
"streaming_enabled": BOT_STREAMING,
"memory_enabled": BOT_MEMORY_ENABLED
}
}
if _bot_instance:
health_info = await _bot_instance.health_check()
status.update(health_info)
else:
status["instance_status"] = "not_initialized"
# Add system-level information
if AI_PROVIDERS_AVAILABLE:
status["available_personalities"] = [
template.id for template in personality_manager.list_templates()
]
status["available_providers"] = ai_provider_manager.list_providers()
# Get context manager statistics
if BOT_MEMORY_ENABLED:
context_stats = context_manager.get_statistics()
status["conversation_statistics"] = context_stats
return status
# Additional helper functions for advanced features
async def switch_personality(personality_id: str) -> bool:
"""Switch bot personality at runtime."""
global _bot_instance
if not AI_PROVIDERS_AVAILABLE or not _bot_instance:
return False
try:
new_personality = personality_manager.create_personality_from_template(personality_id)
if new_personality:
_bot_instance.personality = new_personality
logger.info(f"Switched to personality: {personality_id}")
return True
except Exception as e:
logger.error(f"Failed to switch personality: {e}")
return False
async def switch_ai_provider(provider_type: str) -> bool:
"""Switch AI provider at runtime."""
global _bot_instance
if not AI_PROVIDERS_AVAILABLE or not _bot_instance:
return False
try:
provider_enum = AIProviderType(provider_type)
new_provider = ai_provider_manager.create_provider(provider_enum)
_bot_instance.ai_provider = new_provider
logger.info(f"Switched to AI provider: {provider_type}")
return True
except Exception as e:
logger.error(f"Failed to switch AI provider: {e}")
return False
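
A usage sketch for the chat handler above, not part of the committed file: it assumes voicebot.bots is importable as a package and that ChatMessageModel accepts nick and message keyword arguments (only those two fields are read by the handler).

import asyncio

from shared.models import ChatMessageModel
from voicebot.bots import ai_chatbot


async def demo() -> None:
    async def send_to_room(text: str) -> None:
        # Stand-in for the real chat transport.
        print(f"bot> {text}")

    # Only nick and message are read by handle_chat_message; any other
    # ChatMessageModel fields are assumed to have defaults.
    incoming = ChatMessageModel(nick="alice", message="Hi, what can you do?")
    await ai_chatbot.handle_chat_message(incoming, send_to_room)

    # Health and configuration snapshot, including fallback mode.
    print(await ai_chatbot.get_bot_status())


asyncio.run(demo())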


@@ -0,0 +1,385 @@
"""
Conversation Context Management for Advanced Bot Management.
This module manages conversation context, memory, and state for enhanced
bot interactions with persistent conversation awareness.
"""
import json
import time
import os
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
from pydantic import BaseModel, Field
from collections import defaultdict
from logger import logger
@dataclass
class ConversationTurn:
"""Individual turn in a conversation."""
turn_id: str
timestamp: float
user_message: str
bot_response: str
context_used: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"turn_id": self.turn_id,
"timestamp": self.timestamp,
"user_message": self.user_message,
"bot_response": self.bot_response,
"context_used": self.context_used,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ConversationTurn':
"""Create from dictionary."""
return cls(**data)
class ConversationMemory(BaseModel):
"""Memory system for conversation context."""
# Core conversation data
session_id: str
bot_name: str
user_name: Optional[str] = None
conversation_id: str
# Conversation history
turns: List[ConversationTurn] = Field(default_factory=list)
created_at: float = Field(default_factory=time.time)
last_updated: float = Field(default_factory=time.time)
# Memory components
facts_learned: Dict[str, Any] = Field(default_factory=dict) # Facts about user/context
preferences: Dict[str, Any] = Field(default_factory=dict) # User preferences
topics_discussed: List[str] = Field(default_factory=list) # Topics covered
emotional_context: Dict[str, Any] = Field(default_factory=dict) # Emotional state
# Conversation state
current_topic: Optional[str] = None
conversation_stage: str = "greeting" # greeting, discussion, conclusion
user_intent: Optional[str] = None
bot_goals: List[str] = Field(default_factory=list)
# Memory management
max_turns: int = 100
max_facts: int = 50
memory_decay_factor: float = 0.95 # How quickly old memories fade
class Config:
arbitrary_types_allowed = True
def add_turn(self, turn: ConversationTurn):
"""Add a conversation turn to memory."""
self.turns.append(turn)
self.last_updated = time.time()
# Extract facts and context from the turn
self._extract_context_from_turn(turn)
# Trim history if needed
if len(self.turns) > self.max_turns:
self.turns = self.turns[-self.max_turns:]
def _extract_context_from_turn(self, turn: ConversationTurn):
"""Extract contextual information from a conversation turn."""
# Simple keyword-based fact extraction (can be enhanced with NLP)
user_message = turn.user_message.lower()
# Extract preferences
if "i like" in user_message or "i love" in user_message:
# Simple preference extraction
preference_start = max(user_message.find("i like"), user_message.find("i love"))
preference_text = user_message[preference_start:].split('.')[0]
self.preferences[f"preference_{len(self.preferences)}"] = preference_text
# Extract facts
if "my name is" in user_message:
name_start = user_message.find("my name is") + len("my name is")
name = user_message[name_start:].split()[0].strip()
if name:
self.facts_learned["user_name"] = name
self.user_name = name
# Topic tracking
if turn.metadata.get("detected_topics"):
for topic in turn.metadata["detected_topics"]:
if topic not in self.topics_discussed:
self.topics_discussed.append(topic)
# Emotional context (simple sentiment analysis)
emotional_indicators = {
"happy": ["happy", "great", "wonderful", "excited", "joy"],
"sad": ["sad", "unhappy", "disappointed", "depressed"],
"frustrated": ["frustrated", "annoyed", "angry", "upset"],
"confused": ["confused", "don't understand", "unclear", "puzzled"],
"satisfied": ["good", "thanks", "helpful", "satisfied"]
}
for emotion, indicators in emotional_indicators.items():
if any(indicator in user_message for indicator in indicators):
self.emotional_context["current_emotion"] = emotion
self.emotional_context["last_emotion_update"] = time.time()
break
def get_recent_context(self, turns: int = 5) -> List[ConversationTurn]:
"""Get recent conversation turns for context."""
return self.turns[-turns:] if self.turns else []
def get_relevant_facts(self, query: str) -> Dict[str, Any]:
"""Get facts relevant to a query."""
relevant_facts = {}
query_lower = query.lower()
for key, value in self.facts_learned.items():
if isinstance(value, str) and any(word in value.lower() for word in query_lower.split()):
relevant_facts[key] = value
return relevant_facts
def get_conversation_summary(self) -> str:
"""Generate a summary of the conversation."""
if not self.turns:
return "No conversation history."
summary_parts = []
if self.user_name:
summary_parts.append(f"User: {self.user_name}")
if self.topics_discussed:
topics_str = ", ".join(self.topics_discussed[:5])
summary_parts.append(f"Topics discussed: {topics_str}")
if self.preferences:
prefs = list(self.preferences.values())[:3]
summary_parts.append(f"User preferences: {'; '.join(prefs)}")
if self.emotional_context.get("current_emotion"):
summary_parts.append(f"Current mood: {self.emotional_context['current_emotion']}")
summary_parts.append(f"Conversation turns: {len(self.turns)}")
return " | ".join(summary_parts)
class ConversationContextManager:
"""Manager for conversation contexts and memory."""
def __init__(self, storage_path: Optional[str] = None):
self.storage_path = storage_path or "./conversation_contexts"
self.active_contexts: Dict[str, ConversationMemory] = {}
self.context_index: Dict[str, List[str]] = defaultdict(list) # bot_name -> conversation_ids
# Ensure storage directory exists
os.makedirs(self.storage_path, exist_ok=True)
# Load existing contexts
self._load_existing_contexts()
def _load_existing_contexts(self):
"""Load existing conversation contexts from storage."""
try:
context_files = [f for f in os.listdir(self.storage_path) if f.endswith('.json')]
for file in context_files:
try:
file_path = os.path.join(self.storage_path, file)
with open(file_path, 'r') as f:
data = json.load(f)
# Convert turn data back to ConversationTurn objects
turns = [ConversationTurn.from_dict(turn_data) for turn_data in data.get('turns', [])]
data['turns'] = turns
context = ConversationMemory(**data)
conversation_id = context.conversation_id
self.active_contexts[conversation_id] = context
self.context_index[context.bot_name].append(conversation_id)
except Exception as e:
logger.warning(f"Failed to load context from {file}: {e}")
logger.info(f"Loaded {len(self.active_contexts)} conversation contexts")
except Exception as e:
logger.error(f"Failed to load conversation contexts: {e}")
def get_or_create_context(
self,
session_id: str,
bot_name: str,
conversation_id: Optional[str] = None
) -> ConversationMemory:
"""Get existing context or create a new one."""
if conversation_id and conversation_id in self.active_contexts:
return self.active_contexts[conversation_id]
# Create new conversation ID if not provided
if not conversation_id:
conversation_id = f"{session_id}_{bot_name}_{int(time.time())}"
# Create new context
context = ConversationMemory(
session_id=session_id,
bot_name=bot_name,
conversation_id=conversation_id
)
self.active_contexts[conversation_id] = context
self.context_index[bot_name].append(conversation_id)
logger.info(f"Created new conversation context: {conversation_id}")
return context
def save_context(self, conversation_id: str):
"""Save a conversation context to storage."""
if conversation_id not in self.active_contexts:
logger.warning(f"Context {conversation_id} not found for saving")
return
context = self.active_contexts[conversation_id]
try:
# Convert to dict for serialization
data = context.model_dump()
# Convert ConversationTurn objects to dicts
data['turns'] = [turn.to_dict() for turn in context.turns]
file_path = os.path.join(self.storage_path, f"{conversation_id}.json")
with open(file_path, 'w') as f:
json.dump(data, f, indent=2)
logger.debug(f"Saved context: {conversation_id}")
except Exception as e:
logger.error(f"Failed to save context {conversation_id}: {e}")
def add_conversation_turn(
self,
conversation_id: str,
user_message: str,
bot_response: str,
context_used: Optional[Dict[str, Any]] = None,
metadata: Optional[Dict[str, Any]] = None
):
"""Add a conversation turn to the specified context."""
if conversation_id not in self.active_contexts:
logger.warning(f"Context {conversation_id} not found")
return
turn = ConversationTurn(
turn_id=f"{conversation_id}_{len(self.active_contexts[conversation_id].turns)}",
timestamp=time.time(),
user_message=user_message,
bot_response=bot_response,
context_used=context_used or {},
metadata=metadata or {}
)
self.active_contexts[conversation_id].add_turn(turn)
# Auto-save after each turn
self.save_context(conversation_id)
def get_context_for_response(self, conversation_id: str) -> Optional[str]:
"""Get formatted context for generating bot responses."""
if conversation_id not in self.active_contexts:
return None
context = self.active_contexts[conversation_id]
context_parts = []
# Add conversation summary
summary = context.get_conversation_summary()
if summary != "No conversation history.":
context_parts.append(f"Conversation context: {summary}")
# Add recent turns for immediate context
recent_turns = context.get_recent_context(3)
if recent_turns:
context_parts.append("Recent conversation:")
for turn in recent_turns:
context_parts.append(f"User: {turn.user_message}")
context_parts.append(f"Bot: {turn.bot_response}")
# Add relevant facts
if context.facts_learned:
facts_str = "; ".join([f"{k}: {v}" for k, v in list(context.facts_learned.items())[:3]])
context_parts.append(f"Known facts: {facts_str}")
# Add emotional context
if context.emotional_context.get("current_emotion"):
context_parts.append(f"User's current mood: {context.emotional_context['current_emotion']}")
return "\n".join(context_parts) if context_parts else None
def get_contexts_for_bot(self, bot_name: str) -> List[ConversationMemory]:
"""Get all contexts for a specific bot."""
conversation_ids = self.context_index.get(bot_name, [])
return [self.active_contexts[cid] for cid in conversation_ids if cid in self.active_contexts]
def cleanup_old_contexts(self, max_age_days: int = 30):
"""Clean up old conversation contexts."""
current_time = time.time()
max_age_seconds = max_age_days * 24 * 60 * 60
contexts_to_remove = []
for conversation_id, context in self.active_contexts.items():
if current_time - context.last_updated > max_age_seconds:
contexts_to_remove.append(conversation_id)
for conversation_id in contexts_to_remove:
context = self.active_contexts[conversation_id]
# Remove from index
if context.bot_name in self.context_index:
if conversation_id in self.context_index[context.bot_name]:
self.context_index[context.bot_name].remove(conversation_id)
# Remove context file
try:
file_path = os.path.join(self.storage_path, f"{conversation_id}.json")
if os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
logger.warning(f"Failed to remove context file {conversation_id}: {e}")
# Remove from active contexts
del self.active_contexts[conversation_id]
if contexts_to_remove:
logger.info(f"Cleaned up {len(contexts_to_remove)} old conversation contexts")
def get_statistics(self) -> Dict[str, Any]:
"""Get statistics about conversation contexts."""
total_contexts = len(self.active_contexts)
total_turns = sum(len(context.turns) for context in self.active_contexts.values())
bot_stats = {}
for bot_name, conversation_ids in self.context_index.items():
active_conversations = [cid for cid in conversation_ids if cid in self.active_contexts]
bot_stats[bot_name] = {
"active_conversations": len(active_conversations),
"total_turns": sum(len(self.active_contexts[cid].turns) for cid in active_conversations)
}
return {
"total_contexts": total_contexts,
"total_turns": total_turns,
"average_turns_per_context": total_turns / total_contexts if total_contexts > 0 else 0,
"bot_statistics": bot_stats
}
# Global context manager instance
context_manager = ConversationContextManager()
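
A short sketch of the memory flow above (not part of the committed file): create a context, record one turn, and read back what the keyword extraction learned. The import path mirrors how ai_chatbot.py imports this module, and turns are auto-saved under ./conversation_contexts/.

from voicebot.conversation_context import context_manager

# Create (or fetch) a context keyed by session and bot name.
memory = context_manager.get_or_create_context(
    session_id="demo-session", bot_name="ai_chatbot"
)

# Record one exchange; this also auto-saves the context to JSON.
context_manager.add_conversation_turn(
    conversation_id=memory.conversation_id,
    user_message="My name is Alice and I like hiking.",
    bot_response="Nice to meet you, Alice!",
)

# The simple keyword extraction has picked up the name and preference.
print(memory.user_name)
print(memory.get_conversation_summary())
print(context_manager.get_context_for_response(memory.conversation_id))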


@@ -0,0 +1,85 @@
{
"ai_chatbot": {
"personality": "technical_expert",
"ai_provider": "openai",
"streaming": false,
"memory_enabled": true,
"advanced_features": true,
"description": "Advanced AI chatbot with OpenAI integration and helpful personality",
"features": [
"multi_provider_ai",
"personality_system",
"conversation_memory",
"streaming_responses"
]
},
"technical_expert": {
"personality": "technical_expert",
"ai_provider": "anthropic",
"streaming": false,
"memory_enabled": true,
"advanced_features": true,
"description": "Technical expert bot with detailed explanations and Anthropic Claude integration",
"features": [
"technical_expertise",
"detailed_explanations",
"conversation_memory"
]
},
"creative_companion": {
"personality": "creative_companion",
"ai_provider": "local",
"streaming": true,
"memory_enabled": true,
"advanced_features": true,
"description": "Creative writing and brainstorming companion with local AI model",
"features": [
"creative_writing",
"brainstorming",
"streaming_responses",
"conversation_memory"
]
},
"business_advisor": {
"personality": "business_advisor",
"ai_provider": "openai",
"streaming": false,
"memory_enabled": true,
"advanced_features": true,
"description": "Business and strategic advisor with professional communication style",
"features": [
"business_analysis",
"strategic_planning",
"professional_communication",
"conversation_memory"
]
},
"comedy_bot": {
"personality": "comedy_bot",
"ai_provider": "local",
"streaming": true,
"memory_enabled": false,
"advanced_features": true,
"description": "Entertainment-focused bot with humor and casual conversation",
"features": [
"humor_generation",
"casual_conversation",
"entertainment",
"streaming_responses"
]
},
"wise_mentor": {
"personality": "wise_mentor",
"ai_provider": "anthropic",
"streaming": false,
"memory_enabled": true,
"advanced_features": true,
"description": "Wise mentor providing thoughtful guidance and life advice",
"features": [
"life_advice",
"thoughtful_guidance",
"wisdom_sharing",
"conversation_memory"
]
}
}
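
For orientation, this is roughly how the enhanced orchestrator later in this commit consumes the file: load the JSON and look up one bot's settings by key. The relative path below is an assumption for this sketch; the orchestrator resolves the file next to its own module.

import json
from pathlib import Path

config_path = Path("enhanced_bot_configs.json")  # adjust to your layout
with open(config_path, "r") as f:
    bot_configurations = json.load(f)

# Each top-level key is a bot name; values mirror the structure above.
cfg = bot_configurations["ai_chatbot"]
print(cfg["personality"], cfg["ai_provider"], cfg["streaming"], cfg["memory_enabled"])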


@@ -0,0 +1,449 @@
"""
Bot Personality System for Advanced Bot Management.
This module provides personality templates, configuration, and behavior
management for creating diverse and engaging bot characters.
"""
import json
import os
from typing import Dict, List, Optional, Any
from enum import Enum
from pydantic import BaseModel, Field
from logger import logger
class PersonalityTrait(str, Enum):
"""Personality traits that can be assigned to bots."""
FRIENDLY = "friendly"
PROFESSIONAL = "professional"
HUMOROUS = "humorous"
SARCASTIC = "sarcastic"
EMPATHETIC = "empathetic"
ANALYTICAL = "analytical"
CREATIVE = "creative"
ASSERTIVE = "assertive"
PATIENT = "patient"
ENTHUSIASTIC = "enthusiastic"
MYSTERIOUS = "mysterious"
WISE = "wise"
PLAYFUL = "playful"
FORMAL = "formal"
CASUAL = "casual"
class CommunicationStyle(str, Enum):
"""Communication styles for bot responses."""
CONCISE = "concise"
DETAILED = "detailed"
CONVERSATIONAL = "conversational"
TECHNICAL = "technical"
STORYTELLING = "storytelling"
QUESTION_BASED = "question_based"
SUPPORTIVE = "supportive"
DIRECT = "direct"
DIPLOMATIC = "diplomatic"
class ExpertiseDomain(str, Enum):
"""Domains of expertise for specialized bots."""
GENERAL = "general"
TECHNOLOGY = "technology"
SCIENCE = "science"
ARTS = "arts"
BUSINESS = "business"
EDUCATION = "education"
HEALTH = "health"
ENTERTAINMENT = "entertainment"
SPORTS = "sports"
COOKING = "cooking"
TRAVEL = "travel"
FINANCE = "finance"
LEGAL = "legal"
PSYCHOLOGY = "psychology"
PHILOSOPHY = "philosophy"
class BotPersonality(BaseModel):
"""Complete personality configuration for a bot."""
# Identity
name: str = "Assistant"
description: str = "A helpful AI assistant"
backstory: Optional[str] = None
# Core personality
traits: List[PersonalityTrait] = Field(default_factory=lambda: [PersonalityTrait.FRIENDLY])
communication_style: CommunicationStyle = CommunicationStyle.CONVERSATIONAL
expertise_domains: List[ExpertiseDomain] = Field(default_factory=lambda: [ExpertiseDomain.GENERAL])
# Behavior settings
response_length_preference: str = "medium" # short, medium, long
emoji_usage: bool = True
formality_level: float = 0.5 # 0.0 = very casual, 1.0 = very formal
humor_level: float = 0.3 # 0.0 = no humor, 1.0 = very humorous
empathy_level: float = 0.7 # 0.0 = analytical only, 1.0 = highly empathetic
# Language preferences
preferred_language: str = "en"
technical_complexity: float = 0.5 # 0.0 = simple terms, 1.0 = technical jargon
# Conversation patterns
greeting_style: str = "warm" # formal, warm, casual, quirky
farewell_style: str = "friendly" # formal, friendly, casual, memorable
# Custom prompts and examples
system_prompt: Optional[str] = None
example_responses: List[Dict[str, str]] = Field(default_factory=list)
custom_instructions: List[str] = Field(default_factory=list)
# Behavioral boundaries
topics_to_avoid: List[str] = Field(default_factory=list)
preferred_topics: List[str] = Field(default_factory=list)
conversation_limits: Dict[str, Any] = Field(default_factory=dict)
def generate_system_prompt(self) -> str:
"""Generate a comprehensive system prompt based on personality."""
if self.system_prompt:
return self.system_prompt
prompt_parts = []
# Identity
prompt_parts.append(f"You are {self.name}, {self.description}")
if self.backstory:
prompt_parts.append(f"Background: {self.backstory}")
# Personality traits
if self.traits:
traits_str = ", ".join([trait.value for trait in self.traits])
prompt_parts.append(f"Your personality is: {traits_str}")
# Communication style
style_desc = self._get_style_description()
prompt_parts.append(f"Communication style: {style_desc}")
# Expertise
if self.expertise_domains and self.expertise_domains != [ExpertiseDomain.GENERAL]:
domains_str = ", ".join([domain.value for domain in self.expertise_domains])
prompt_parts.append(f"Areas of expertise: {domains_str}")
# Behavior guidelines
behavior_guidelines = self._generate_behavior_guidelines()
if behavior_guidelines:
prompt_parts.append(f"Behavior guidelines: {behavior_guidelines}")
# Custom instructions
if self.custom_instructions:
prompt_parts.append("Additional instructions:")
prompt_parts.extend([f"- {instruction}" for instruction in self.custom_instructions])
# Topic boundaries
if self.topics_to_avoid:
avoid_str = ", ".join(self.topics_to_avoid)
prompt_parts.append(f"Avoid discussing: {avoid_str}")
if self.preferred_topics:
prefer_str = ", ".join(self.preferred_topics)
prompt_parts.append(f"Preferred discussion topics: {prefer_str}")
return "\n\n".join(prompt_parts)
def _get_style_description(self) -> str:
"""Get communication style description."""
style_map = {
CommunicationStyle.CONCISE: "Keep responses brief and to the point",
CommunicationStyle.DETAILED: "Provide comprehensive, detailed explanations",
CommunicationStyle.CONVERSATIONAL: "Use natural, conversational language",
CommunicationStyle.TECHNICAL: "Use precise, technical terminology when appropriate",
CommunicationStyle.STORYTELLING: "Frame responses as engaging narratives",
CommunicationStyle.QUESTION_BASED: "Ask follow-up questions to better understand needs",
CommunicationStyle.SUPPORTIVE: "Provide encouraging and supportive responses",
CommunicationStyle.DIRECT: "Be straightforward and direct in communication",
CommunicationStyle.DIPLOMATIC: "Use diplomatic and tactful language"
}
return style_map.get(self.communication_style, "Communicate naturally")
def _generate_behavior_guidelines(self) -> str:
"""Generate behavior guidelines based on settings."""
guidelines = []
if self.formality_level > 0.7:
guidelines.append("maintain a formal and professional tone")
elif self.formality_level < 0.3:
guidelines.append("use casual and relaxed language")
if self.humor_level > 0.5:
guidelines.append("incorporate appropriate humor when suitable")
elif self.humor_level < 0.2:
guidelines.append("maintain a serious and professional demeanor")
if self.empathy_level > 0.7:
guidelines.append("show high emotional intelligence and empathy")
elif self.empathy_level < 0.3:
guidelines.append("focus on logical and analytical responses")
if self.emoji_usage:
guidelines.append("use emojis appropriately to enhance communication")
if self.response_length_preference == "short":
guidelines.append("keep responses concise")
elif self.response_length_preference == "long":
guidelines.append("provide detailed explanations")
return "; ".join(guidelines) if guidelines else ""
class PersonalityTemplate(BaseModel):
"""Pre-defined personality templates for common bot types."""
id: str
name: str
description: str
personality: BotPersonality
category: str = "general"
tags: List[str] = Field(default_factory=list)
class Config:
extra = "allow"
class PersonalityManager:
"""Manager for bot personalities and templates."""
def __init__(self):
self.templates: Dict[str, PersonalityTemplate] = {}
self.custom_personalities: Dict[str, BotPersonality] = {}
self._load_default_templates()
def _load_default_templates(self):
"""Load default personality templates."""
default_templates = [
PersonalityTemplate(
id="helpful_assistant",
name="Helpful Assistant",
description="A friendly and helpful general-purpose assistant",
personality=BotPersonality(
name="Assistant",
description="a helpful and friendly AI assistant",
traits=[PersonalityTrait.FRIENDLY, PersonalityTrait.EMPATHETIC, PersonalityTrait.PATIENT],
communication_style=CommunicationStyle.CONVERSATIONAL,
expertise_domains=[ExpertiseDomain.GENERAL],
formality_level=0.4,
humor_level=0.3,
empathy_level=0.8
),
category="general",
tags=["helpful", "friendly", "general"]
),
PersonalityTemplate(
id="technical_expert",
name="Technical Expert",
description="A knowledgeable technical specialist",
personality=BotPersonality(
name="TechBot",
description="a knowledgeable technical expert and problem solver",
traits=[PersonalityTrait.ANALYTICAL, PersonalityTrait.PROFESSIONAL, PersonalityTrait.PATIENT],
communication_style=CommunicationStyle.TECHNICAL,
expertise_domains=[ExpertiseDomain.TECHNOLOGY, ExpertiseDomain.SCIENCE],
formality_level=0.7,
humor_level=0.1,
empathy_level=0.4,
technical_complexity=0.8,
emoji_usage=False
),
category="technical",
tags=["technical", "expert", "analytical"]
),
PersonalityTemplate(
id="creative_companion",
name="Creative Companion",
description="An imaginative and inspiring creative partner",
personality=BotPersonality(
name="CreativeBot",
description="an imaginative and inspiring creative companion",
traits=[PersonalityTrait.CREATIVE, PersonalityTrait.ENTHUSIASTIC, PersonalityTrait.PLAYFUL],
communication_style=CommunicationStyle.STORYTELLING,
expertise_domains=[ExpertiseDomain.ARTS, ExpertiseDomain.ENTERTAINMENT],
formality_level=0.2,
humor_level=0.7,
empathy_level=0.6,
emoji_usage=True,
custom_instructions=[
"Encourage creative thinking and exploration",
"Offer multiple perspectives and ideas",
"Use vivid and imaginative language"
]
),
category="creative",
tags=["creative", "artistic", "inspiring"]
),
PersonalityTemplate(
id="business_advisor",
name="Business Advisor",
description="A professional business consultant and strategist",
personality=BotPersonality(
name="BusinessBot",
description="a professional business advisor and strategic consultant",
traits=[PersonalityTrait.PROFESSIONAL, PersonalityTrait.ANALYTICAL, PersonalityTrait.ASSERTIVE],
communication_style=CommunicationStyle.DIRECT,
expertise_domains=[ExpertiseDomain.BUSINESS, ExpertiseDomain.FINANCE],
formality_level=0.8,
humor_level=0.2,
empathy_level=0.5,
emoji_usage=False,
response_length_preference="detailed",
custom_instructions=[
"Provide actionable business insights",
"Focus on practical solutions and ROI",
"Use business terminology appropriately"
]
),
category="business",
tags=["business", "professional", "strategic"]
),
PersonalityTemplate(
id="comedy_bot",
name="Comedy Bot",
description="A humorous entertainer that loves jokes and wordplay",
personality=BotPersonality(
name="ComedyBot",
description="a humorous entertainer who loves jokes, puns, and making people laugh",
traits=[PersonalityTrait.HUMOROUS, PersonalityTrait.PLAYFUL, PersonalityTrait.ENTHUSIASTIC],
communication_style=CommunicationStyle.CONVERSATIONAL,
expertise_domains=[ExpertiseDomain.ENTERTAINMENT],
formality_level=0.1,
humor_level=0.9,
empathy_level=0.6,
emoji_usage=True,
greeting_style="quirky",
farewell_style="memorable",
custom_instructions=[
"Look for opportunities to make appropriate jokes",
"Use wordplay and puns when fitting",
"Keep humor light and positive"
]
),
category="entertainment",
tags=["funny", "jokes", "entertainment"]
),
PersonalityTemplate(
id="wise_mentor",
name="Wise Mentor",
description="A thoughtful mentor who provides wisdom and guidance",
personality=BotPersonality(
name="Mentor",
description="a wise and thoughtful mentor who provides guidance and wisdom",
traits=[PersonalityTrait.WISE, PersonalityTrait.EMPATHETIC, PersonalityTrait.PATIENT],
communication_style=CommunicationStyle.SUPPORTIVE,
expertise_domains=[ExpertiseDomain.PSYCHOLOGY, ExpertiseDomain.PHILOSOPHY, ExpertiseDomain.EDUCATION],
formality_level=0.5,
humor_level=0.3,
empathy_level=0.9,
response_length_preference="detailed",
custom_instructions=[
"Ask thoughtful questions to understand deeper needs",
"Provide perspective and context for challenges",
"Encourage personal growth and reflection"
]
),
category="guidance",
tags=["wise", "mentor", "supportive"]
)
]
for template in default_templates:
self.templates[template.id] = template
logger.info(f"Loaded {len(default_templates)} default personality templates")
def get_template(self, template_id: str) -> Optional[PersonalityTemplate]:
"""Get a personality template by ID."""
return self.templates.get(template_id)
def list_templates(self, category: Optional[str] = None) -> List[PersonalityTemplate]:
"""List all templates, optionally filtered by category."""
templates = list(self.templates.values())
if category:
templates = [t for t in templates if t.category == category]
return templates
def get_categories(self) -> List[str]:
"""Get all available template categories."""
categories = set(t.category for t in self.templates.values())
return sorted(list(categories))
def create_personality_from_template(self, template_id: str, customizations: Optional[Dict[str, Any]] = None) -> Optional[BotPersonality]:
"""Create a personality instance from a template with optional customizations."""
template = self.get_template(template_id)
if not template:
return None
# Start with template personality
personality_dict = template.personality.model_dump()
# Apply customizations
if customizations:
for key, value in customizations.items():
if key in personality_dict:
personality_dict[key] = value
return BotPersonality(**personality_dict)
def save_custom_personality(self, personality_id: str, personality: BotPersonality):
"""Save a custom personality."""
self.custom_personalities[personality_id] = personality
logger.info(f"Saved custom personality: {personality_id}")
def get_custom_personality(self, personality_id: str) -> Optional[BotPersonality]:
"""Get a custom personality by ID."""
return self.custom_personalities.get(personality_id)
def list_custom_personalities(self) -> List[str]:
"""List all custom personality IDs."""
return list(self.custom_personalities.keys())
def export_personality(self, personality: BotPersonality) -> str:
"""Export a personality to JSON string."""
return personality.model_dump_json(indent=2)
def import_personality(self, json_str: str) -> BotPersonality:
"""Import a personality from JSON string."""
return BotPersonality.model_validate_json(json_str)
def load_personalities_from_file(self, file_path: str):
"""Load custom personalities from a JSON file."""
try:
if os.path.exists(file_path):
with open(file_path, 'r') as f:
data = json.load(f)
for personality_id, personality_data in data.items():
personality = BotPersonality(**personality_data)
self.custom_personalities[personality_id] = personality
logger.info(f"Loaded {len(data)} custom personalities from {file_path}")
except Exception as e:
logger.error(f"Failed to load personalities from {file_path}: {e}")
def save_personalities_to_file(self, file_path: str):
"""Save custom personalities to a JSON file."""
try:
data = {}
for personality_id, personality in self.custom_personalities.items():
data[personality_id] = personality.model_dump()
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)
with open(file_path, 'w') as f:
json.dump(data, f, indent=2)
logger.info(f"Saved {len(data)} custom personalities to {file_path}")
except Exception as e:
logger.error(f"Failed to save personalities to {file_path}: {e}")
# Global personality manager instance
personality_manager = PersonalityManager()
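
# Minimal usage sketch (illustrative only). It assumes a "helpful_assistant"
# default template is among those loaded above, and that "humor_level" and
# "emoji_usage" remain valid BotPersonality fields; adjust names as needed.
if __name__ == "__main__":
    demo_persona = personality_manager.create_personality_from_template(
        "helpful_assistant",
        customizations={"humor_level": 0.4, "emoji_usage": False},
    )
    if demo_persona:
        personality_manager.save_custom_personality("demo_custom", demo_persona)
        exported = personality_manager.export_personality(demo_persona)
        restored = personality_manager.import_personality(exported)
        print(f"Round-tripped personality: {restored.name}")
    else:
        print("Template 'helpful_assistant' not found; nothing to demonstrate")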

477
voicebot/step_5b_integration_demo.py Normal file
View File

@ -0,0 +1,477 @@
"""Step 5B Integration: Enhanced Bot Orchestrator with Advanced Bot Management.
This module demonstrates how the new advanced bot management features integrate
with the existing bot orchestrator to provide:
1. AI Provider-powered bots with multiple backend support
2. Personality-driven bot behavior and responses
3. Conversation context and memory management
4. Dynamic bot configuration and health monitoring
This integration enhances the existing bot discovery and management system
without breaking compatibility with existing bot implementations.
"""
import os
import json
import time
import asyncio
from typing import Dict, Optional, Any
from pathlib import Path
# Import existing bot orchestrator functionality
from bot_orchestrator import discover_bots
# Import advanced bot management modules
try:
from voicebot.ai_providers import ai_provider_manager, AIProviderType
from voicebot.personality_system import personality_manager
from voicebot.conversation_context import context_manager
AI_FEATURES_AVAILABLE = True
except ImportError as e:
print(f"Warning: Advanced AI features not available: {e}")
AI_FEATURES_AVAILABLE = False
from logger import logger
class EnhancedBotOrchestrator:
"""Enhanced bot orchestrator with Step 5B advanced management features."""
def __init__(self):
self.enhanced_bots = {} # Enhanced bots with AI features
self.bot_configurations = {} # Bot-specific configurations
self.health_stats = {} # Health monitoring data
# Load configurations
self._load_bot_configurations()
# Initialize AI systems if available
if AI_FEATURES_AVAILABLE:
self._initialize_ai_systems()
def _load_bot_configurations(self):
"""Load bot configurations from JSON file."""
config_path = Path(__file__).parent / "enhanced_bot_configs.json"
default_configs = {
"ai_chatbot": {
"personality": "helpful_assistant",
"ai_provider": "openai",
"streaming": True,
"memory_enabled": True,
"advanced_features": True
},
"technical_expert": {
"personality": "technical_expert",
"ai_provider": "anthropic",
"streaming": False,
"memory_enabled": True,
"advanced_features": True
},
"creative_companion": {
"personality": "creative_companion",
"ai_provider": "local",
"streaming": True,
"memory_enabled": True,
"advanced_features": True
}
}
try:
if config_path.exists():
with open(config_path, 'r') as f:
self.bot_configurations = json.load(f)
else:
self.bot_configurations = default_configs
self._save_bot_configurations()
except Exception as e:
logger.error(f"Failed to load bot configurations: {e}")
self.bot_configurations = default_configs
def _save_bot_configurations(self):
"""Save bot configurations to JSON file."""
config_path = Path(__file__).parent / "enhanced_bot_configs.json"
try:
with open(config_path, 'w') as f:
json.dump(self.bot_configurations, f, indent=2)
except Exception as e:
logger.error(f"Failed to save bot configurations: {e}")
def _initialize_ai_systems(self):
"""Initialize AI provider and personality systems."""
try:
# Ensure default personality templates are loaded
personality_manager.ensure_default_templates()
# Register available AI providers based on environment
providers_to_init = []
if os.getenv("OPENAI_API_KEY"):
providers_to_init.append(AIProviderType.OPENAI)
if os.getenv("ANTHROPIC_API_KEY"):
providers_to_init.append(AIProviderType.ANTHROPIC)
# Local provider is always available
providers_to_init.append(AIProviderType.LOCAL)
for provider_type in providers_to_init:
try:
provider = ai_provider_manager.create_provider(provider_type)
ai_provider_manager.register_provider(f"system_{provider_type.value}", provider)
logger.info(f"Initialized AI provider: {provider_type.value}")
except Exception as e:
logger.warning(f"Failed to initialize provider {provider_type.value}: {e}")
logger.info("AI systems initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize AI systems: {e}")
async def discover_enhanced_bots(self) -> Dict[str, Dict[str, Any]]:
"""Discover bots with enhanced information about AI capabilities."""
# Start with standard bot discovery
standard_bots = discover_bots() # Returns List[BotInfoModel]
enhanced_bot_info = {}
# Convert BotInfoModel list to dict and enhance with AI capabilities
for bot_info in standard_bots:
bot_name = bot_info.name
bot_info_dict = {
"name": bot_name,
"description": bot_info.description,
"has_media": bot_info.has_media,
"standard_info": {
"name": bot_name,
"description": bot_info.description,
"has_media": bot_info.has_media
},
"enhanced_features": False,
"ai_capabilities": {},
"health_status": "unknown"
}
# Check if bot supports enhanced features
if bot_name in self.bot_configurations:
config = self.bot_configurations[bot_name]
if config.get("advanced_features", False):
bot_info_dict["enhanced_features"] = True
bot_info_dict["ai_capabilities"] = {
"personality": config.get("personality", "default"),
"ai_provider": config.get("ai_provider", "local"),
"streaming": config.get("streaming", False),
"memory_enabled": config.get("memory_enabled", False)
}
            # Check bot health if the bot module exposes a get_bot_status hook
try:
bot_module_path = f"voicebot.bots.{bot_name}"
bot_module = __import__(bot_module_path, fromlist=[bot_name])
if hasattr(bot_module, 'get_bot_status'):
status = await bot_module.get_bot_status()
bot_info_dict["health_status"] = "healthy"
bot_info_dict["detailed_status"] = status
except Exception as e:
bot_info_dict["health_status"] = f"import_error: {e}"
enhanced_bot_info[bot_name] = bot_info_dict
return enhanced_bot_info
async def create_enhanced_bot_instance(self, bot_name: str, session_name: str) -> Optional[Any]:
"""Create an enhanced bot instance with AI features configured."""
if not AI_FEATURES_AVAILABLE:
logger.warning(f"Cannot create enhanced bot {bot_name} - AI features not available")
return None
if bot_name not in self.bot_configurations:
logger.warning(f"No configuration found for enhanced bot: {bot_name}")
return None
config = self.bot_configurations[bot_name]
try:
# Set environment variables for the bot based on configuration
os.environ[f"{bot_name.upper()}_PERSONALITY"] = config.get("personality", "helpful_assistant")
os.environ[f"{bot_name.upper()}_PROVIDER"] = config.get("ai_provider", "local")
os.environ[f"{bot_name.upper()}_STREAMING"] = str(config.get("streaming", False)).lower()
os.environ[f"{bot_name.upper()}_MEMORY"] = str(config.get("memory_enabled", False)).lower()
# Import and create the bot
bot_module_path = f"voicebot.bots.{bot_name}"
bot_module = __import__(bot_module_path, fromlist=[bot_name])
# If the bot has a specific initialization function, use it
if hasattr(bot_module, 'create_enhanced_instance'):
bot_instance = await bot_module.create_enhanced_instance(session_name, config)
else:
                # Fall back to using the bot module itself as the instance
bot_instance = bot_module
self.enhanced_bots[f"{bot_name}_{session_name}"] = {
"instance": bot_instance,
"config": config,
"session": session_name,
"created_at": time.time()
}
logger.info(f"Created enhanced bot instance: {bot_name} for session {session_name}")
return bot_instance
except Exception as e:
logger.error(f"Failed to create enhanced bot instance {bot_name}: {e}")
return None
async def monitor_bot_health(self) -> Dict[str, Any]:
"""Monitor health of all enhanced bots and AI systems."""
health_report = {
"timestamp": time.time(),
"ai_systems_available": AI_FEATURES_AVAILABLE,
"enhanced_bots": {},
"ai_providers": {},
"personality_system": {},
"conversation_contexts": {}
}
if not AI_FEATURES_AVAILABLE:
health_report["status"] = "limited - AI features disabled"
return health_report
try:
# Check AI providers
for provider_id, provider in ai_provider_manager.list_providers().items():
try:
provider_instance = ai_provider_manager.get_provider(provider_id)
if provider_instance:
is_healthy = await provider_instance.health_check()
health_report["ai_providers"][provider_id] = {
"status": "healthy" if is_healthy else "unhealthy",
"type": provider.value if hasattr(provider, 'value') else str(provider)
}
except Exception as e:
health_report["ai_providers"][provider_id] = {
"status": f"error: {e}",
"type": "unknown"
}
# Check personality system
try:
templates = personality_manager.list_templates()
health_report["personality_system"] = {
"status": "healthy",
"available_templates": len(templates),
"template_ids": [t.id for t in templates]
}
except Exception as e:
health_report["personality_system"] = {
"status": f"error: {e}"
}
# Check conversation context system
try:
context_stats = context_manager.get_statistics()
health_report["conversation_contexts"] = {
"status": "healthy",
"statistics": context_stats
}
except Exception as e:
health_report["conversation_contexts"] = {
"status": f"error: {e}"
}
# Check enhanced bot instances
for bot_key, bot_data in self.enhanced_bots.items():
try:
bot_instance = bot_data["instance"]
if hasattr(bot_instance, 'health_check'):
bot_health = await bot_instance.health_check()
health_report["enhanced_bots"][bot_key] = {
"status": "healthy",
"details": bot_health,
"uptime": time.time() - bot_data["created_at"]
}
else:
health_report["enhanced_bots"][bot_key] = {
"status": "unknown - no health check available",
"uptime": time.time() - bot_data["created_at"]
}
except Exception as e:
health_report["enhanced_bots"][bot_key] = {
"status": f"error: {e}",
"uptime": time.time() - bot_data.get("created_at", time.time())
}
health_report["status"] = "operational"
except Exception as e:
health_report["status"] = f"system_error: {e}"
# Store health stats for trending
self.health_stats[int(time.time())] = health_report
# Keep only last 24 hours of health stats
cutoff_time = time.time() - (24 * 60 * 60)
self.health_stats = {
timestamp: stats for timestamp, stats in self.health_stats.items()
if timestamp > cutoff_time
}
return health_report
async def configure_bot_runtime(self, bot_name: str, new_config: Dict[str, Any]) -> bool:
"""Dynamically reconfigure a bot at runtime."""
if bot_name not in self.bot_configurations:
logger.error(f"Bot {bot_name} not found in configurations")
return False
try:
# Update configuration
old_config = self.bot_configurations[bot_name].copy()
self.bot_configurations[bot_name].update(new_config)
# Save updated configuration
self._save_bot_configurations()
# If there are active instances, try to update them
updated_instances = []
for bot_key, bot_data in self.enhanced_bots.items():
if bot_key.startswith(f"{bot_name}_"):
bot_instance = bot_data["instance"]
# Try to update personality if changed
if "personality" in new_config and hasattr(bot_instance, 'switch_personality'):
success = await bot_instance.switch_personality(new_config["personality"])
if success:
updated_instances.append(f"{bot_key} personality")
# Try to update AI provider if changed
if "ai_provider" in new_config and hasattr(bot_instance, 'switch_ai_provider'):
success = await bot_instance.switch_ai_provider(new_config["ai_provider"])
if success:
updated_instances.append(f"{bot_key} provider")
# Update bot data configuration
bot_data["config"] = self.bot_configurations[bot_name]
logger.info(f"Bot {bot_name} configuration updated. Active instances updated: {updated_instances}")
return True
except Exception as e:
# Rollback configuration on error
self.bot_configurations[bot_name] = old_config
logger.error(f"Failed to configure bot {bot_name}: {e}")
return False
def get_bot_analytics(self) -> Dict[str, Any]:
"""Get analytics and usage statistics for enhanced bots."""
analytics = {
"enhanced_bots_count": len(self.enhanced_bots),
"configurations_count": len(self.bot_configurations),
"health_history_points": len(self.health_stats),
"bot_breakdown": {},
"feature_usage": {
"ai_providers": {},
"personalities": {},
"memory_enabled": 0,
"streaming_enabled": 0
}
}
# Analyze bot configurations
for bot_name, config in self.bot_configurations.items():
analytics["bot_breakdown"][bot_name] = {
"enhanced_features": config.get("advanced_features", False),
"ai_provider": config.get("ai_provider", "none"),
"personality": config.get("personality", "none"),
"active_instances": sum(1 for key in self.enhanced_bots.keys() if key.startswith(f"{bot_name}_"))
}
# Count feature usage
provider = config.get("ai_provider", "none")
analytics["feature_usage"]["ai_providers"][provider] = analytics["feature_usage"]["ai_providers"].get(provider, 0) + 1
personality = config.get("personality", "none")
analytics["feature_usage"]["personalities"][personality] = analytics["feature_usage"]["personalities"].get(personality, 0) + 1
if config.get("memory_enabled", False):
analytics["feature_usage"]["memory_enabled"] += 1
if config.get("streaming", False):
analytics["feature_usage"]["streaming_enabled"] += 1
# Add conversation context statistics if available
if AI_FEATURES_AVAILABLE:
try:
context_stats = context_manager.get_statistics()
analytics["conversation_statistics"] = context_stats
except Exception as e:
analytics["conversation_statistics"] = {"error": str(e)}
return analytics
# Global enhanced orchestrator instance
enhanced_orchestrator = EnhancedBotOrchestrator()
async def demo_step_5b_integration():
"""Demonstrate Step 5B integration capabilities."""
print("=== Step 5B Advanced Bot Management Demo ===\n")
# 1. Discover enhanced bots
print("1. Discovering bots with enhanced capabilities...")
enhanced_bots = await enhanced_orchestrator.discover_enhanced_bots()
for bot_name, info in enhanced_bots.items():
print(f" Bot: {bot_name}")
print(f" Enhanced: {info['enhanced_features']}")
if info['enhanced_features']:
print(f" AI Capabilities: {info['ai_capabilities']}")
print(f" Health: {info['health_status']}")
print()
# 2. Create enhanced bot instance
print("2. Creating enhanced AI chatbot instance...")
bot_instance = await enhanced_orchestrator.create_enhanced_bot_instance("ai_chatbot", "demo_session")
if bot_instance:
print(" ✓ Enhanced AI chatbot created successfully")
else:
print(" ✗ Failed to create enhanced bot")
print()
# 3. Monitor system health
print("3. Monitoring system health...")
health_report = await enhanced_orchestrator.monitor_bot_health()
print(f" System Status: {health_report['status']}")
print(f" AI Features Available: {health_report['ai_systems_available']}")
if health_report['ai_systems_available']:
print(f" AI Providers: {len(health_report['ai_providers'])} registered")
print(f" Personality Templates: {health_report['personality_system'].get('available_templates', 0)}")
print(f" Enhanced Bot Instances: {len(health_report['enhanced_bots'])}")
print()
# 4. Runtime configuration
print("4. Demonstrating runtime configuration...")
config_success = await enhanced_orchestrator.configure_bot_runtime("ai_chatbot", {
"personality": "technical_expert",
"streaming": False
})
print(f" Configuration Update: {'✓ Success' if config_success else '✗ Failed'}")
print()
# 5. Analytics
print("5. Bot analytics and usage statistics...")
analytics = enhanced_orchestrator.get_bot_analytics()
print(f" Enhanced Bots: {analytics['enhanced_bots_count']}")
print(f" Configurations: {analytics['configurations_count']}")
print(" Feature Usage:")
for feature, usage in analytics['feature_usage'].items():
print(f" {feature}: {usage}")
print()
print("=== Step 5B Integration Demo Complete ===")
if __name__ == "__main__":
# Run the demo
asyncio.run(demo_step_5b_integration())
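
# Illustrative sketch only (not invoked by the demo above): one way a host
# process could poll orchestrator health on a schedule. The 300-second interval
# and log wording are assumptions; only methods defined in this module are used.
async def periodic_health_monitor(interval_seconds: float = 300.0) -> None:
    """Example wiring: periodically log orchestrator health and analytics."""
    while True:
        report = await enhanced_orchestrator.monitor_bot_health()
        analytics = enhanced_orchestrator.get_bot_analytics()
        logger.info(
            f"Bot health: {report.get('status')}; "
            f"enhanced bots active: {analytics['enhanced_bots_count']}"
        )
        await asyncio.sleep(interval_seconds)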

195
voicebot/test_step_5b.py Normal file
View File

@ -0,0 +1,195 @@
"""
Simple test to verify Step 5B enhanced bot functionality.
This test verifies that the enhanced bot components work correctly
when integrated with the existing voicebot system.
"""
import asyncio
import os
import time
# Set up test environment variables
os.environ["AI_CHATBOT_PERSONALITY"] = "helpful_assistant"
os.environ["AI_CHATBOT_PROVIDER"] = "local" # Use local provider for testing
os.environ["AI_CHATBOT_STREAMING"] = "false"
os.environ["AI_CHATBOT_MEMORY"] = "true"
# Import test modules
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from shared.models import ChatMessageModel
async def test_enhanced_ai_chatbot():
"""Test the enhanced AI chatbot functionality."""
print("Testing Enhanced AI Chatbot...")
try:
# Import the enhanced bot
from voicebot.bots.ai_chatbot import handle_chat_message, get_bot_status
# Create a mock send function
responses = []
async def mock_send(message: str):
responses.append(message)
print(f"Bot Response: {message}")
# Test message handling
test_message = ChatMessageModel(
id="test_message_id",
sender_name="test_user",
sender_session_id="test_session",
lobby_id="test_lobby",
message="Hello, can you help me?",
timestamp=time.time()
)
print(f"Sending test message: {test_message.message}")
response = await handle_chat_message(test_message, mock_send)
if response:
print(f"✓ Bot responded successfully: {response[:50]}...")
else:
print("✗ Bot did not respond")
# Test bot status
print("\nTesting bot status...")
status = await get_bot_status()
print("✓ Bot status retrieved:")
print(f" - Agent: {status.get('agent_name', 'unknown')}")
print(f" - Features Available: {status.get('features_available', False)}")
print(f" - Configuration: {status.get('configuration', {})}")
return True
except Exception as e:
print(f"✗ Enhanced bot test failed: {e}")
return False
async def test_personality_system():
"""Test the personality system components."""
print("\nTesting Personality System...")
try:
from voicebot.personality_system import personality_manager
# Test listing templates
templates = personality_manager.list_templates()
print(f"✓ Found {len(templates)} personality templates:")
for template in templates:
print(f" - {template.id}: {template.description}")
# Test creating personality from template
personality = personality_manager.create_personality_from_template("helpful_assistant")
if personality:
print(f"✓ Created personality: {personality.name}")
print(f" - Traits: {[trait.value for trait in personality.traits]}")
print(f" - Communication Style: {personality.communication_style.value}")
        else:
            print("✗ Failed to create personality")
            return False
        return True
except Exception as e:
print(f"✗ Personality system test failed: {e}")
return False
async def test_conversation_context():
"""Test the conversation context management."""
print("\nTesting Conversation Context...")
try:
from voicebot.conversation_context import context_manager
# Test creating context
context = context_manager.get_or_create_context(
session_id="test_session",
bot_name="test_bot",
conversation_id="test_conversation"
)
if context:
print(f"✓ Created conversation context: {context.conversation_id}")
# Test adding conversation turn
context_manager.add_conversation_turn(
conversation_id=context.conversation_id,
user_message="Test message",
bot_response="Test response",
context_used={"test": "context"},
metadata={"timestamp": time.time()}
)
print("✓ Added conversation turn")
print(f" - Turns in context: {len(context.turns)}")
# Test context summary
summary = context_manager.get_context_for_response(context.conversation_id)
if summary:
print(f"✓ Generated context summary: {summary[:50]}...")
            return True
        print("✗ Failed to create conversation context")
        return False
except Exception as e:
print(f"✗ Conversation context test failed: {e}")
return False
async def test_integration_orchestrator():
"""Test the integration orchestrator."""
print("\nTesting Integration Orchestrator...")
try:
from step_5b_integration_demo import enhanced_orchestrator
# Test bot discovery
enhanced_bots = await enhanced_orchestrator.discover_enhanced_bots()
print(f"✓ Discovered {len(enhanced_bots)} bots")
# Find enhanced bots
enhanced_count = sum(1 for bot_info in enhanced_bots.values()
if bot_info.get('enhanced_features', False))
print(f"✓ Found {enhanced_count} enhanced bots")
# Test analytics
analytics = enhanced_orchestrator.get_bot_analytics()
print(f"✓ Analytics: {analytics['enhanced_bots_count']} enhanced bots configured")
return True
except Exception as e:
print(f"✗ Integration orchestrator test failed: {e}")
return False
async def run_all_tests():
"""Run all Step 5B tests."""
print("=== Step 5B Enhanced Bot Management Tests ===\n")
test_results = []
# Run individual tests
test_results.append(await test_enhanced_ai_chatbot())
test_results.append(await test_personality_system())
test_results.append(await test_conversation_context())
test_results.append(await test_integration_orchestrator())
# Summary
passed = sum(test_results)
total = len(test_results)
print(f"\n=== Test Results: {passed}/{total} tests passed ===")
if passed == total:
print("🎉 All Step 5B components are working correctly!")
else:
print("⚠️ Some tests failed - check the output above for details")
return passed == total
if __name__ == "__main__":
asyncio.run(run_all_tests())
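
# Optional pytest entry point (a sketch; assumes pytest is installed in the dev
# environment -- not required to run this script directly):
#
#     def test_step_5b_suite():
#         assert asyncio.run(run_all_tests())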