backstory/src/utils/tools/basetools.py

527 lines
18 KiB
Python

import os
from pydantic import BaseModel, Field, model_validator # type: ignore
from typing import List, Optional, Generator, ClassVar, Any, Dict
from datetime import datetime
from typing import (
Any,
)
from typing_extensions import Annotated
from bs4 import BeautifulSoup # type: ignore
from geopy.geocoders import Nominatim # type: ignore
import pytz # type: ignore
import requests
import yfinance as yf # type: ignore
import logging
# %%
def WeatherForecast(city, state, country="USA"):
"""
Get weather information from weather.gov based on city, state, and country.
Args:
city (str): City name
state (str): State name or abbreviation
country (str): Country name (defaults to "USA" as weather.gov is for US locations)
Returns:
dict: Weather forecast information
"""
# Step 1: Get coordinates for the location using geocoding
location = f"{city}, {state}, {country}"
coordinates = get_coordinates(location)
if not coordinates:
return {"error": f"Could not find coordinates for {location}"}
# Step 2: Get the forecast grid endpoint for the coordinates
grid_endpoint = get_grid_endpoint(coordinates)
if not grid_endpoint:
return {"error": f"Could not find weather grid for coordinates {coordinates}"}
# Step 3: Get the forecast data from the grid endpoint
forecast = get_forecast(grid_endpoint)
if not forecast["location"]:
forecast["location"] = location
return forecast
def get_coordinates(location):
"""Convert a location string to latitude and longitude using Nominatim geocoder."""
try:
# Create a geocoder with a meaningful user agent
geolocator = Nominatim(user_agent="weather_app_example")
# Get the location
location_data = geolocator.geocode(location)
if location_data:
return {
"latitude": location_data.latitude,
"longitude": location_data.longitude,
}
else:
print(f"Location not found: {location}")
return None
except Exception as e:
print(f"Error getting coordinates: {e}")
return None
def get_grid_endpoint(coordinates):
"""Get the grid endpoint from weather.gov based on coordinates."""
try:
lat = coordinates["latitude"]
lon = coordinates["longitude"]
# Define headers for the API request
headers = {
"User-Agent": "WeatherAppExample/1.0 (your_email@example.com)",
"Accept": "application/geo+json",
}
# Make the request to get the grid endpoint
url = f"https://api.weather.gov/points/{lat},{lon}"
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
return data["properties"]["forecast"]
else:
print(f"Error getting grid: {response.status_code} - {response.text}")
return None
except Exception as e:
print(f"Error in get_grid_endpoint: {e}")
return None
# Weather related function
def get_forecast(grid_endpoint):
"""Get the forecast data from the grid endpoint."""
try:
# Define headers for the API request
headers = {
"User-Agent": "WeatherAppExample/1.0 (your_email@example.com)",
"Accept": "application/geo+json",
}
# Make the request to get the forecast
response = requests.get(grid_endpoint, headers=headers)
if response.status_code == 200:
data = response.json()
# Extract the relevant forecast information
periods = data["properties"]["periods"]
# Process the forecast data into a simpler format
forecast = {
"location": data["properties"]
.get("relativeLocation", {})
.get("properties", {}),
"updated": data["properties"].get("updated", ""),
"periods": [],
}
for period in periods:
forecast["periods"].append(
{
"name": period.get("name", ""),
"temperature": period.get("temperature", ""),
"temperatureUnit": period.get("temperatureUnit", ""),
"windSpeed": period.get("windSpeed", ""),
"windDirection": period.get("windDirection", ""),
"shortForecast": period.get("shortForecast", ""),
"detailedForecast": period.get("detailedForecast", ""),
}
)
return forecast
else:
print(f"Error getting forecast: {response.status_code} - {response.text}")
return {"error": f"API Error: {response.status_code}"}
except Exception as e:
print(f"Error in get_forecast: {e}")
return {"error": f"Exception: {str(e)}"}
# Example usage
# def do_weather():
# city = input("Enter city: ")
# state = input("Enter state: ")
# country = input("Enter country (default USA): ") or "USA"
# print(f"Getting weather for {city}, {state}, {country}...")
# weather_data = WeatherForecast(city, state, country)
# if "error" in weather_data:
# print(f"Error: {weather_data['error']}")
# else:
# print("\nWeather Forecast:")
# print(f"Location: {weather_data.get('location', {}).get('city', city)}, {weather_data.get('location', {}).get('state', state)}")
# print(f"Last Updated: {weather_data.get('updated', 'N/A')}")
# print("\nForecast Periods:")
# for period in weather_data.get("periods", []):
# print(f"\n{period['name']}:")
# print(f" Temperature: {period['temperature']}{period['temperatureUnit']}")
# print(f" Wind: {period['windSpeed']} {period['windDirection']}")
# print(f" Forecast: {period['shortForecast']}")
# print(f" Details: {period['detailedForecast']}")
# %%
def TickerValue(ticker_symbols):
api_key = os.getenv("TWELVEDATA_API_KEY", "")
if not api_key:
return {"error": f"Error fetching data: No API key for TwelveData"}
results = []
for ticker_symbol in ticker_symbols.split(","):
ticker_symbol = ticker_symbol.strip()
if ticker_symbol == "":
continue
url = (
f"https://api.twelvedata.com/price?symbol={ticker_symbol}&apikey={api_key}"
)
response = requests.get(url)
data = response.json()
if "price" in data:
logging.info(f"TwelveData: {ticker_symbol} {data}")
results.append({"symbol": ticker_symbol, "price": float(data["price"])})
else:
logging.error(f"TwelveData: {data}")
results.append({"symbol": ticker_symbol, "price": "Unavailable"})
return results[0] if len(results) == 1 else results
# Stock related function
def yfTickerValue(ticker_symbols):
"""
Look up the current price of a stock using its ticker symbol.
Args:
ticker_symbol (str): The stock ticker symbol (e.g., 'AAPL' for Apple)
Returns:
dict: Current stock information including price
"""
results = []
for ticker_symbol in ticker_symbols.split(","):
ticker_symbol = ticker_symbol.strip()
if ticker_symbol == "":
continue
# Create a Ticker object
try:
logging.info(f"Looking up {ticker_symbol}")
ticker = yf.Ticker(ticker_symbol)
# Get the latest market data
ticker_data = ticker.history(period="1d")
if ticker_data.empty:
results.append({"error": f"No data found for ticker {ticker_symbol}"})
continue
# Get the latest closing price
latest_price = ticker_data["Close"].iloc[-1]
# Get some additional info
results.append({"symbol": ticker_symbol, "price": latest_price})
except Exception as e:
import traceback
logging.error(f"Error fetching data for {ticker_symbol}: {e}")
logging.error(traceback.format_exc())
results.append(
{"error": f"Error fetching data for {ticker_symbol}: {str(e)}"}
)
return results[0] if len(results) == 1 else results
# %%
def DateTime(timezone="America/Los_Angeles"):
"""
Returns the current date and time in the specified timezone in ISO 8601 format.
Args:
timezone (str): Timezone name (e.g., "UTC", "America/New_York", "Europe/London")
Default is "America/Los_Angeles"
Returns:
str: Current date and time with timezone in the format YYYY-MM-DDTHH:MM:SS+HH:MM
"""
try:
if timezone == "system" or timezone == "" or not timezone:
timezone = "America/Los_Angeles"
# Get current UTC time (timezone-aware)
local_tz = pytz.timezone("America/Los_Angeles")
local_now = datetime.now(tz=local_tz)
# Convert to target timezone
target_tz = pytz.timezone(timezone)
target_time = local_now.astimezone(target_tz)
return target_time.isoformat()
except Exception as e:
return {"error": f"Invalid timezone {timezone}: {str(e)}"}
async def GenerateImage(llm, model: str, prompt: str):
return { "image_id": "image-a830a83-bd831" }
async def AnalyzeSite(llm, model: str, url: str, question: str):
"""
Fetches content from a URL, extracts the text, and uses Ollama to summarize it.
Args:
url (str): The URL of the website to summarize
Returns:
str: A summary of the website content
"""
try:
# Fetch the webpage
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
logging.info(f"Fetching {url}")
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
logging.info(f"{url} returned. Processing...")
# Parse the HTML
soup = BeautifulSoup(response.text, "html.parser")
# Remove script and style elements
for script in soup(["script", "style"]):
script.extract()
# Get text content
text = soup.get_text(separator=" ", strip=True)
# Clean up text (remove extra whitespace)
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = " ".join(chunk for chunk in chunks if chunk)
# Limit text length if needed (Ollama may have token limits)
max_chars = 100000
if len(text) > max_chars:
text = text[:max_chars] + "..."
# Create Ollama client
# logging.info(f"Requesting summary of: {text}")
# Generate summary using Ollama
prompt = f"CONTENTS:\n\n{text}\n\n{question}"
response = llm.generate(
model=model,
system="You are given the contents of {url}. Answer the question about the contents",
prompt=prompt,
)
# logging.info(response["response"])
return {
"source": "summarizer-llm",
"content": response["response"],
"metadata": DateTime(),
}
except requests.exceptions.RequestException as e:
logging.error(f"Error fetching the URL: {e}")
return f"Error fetching the URL: {str(e)}"
except Exception as e:
logging.error(f"Error processing the website content: {e}")
return f"Error processing the website content: {str(e)}"
# %%
class Function(BaseModel):
name: str
description: str
parameters: Dict[str, Any]
returns: Optional[Dict[str, Any]] = {}
class Tool(BaseModel):
type: str
function: Function
tools : List[Tool] = [
# Tool.model_validate({
# "type": "function",
# "function": {
# "name": "GenerateImage",
# "description": """\
# CRITICAL INSTRUCTIONS FOR IMAGE GENERATION:
# 1. Call this tool when users request images, drawings, or visual content
# 2. This tool returns an image_id (e.g., "img_abc123")
# 3. MANDATORY: You must respond with EXACTLY this format: <GenerateImage id={image_id}/>
# 4. FORBIDDEN: DO NOT use markdown image syntax ![](url)
# 5. FORBIDDEN: DO NOT create fake URLs or file paths
# 6. FORBIDDEN: DO NOT use any other image embedding format
# CORRECT EXAMPLE:
# User: "Draw a cat"
# Tool returns: {"image_id": "img_xyz789"}
# Your response: "Here's your cat image: <GenerateImage id=img_xyz789/>"
# WRONG EXAMPLES (DO NOT DO THIS):
# - ![](https://example.com/...)
# - ![Cat image](any_url)
# - <img src="...">
# The <GenerateImage id={image_id}/> format is the ONLY way to display images in this system.
# """,
# "parameters": {
# "type": "object",
# "properties": {
# "prompt": {
# "type": "string",
# "description": "Detailed image description including style, colors, subject, composition"
# }
# },
# "required": ["prompt"]
# },
# "returns": {
# "type": "object",
# "properties": {
# "image_id": {
# "type": "string",
# "description": "Unique identifier for the generated image. Use this EXACTLY in <GenerateImage id={this_value}/>"
# }
# }
# }
# }
# }),
Tool.model_validate({
"type": "function",
"function": {
"name": "TickerValue",
"description": "Get the current stock price of one or more ticker symbols. Returns an array of objects with 'symbol' and 'price' fields. Call this whenever you need to know the latest value of stock ticker symbols, for example when a user asks 'How much is Intel trading at?' or 'What are the prices of AAPL and MSFT?'",
"parameters": {
"type": "object",
"properties": {
"ticker": {
"type": "string",
"description": "The company stock ticker symbol. For multiple tickers, provide a comma-separated list (e.g., 'AAPL,MSFT,GOOGL').",
},
},
"required": ["ticker"],
"additionalProperties": False,
},
},
}),
Tool.model_validate({
"type": "function",
"function": {
"name": "AnalyzeSite",
"description": "Downloads the requested site and asks a second LLM agent to answer the question based on the site content. For example if the user says 'What are the top headlines on cnn.com?' you would use AnalyzeSite to get the answer. Only use this if the user asks about a specific site or company.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The website URL to download and process",
},
"question": {
"type": "string",
"description": "The question to ask the second LLM about the content",
},
},
"required": ["url", "question"],
"additionalProperties": False,
},
"returns": {
"type": "object",
"properties": {
"source": {
"type": "string",
"description": "Identifier for the source LLM",
},
"content": {
"type": "string",
"description": "The complete response from the second LLM",
},
"metadata": {
"type": "object",
"description": "Additional information about the response",
},
},
},
},
}),
Tool.model_validate({
"type": "function",
"function": {
"name": "DateTime",
"description": "Get the current date and time in a specified timezone. For example if a user asks 'What time is it in Poland?' you would pass the Warsaw timezone to DateTime.",
"parameters": {
"type": "object",
"properties": {
"timezone": {
"type": "string",
"description": "Timezone name (e.g., 'UTC', 'America/New_York', 'Europe/London', 'America/Los_Angeles'). Default is 'America/Los_Angeles'.",
}
},
"required": [],
},
},
}),
Tool.model_validate({
"type": "function",
"function": {
"name": "WeatherForecast",
"description": "Get the full weather forecast as structured data for a given CITY and STATE location in the United States. For example, if the user asks 'What is the weather in Portland?' or 'What is the forecast for tomorrow?' use the provided data to answer the question.",
"parameters": {
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City to find the weather forecast (e.g., 'Portland', 'Seattle').",
"minLength": 2,
},
"state": {
"type": "string",
"description": "State to find the weather forecast (e.g., 'OR', 'WA').",
"minLength": 2,
},
},
"required": ["city", "state"],
"additionalProperties": False,
},
},
}),
]
class ToolEntry(BaseModel):
enabled: bool = True
tool: Tool
def llm_tools(tools: List[ToolEntry]) -> List[Dict[str, Any]]:
return [entry.tool.model_dump(mode='json') for entry in tools if entry.enabled == True]
def all_tools() -> List[ToolEntry]:
return [ToolEntry(tool=tool) for tool in tools]
def enabled_tools(tools: List[ToolEntry]) -> List[ToolEntry]:
return [ToolEntry(tool=entry.tool) for entry in tools if entry.enabled == True]
tool_functions = ["DateTime", "WeatherForecast", "TickerValue", "AnalyzeSite", "GenerateImage"]
__all__ = ["ToolEntry", "all_tools", "llm_tools", "enabled_tools", "tool_functions"]
# __all__.extend(__tool_functions__) # type: ignore