"""Simple chatbot agent that demonstrates chat message handling. This bot shows how to create an agent that primarily uses chat functionality rather than media streams. """ from typing import Dict, Optional, Callable, Awaitable import time import random from logger import logger from aiortc import MediaStreamTrack # Import shared models for chat functionality import sys import os sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from shared.models import ChatMessageModel AGENT_NAME = "chatbot" AGENT_DESCRIPTION = "Simple chatbot that responds to chat messages" # Simple response database RESPONSES = { "hello": ["Hello!", "Hi there!", "Hey!", "Greetings!"], "how are you": ["I'm doing well, thank you!", "Great, thanks for asking!", "I'm fine!"], "goodbye": ["Goodbye!", "See you later!", "Bye!", "Take care!"], "help": ["I can respond to simple greetings and questions. Try saying hello, asking how I am, or say goodbye!"], "time": ["Let me check... it's currently {time}"], "joke": [ "Why don't scientists trust atoms? Because they make up everything!", "I told my wife she was drawing her eyebrows too high. She seemed surprised.", "What do you call a fish wearing a crown? A king fish!", "Why don't eggs tell jokes? They'd crack each other up!" ] } def agent_info() -> Dict[str, str]: return {"name": AGENT_NAME, "description": AGENT_DESCRIPTION} def create_agent_tracks(session_name: str) -> dict[str, MediaStreamTrack]: """Chatbot doesn't provide media tracks - it's chat-only.""" return {} async def handle_chat_message(chat_message: ChatMessageModel, send_message_func: Callable[[str], Awaitable[None]]) -> Optional[str]: """Handle incoming chat messages and provide responses. Args: chat_message: The received chat message send_message_func: Function to send messages back to the lobby Returns: Optional response message to send back to the lobby """ message_lower = chat_message.message.lower().strip() sender = chat_message.sender_name logger.info(f"Chatbot received message from {sender}: {chat_message.message}") # Skip messages from ourselves if sender.lower() == AGENT_NAME.lower(): return None # Look for keywords in the message for keyword, responses in RESPONSES.items(): if keyword in message_lower: response = random.choice(responses) # Handle special formatting if "{time}" in response: current_time = time.strftime("%Y-%m-%d %H:%M:%S") response = response.format(time=current_time) logger.info(f"Chatbot responding with: {response}") return response # If we get a direct mention or question, provide a generic response if any(word in message_lower for word in ["bot", "chatbot", "?"]): responses = [ f"Hi {sender}! I'm a simple chatbot. Say 'help' to see what I can do!", f"Hello {sender}! I heard you mention me. How can I help?", "I'm here and listening! Try asking me about the time or tell me a greeting!" ] return random.choice(responses) # Default: don't respond to unrecognized messages return None