Working with streaming!!!

James Ketr 2025-03-30 14:27:24 -07:00
parent 2393c74890
commit 179b126130
5 changed files with 1161 additions and 185 deletions

132
doc/resume/timeline.md Normal file

@@ -0,0 +1,132 @@
# Professional Projects
## 1995 - 1998: Intel Intercast Technology
* OS: Microsoft Windows Application, WinTV
* Languages: C++
* Role: Team lead and software architect
* Microsoft Media infrastructure
* Windows kernel driver work
* Worked with internal teams and external companies to expand compatible hardware and integrate with Windows
* Integration of Internet Explorer via COM embedding into the Intercast Viewer
## 1999 - 2024: Linux evangelist
* One of the initial members of Intel's Open Source Technology Center (OTC)
* Worked across Intel organizational boundaries to educate teams on the benefits and working model of the Linux open source ecosystem
* Deep understanding of licensing issues, political dynamics, community goals, and business needs
* Frequent resource for executive management and teams looking to leverage open source software
## 2000 - 2001: COM on Linux Prototype
* Distributed component object model
* Languages: C++, STL, Flex, Yacc, Bison
* Role: Team lead and architect
* Evaluated key performance differences between Microsoft Component Object Model's (COM) IUnknown (QueryInterface, AddRef, Release) vs. the Component Object Request Broker Architecture (CORBA) for both in-process and distributed cross-process and remote communication.
* Developed prototype tool-chain and functional code providing a Linux compatible implementation of COM
## 1998 - 2000: Intel Dot Station
* Languages: Java, C
* Designed and built a "visual lens" Java plugin for Netscape Navigator
* Role: Software architect
## 2000 - 2002: Carrier Grade Linux
* OS distribution work
* Contributed to the Linux Standard Base specification
* Role: Team lead and software architect working with internal and external collaborators
## 2004 - 2006: Intel Wireless Linux Kernel Driver
* Languages: C
* Authored original ipw2100, ipw2200, and ipw3945 Linux kernel drivers
* Built IEEE 802.11 wireless subsystem
* Hosted Wireless Birds-of-a-Feather talk at the Ottawa Linux Symposium
* Maintained SourceForge web presence, IRC channel, and community
## 2015 - 2018: Robotics
* Languages: C, Python, NodeJS
* "Maker" blogs on developing a Stewart Platform
* Image recognition and tracking
* Presented at Embedded Linux Conference
## 2012 - 2017: RT24 - crosswalk
* Chromium based native web application host
* Role: Team lead and software architect
* Worked with WebGL, WebAssembly, Native Client (NaCl)
* Several internal presentations at various corporate events
## 2007 - 2009: Moblin
* Tablet-targeting OS distribution
* Role: Team lead and software architect and requirements
* Technology evaluation: Cairo, EFL, GTK, Clutter
* Languages: C, C++, OpenGL
## 2012 - Web Sys Info
* W3C
* Tizen Working Group
## 2007 - 2017: Marblin
* An interactive graphical stress test of rendering contexts
* Ported to each framework being used for OS development
* Originally written in C and using Clutter, ported to WebGL and EFL
## 2009 - 2011: MeeGo
* The merging of Linux Foundation's Moblin with Nokia's Maemo
* Coordinated and worked across business groups at Intel and Nokia
* Role: Team lead and software architect
* Focused on:
* Resolution independent user interfaces
* Multi-touch enabling in X
* Educated teams on the interface paradigm shift to "mobile first"
* Presented at MeeGo Conference
* Languages: C++, Qt, HTML5
## Android on Intel
## 2011 - 2013: Tizen
* Rendering framework: Enlightenment Foundation Library (EFL)
* Focused on: API specifications
* Languages: JavaScript, HTML, C
## Robotics
## Quark
## Board Explorer
## Stewart Platform
## Developer Journey
## Product and Team Tracker
## Travel Tool
## Drones
## Security Mitigations
## 2019 - 2024: Intel Graphics Architect
* Technologies: C, JavaScript, HTML5, React, Markdown, bash, GitHub, GitHub Actions, Docker, Clusters, Data Center, Machine Learning, git
* Role:
* Set strategic direction for working with open source ecosystem
* Worked with hardware and software architects to plan, execute, and support features
* Set strategic direction for overhauling the customer experience for Intel graphics on Linux
# Personal Projects
## 1995 - 2023: Photo Management Software
* Languages: C, JavaScript, PHP, HTML5, CSS, Polymer, React, SQL
* Role: Personal photo management software, including facial recognition
* Image classification, clustering, and identity
## 2020 - 2025: Eikona Android App
* OS: Android
* Languages: Java, Expo, React
* Role: Maintainer for Android port
## 2019 - 2023: Peddlers of Ketran
* Languages: JavaScript, React, NodeJS, HTML5, CSS
* Features: Audio, Video, and Text chat. Full game plus expansions.
* Role: Self-hosted online multiplayer clone of Settlers of Catan
## 2025: Ze-Monitor
* C++ utility leveraging Level Zero API to monitor GPUs
* https://github.com/jketreno/ze-monitor


@@ -25,7 +25,9 @@ div {
margin-left: 10px;
box-sizing: border-box;
overflow-x: visible;
min-width: 10rem;
}
.container {
display: flex;
flex-grow: 1;
@@ -37,16 +39,15 @@ div {
padding: 1rem;
}
.query-box,
.user-box {
.query-box {
display: flex;
margin-bottom: 20px;
margin: 20px 0;
}
.query-box input,
.user-box input {
.query-box input {
flex-grow: 1;
padding: 8px;
margin-right: 10px;
font-size: 1rem;
}


@@ -6,12 +6,12 @@ import rehypeKatex from 'rehype-katex'
import remarkMath from 'remark-math'
import 'katex/dist/katex.min.css' // `rehype-katex` does not import the CSS for you
const welcome_message = "Welcome to Ketr-Chat. I have real-time access to a lot of information. Ask things like 'What are the headlines from cnn.com?' or 'What is the weather in Portland, OR?'";
const welcomeMessage = { "role": "assistant", "content": "Welcome to Ketr-Chat. I have real-time access to a lot of information. Ask things like 'What are the headlines from cnn.com?' or 'What is the weather in Portland, OR?'" };
const loadingMessage = { "role": "assistant", "content": "Instancing chat session..." };
//const url: string = "https://ai.ketrenos.com"
const getConnectionBase = (url: string): string => {
console.log(url);
if (!url.match(/.*battle-linux.*/)) {
return url;
} else {
@@ -21,21 +21,22 @@ const getConnectionBase = (url: string): string => {
const Controls = () => {
const tools = ["get_stock_price", "get_weather", "site_summary", "RAG JPK", "RAG LKML"]
return (<div className="Controls">{
const toggleTool = (event: any) => {
};
return (<div className="Controls">
<div>Enabled Tools</div> {
tools.map((tool, index) => {
return (<div key={index}>{tool}</div>);
return (<div key={index}><input type="checkbox" onChange={e => toggleTool(e.target.checked)} />{tool}</div>);
})
}</div>);
}
const App = () => {
const [query, setQuery] = useState('');
const [conversation, setConversation] = useState<MessageList>([{"role": "assistant", "content": "Connecting to server..."}]);
const [conversation, setConversation] = useState<MessageList>([]);
const conversationRef = useRef<any>(null);
const [processing, setProcessing] = useState(false);
const [ws, setWs] = useState<WebSocket | undefined>(undefined);
const [loaded, setLoaded] = useState<boolean>(false);
const [connection, setConnection] = useState<any | undefined>(undefined);
const [sessionId, setSessionId] = useState<string | undefined>(undefined);
const [loc,] = useState<Location>(window.location)
@@ -46,6 +47,14 @@ const App = () => {
}
}, [conversation]);
useEffect(() => {
if (sessionId === undefined) {
setConversation([loadingMessage]);
} else {
setConversation([welcomeMessage]);
}
}, [sessionId, setConversation]);
useEffect(() => {
const url = new URL(window.location.href);
const pathParts = url.pathname.split('/').filter(Boolean);
@@ -53,7 +62,7 @@ const App = () => {
if (!pathParts.length) {
console.log("No session id -- creating a new session")
fetch(loc.protocol + "//" + getConnectionBase(loc.host) + `/api/context`, {
method: 'CREATE',
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
@@ -72,163 +81,6 @@ const App = () => {
}, [setSessionId, loc.host, loc.protocol]);
const onWsOpen = (event: any) => {
console.log(`ws: open`);
/* We do not set the socket as connected until the 'open' message
* comes through */
setConnection(event.target);
/* Request a full history update */
event.target.send(JSON.stringify({
session: sessionId,
type: 'history'
}));
event.target.send(JSON.stringify({
session: sessionId,
type: 'users'
}));
event.target.send(JSON.stringify({
session: sessionId,
type: 'user'
}));
event.target.send(JSON.stringify({
session: sessionId,
type: 'processing'
}));
};
const onWsMessage = (event:any) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'error':
console.error(`App - error`, data.error);
break;
case 'warning':
console.warn(`App - warning`, data.warning);
break;
case 'history':
if (!loaded) {
setLoaded(true);
}
if (data.update.length) {
setConversation(data.update);
} else {
setConversation([{
role: "assistant",
content: welcome_message
}])
}
break;
case 'processing':
console.log(`processing = ${data.value}`)
setProcessing(data.value);
break;
default:
break;
}
};
const resetConnection = useCallback(() => {
let timer: any = 0;
function reset() {
timer = 0;
setConnection(undefined);
};
return (_: any) => {
if (timer) {
clearTimeout(timer);
}
timer = setTimeout(reset, 5000);
};
}, [setConnection]);
const onWsError = (event: any) => {
console.error(`ws: error`, event);
setWs(undefined);
resetConnection();
};
const onWsClose = (event: any) => {
console.warn(`ws: close`);
setWs(undefined);
resetConnection();
};
/* callback refs are used to provide correct state reference
* in the callback handlers, while also preventing rebinding
* of event handlers on every render */
const refWsOpen = useRef(onWsOpen);
useEffect(() => { refWsOpen.current = onWsOpen; });
const refWsMessage = useRef(onWsMessage);
useEffect(() => { refWsMessage.current = onWsMessage; });
const refWsClose = useRef(onWsClose);
useEffect(() => { refWsClose.current = onWsClose; });
const refWsError = useRef(onWsError);
useEffect(() => { refWsError.current = onWsError; });
/* Once a session id is known, create the sole WebSocket connection
* to the backend. This WebSocket is then shared with any component
* that performs game state updates. Those components should
* bind to the 'message:update' WebSocket event and parse
* their update information from those messages
*/
useEffect(() => {
if (!sessionId) {
return;
}
const unbind = () => {
console.log(`App - unbind`);
}
console.log(`App - bind`);
let socket = ws;
if (!socket && !connection) {
let new_uri;
if (loc.protocol === "https:") {
new_uri = "wss://";
} else {
new_uri = "ws://";
}
let host = getConnectionBase(loc.host)
new_uri += host + `/api/ws/${sessionId}`;
console.log(`Attempting WebSocket connection to ${new_uri}`);
socket = new WebSocket(new_uri)
setWs(socket);
setConnection(undefined);
return unbind;
}
if (!socket) {
return unbind;
}
const cbOpen = (e: any) => refWsOpen.current(e);
const cbMessage = (e: any) => refWsMessage.current(e);
const cbClose = (e: any) => refWsClose.current(e);
const cbError = (e: any) => refWsError.current(e);
socket.addEventListener('open', cbOpen);
socket.addEventListener('close', cbClose);
socket.addEventListener('error', cbError);
socket.addEventListener('message', cbMessage);
return () => {
unbind();
if (ws) {
ws.removeEventListener('open', cbOpen);
ws.removeEventListener('close', cbClose);
ws.removeEventListener('error', cbError);
ws.removeEventListener('message', cbMessage);
}
}
}, [setWs, connection, setConnection, sessionId, ws, refWsOpen, refWsMessage, refWsClose, refWsError, loc.host, loc.protocol]);
const handleKeyPress = (event: React.KeyboardEvent<HTMLInputElement>) => {
if (event.key === 'Enter') {
switch (event.currentTarget.id) {
@@ -247,6 +99,9 @@ const App = () => {
role: string,
content: string,
user?: string,
type?: string,
id?: string,
isProcessing?: boolean,
metadata?: MessageMetadata
};
@@ -255,10 +110,12 @@ const App = () => {
const sendQuery = async () => {
if (!query.trim()) return;
const userMessage = [{ role: 'user', content: query }];
// Add user message to conversation
const newConversation: MessageList = [
...conversation,
{ role: 'user', content: query }
...userMessage
];
setConversation(newConversation);
@@ -267,28 +124,127 @@ const App = () => {
try {
setProcessing(true);
// Send query to server
// Create a unique ID for the processing message
const processingId = Date.now().toString();
// Add initial processing message
setConversation(prev => [
...prev,
{ role: 'assistant', content: 'Processing request...', id: processingId, isProcessing: true }
]);
// Make the fetch request with proper headers
const response = await fetch(loc.protocol + "//" + getConnectionBase(loc.host) + `/api/chat/${sessionId}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
},
body: JSON.stringify(query.trim()),
body: JSON.stringify({ role: 'user', content: query.trim() }),
});
const data = await response.json();
if (data.error) {
console.error(data);
} else if (data.success) {
console.log(data);
if (!response.ok) {
throw new Error(`Server responded with ${response.status}: ${response.statusText}`);
}
if (!response.body) {
throw new Error('Response body is null');
}
// Set up stream processing with explicit chunking
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
// Debug message to console
console.log('Starting to process stream');
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('Stream complete');
break;
}
// Convert chunk to text and debug
const chunk = decoder.decode(value, { stream: true });
console.log('Received chunk:', chunk);
// Add to buffer and process lines
buffer += chunk;
// Process complete lines
let lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
console.log(`Processing ${lines.length} complete lines`);
// Process each complete line immediately
for (const line of lines) {
if (!line.trim()) continue;
try {
console.log('Processing line:', line);
const update = JSON.parse(line);
// Force an immediate state update based on the message type
if (update.status === 'processing') {
// Update processing message with immediate re-render
setConversation(prev => prev.map(msg =>
msg.id === processingId
? { ...msg, content: update.message }
: msg
));
// Add a small delay to ensure React has time to update the UI
await new Promise(resolve => setTimeout(resolve, 0));
} else if (update.status === 'done') {
// Replace processing message with final result
setConversation(prev => [
...prev.filter(msg => msg.id !== processingId),
update.message
]);
} else if (update.status === 'error') {
// Show error
setConversation(prev => [
...prev.filter(msg => msg.id !== processingId),
{ role: 'assistant', type: 'error', content: update.message }
]);
}
} catch (e) {
console.error('Error parsing JSON:', e, line);
}
}
}
// Process any remaining buffer content
if (buffer.trim()) {
try {
console.log('Processing final buffer:', buffer);
const update = JSON.parse(buffer);
if (update.status === 'done') {
setConversation(prev => [
...prev.filter(msg => msg.id !== processingId),
update.message
]);
}
} catch (e) {
console.error('Error parsing final buffer:', e);
}
}
setProcessing(false);
} catch (error) {
console.error('Error:', error);
setConversation([
...newConversation,
{ role: 'assistant', content: 'Error processing your query. Please try again.' }
console.error('Fetch error:', error);
setConversation(prev => [
...prev.filter(msg => !msg.isProcessing),
{ role: 'assistant', type: 'error', content: `Error: ${error}` }
]);
setProcessing(false);
}
};
@@ -350,7 +306,7 @@ const App = () => {
<div className="query-box">
<input
disabled={connection ? false : true || processing}
disabled={processing}
type="text"
value={query}
onChange={(e) => setQuery(e.target.value)}
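
The stream handling in `sendQuery` above relies on newline-delimited JSON: each chunk is appended to a buffer, complete lines are parsed, and the trailing partial line is carried over to the next read. The following is a minimal Python sketch of that buffering idea only, not the client code from this commit; the `feed` helper and the sample chunks are hypothetical and chosen for illustration.

```python
import json


def feed(buffer, chunk):
    """Append a decoded chunk and return (complete_updates, remaining_buffer)."""
    buffer += chunk
    lines = buffer.split("\n")
    # The last element is either "" (the chunk ended on a newline)
    # or an incomplete line that must wait for the next chunk.
    remainder = lines.pop()
    updates = [json.loads(line) for line in lines if line.strip()]
    return updates, remainder


# Hypothetical chunks: the second message is split across two reads.
chunks = [
    '{"status": "processing", "message": "Processing request..."}\n{"status": "do',
    'ne", "message": {"role": "assistant", "content": "Hello"}}\n',
]

buffer = ""
received = []
for chunk in chunks:
    updates, buffer = feed(buffer, chunk)
    received.extend(updates)
```

Carrying the remainder forward is what lets a message that straddles two network reads be parsed exactly once, after its closing newline arrives.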

519
src/server.py Normal file

@@ -0,0 +1,519 @@
# %%
# Imports [standard]
# Standard library modules (no try-except needed)
import argparse
import asyncio
import anyio
import json
import logging
import os
import queue
import re
import time
from datetime import datetime
import textwrap
import threading
import uuid
import random
def try_import(module_name, pip_name=None):
try:
__import__(module_name)
except ImportError:
print(f"Module '{module_name}' not found. Install it using:")
print(f" pip install {pip_name or module_name}")
# Third-party modules with import checks
try_import('gradio')
try_import('ollama')
try_import('openai')
try_import('pytz')
try_import('requests')
try_import('yfinance', 'yfinance')
try_import('dotenv', 'python-dotenv')
try_import('geopy', 'geopy')
try_import('hyphen', 'PyHyphen')
try_import('bs4', 'beautifulsoup4')
try_import('nltk')
import nltk
from dotenv import load_dotenv
from geopy.geocoders import Nominatim
import gradio as gr
import ollama
import openai
import pytz
import requests
import yfinance as yf
from hyphen import hyphenator
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException, BackgroundTasks, Request
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse, RedirectResponse
from fastapi.middleware.cors import CORSMiddleware
from tools import (
get_weather_by_location,
get_current_datetime,
get_ticker_price,
tools
)
# %%
# Defaults
OLLAMA_API_URL = "http://ollama:11434" # Default Ollama local endpoint
#MODEL_NAME = "deepseek-r1:7b"
MODEL_NAME = "llama3.2"
LOG_LEVEL="debug"
USE_TLS=False
WEB_HOST="0.0.0.0"
WEB_PORT=5000
# %%
# Globals
system_message = f"""
You are a helpful information agent.
You have real time access to any website or URL the user asks about, to stock prices, the current date and time, and current weather information for locations in the United States.
You are running { { 'model': MODEL_NAME, 'gpu': 'Intel Arc B580', 'cpu': 'Intel Core i9-14900KS', 'ram': '64G' } }.
You were launched on {get_current_datetime()}.
If you use any real time access, do not mention your knowledge cutoff.
Give short, courteous answers, no more than 2-3 sentences.
Always be accurate. If you don't know the answer, say so. Do not make up details.
When you receive a response from summarize_site, you must:
1. Review the entire content returned by the second LLM
2. Provide the URL used to obtain the information.
3. Incorporate the information into your response as appropriate
"""
tool_log = []
command_log = []
model = None
client = None
web_server = None
# %%
# Cmd line overrides
def parse_args():
parser = argparse.ArgumentParser(description="AI is Really Cool")
parser.add_argument("--ollama-server", type=str, default=OLLAMA_API_URL, help=f"Ollama API endpoint. default={OLLAMA_API_URL}")
parser.add_argument("--ollama-model", type=str, default=MODEL_NAME, help=f"LLM model to use. default={MODEL_NAME}")
parser.add_argument("--web-host", type=str, default=WEB_HOST, help=f"Host for the web server to bind to. default={WEB_HOST}")
parser.add_argument("--web-port", type=int, default=WEB_PORT, help=f"Port for the web server to listen on. default={WEB_PORT}")
parser.add_argument('--level', type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
default=LOG_LEVEL, help=f'Set the logging level. default={LOG_LEVEL}')
return parser.parse_args()
def setup_logging(level):
numeric_level = getattr(logging, level.upper(), None)
if not isinstance(numeric_level, int):
raise ValueError(f"Invalid log level: {level}")
logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s')
logging.info(f"Logging is set to {level} level.")
# %%
def is_words_downloaded():
try:
from nltk.corpus import words
words.words() # Attempt to access the dataset
return True
except LookupError:
return False
if not is_words_downloaded():
logging.info("Downloading nltk words corpus for random nick generation")
nltk.download('words')
# %%
def split_paragraph_with_hyphenation(text, line_length=80, language='en_US'):
"""
Split a paragraph into multiple lines with proper hyphenation.
Args:
text (str): The text to split.
line_length (int): The maximum length of each line.
language (str): The language code for hyphenation rules.
Returns:
[str]: The text split into multiple lines with proper hyphenation.
"""
# Initialize the hyphenator for the specified language
h = hyphenator.Hyphenator(language)
# First attempt: try to wrap without hyphenation
lines = textwrap.wrap(text, width=line_length)
# If any lines are too long, we need to apply hyphenation
result_lines = []
for line in lines:
# If the line is already short enough, keep it as is
if len(line) <= line_length:
result_lines.append(line)
continue
# Otherwise, we need to hyphenate
words = line.split()
current_line = ""
for word in words:
# If adding the word doesn't exceed the limit, add it
if len(current_line) + len(word) + (1 if current_line else 0) <= line_length:
if current_line:
current_line += " "
current_line += word
# If the word itself is too long, hyphenate it
elif len(word) > line_length - len(current_line) - (1 if current_line else 0):
# If we already have content on the line, add it to results
if current_line:
result_lines.append(current_line)
current_line = ""
# Get hyphenation points for the word
hyphenated = h.syllables(word)
if not hyphenated:
# If no hyphenation points found, just add the word to a new line
result_lines.append(word)
continue
# Try to find a suitable hyphenation point
partial_word = ""
for syllable in hyphenated:
if len(partial_word) + len(syllable) + 1 > line_length:
# Add hyphen to the partial word and start a new line
if partial_word:
result_lines.append(partial_word + "-")
partial_word = syllable
else:
# If a single syllable is too long, just add it
result_lines.append(syllable)
else:
partial_word += syllable
# Don't forget the remaining part
if partial_word:
current_line = partial_word
else:
# Start a new line with this word
result_lines.append(current_line)
current_line = word
# Don't forget any remaining content
if current_line:
result_lines.append(current_line)
return result_lines
# %%
async def handle_tool_calls(message):
response = []
tools_used = []
for tool_call in message['tool_calls']:
arguments = tool_call['function']['arguments']
tool = tool_call['function']['name']
match tool:
case 'get_ticker_price':
ticker = arguments.get('ticker')
if not ticker:
ret = None
else:
ret = get_ticker_price(ticker)
tools_used.append(f"{tool}({ticker})")
case 'summarize_site':
url = arguments.get('url')
question = arguments.get('question', 'what is the summary of this content?')
ret = await summarize_site(url, question)
tools_used.append(f"{tool}('{url}', '{question}')")
case 'get_current_datetime':
tz = arguments.get('timezone')
ret = get_current_datetime(tz)
tools_used.append(f"{tool}('{tz}')")
case 'get_weather_by_location':
city = arguments.get('city')
state = arguments.get('state')
ret = get_weather_by_location(city, state)
tools_used.append(f"{tool}('{city}', '{state}')")
case _:
ret = None
response.append({
"role": "tool",
"content": str(ret),
"name": tool_call['function']['name']
})
if len(response) == 1:
return response[0], tools_used
else:
return response, tools_used
# %%
def total_json_length(dict_array):
total = 0
for item in dict_array:
# Convert dictionary to minimized JSON string
json_string = json.dumps(item, separators=(',', ':'))
total += len(json_string)
return total
async def summarize_site(url, question):
"""
Fetches content from a URL, extracts the text, and uses Ollama to summarize it.
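Args:
url (str): The URL of the website to summarize
question (str): The question to answer about the site's content
Returns:
dict | str: A dict holding the summary on success, or an error string on failure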
"""
global model, client
try:
# Fetch the webpage
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
logging.info(f"Fetching {url}")
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
logging.info(f"{url} returned. Processing...")
# Parse the HTML
soup = BeautifulSoup(response.text, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.extract()
# Get text content
text = soup.get_text(separator=' ', strip=True)
# Clean up text (remove extra whitespace)
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
# Limit text length if needed (Ollama may have token limits)
max_chars = 100000
if len(text) > max_chars:
text = text[:max_chars] + "..."
# Create Ollama client
# logging.info(f"Requesting summary of: {text}")
# Generate summary using Ollama
prompt = f"CONTENTS:\n\n{text}\n\n{question}"
response = client.generate(model=model,
system=f"You are given the contents of {url}. Answer the question about the contents",
prompt=prompt)
#logging.info(response['response'])
return {
'source': 'summarizer-llm',
'content': response['response'],
'metadata': get_current_datetime()
}
except requests.exceptions.RequestException as e:
return f"Error fetching the URL: {str(e)}"
except Exception as e:
return f"Error processing the website content: {str(e)}"
# %%
# %%
def is_valid_uuid(value):
try:
uuid_obj = uuid.UUID(value, version=4)
return str(uuid_obj) == value
except (ValueError, TypeError):
return False
# %%
class WebServer:
def __init__(self, logging, client, model=MODEL_NAME):
self.logging = logging
self.app = FastAPI()
self.contexts = {}
self.client = client
self.model = model
self.processing = False
self.app.add_middleware(
CORSMiddleware,
allow_origins=["http://battle-linux.ketrenos.com:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
self.setup_routes()
def setup_routes(self):
@self.app.get('/')
async def root():
context = self.create_context()
self.logging.info(f"Redirecting non-session to {context['id']}")
return RedirectResponse(url=f"/{context['id']}", status_code=307)
#return JSONResponse({"redirect": f"/{context['id']}"})
@self.app.post('/api/chat/{context_id}')
async def chat_endpoint(context_id: str, request: Request):
context = self.upsert_context(context_id)
data = await request.json()
# Create a custom generator that ensures flushing
async def flush_generator():
async for message in self.chat(context=context, content=data['content']):
# Convert to JSON and add newline
yield json.dumps(message) + "\n"
# Yield control to the event loop after each message so the chunk can be sent
await asyncio.sleep(0) # Allow the event loop to process the write
# Return StreamingResponse with appropriate headers
return StreamingResponse(
flush_generator(),
media_type="application/json",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Prevents Nginx buffering if you're using it
}
)
@self.app.post('/api/context')
async def create_context():
context = self.create_context()
self.logging.info(f"Generated new session as {context['id']}")
return JSONResponse(context)
@self.app.get('/api/history/{context_id}')
async def get_history(context_id: str):
context = self.upsert_context(context_id)
return JSONResponse(context["history"])
@self.app.get('/api/tools/{context_id}')
async def get_tools(context_id: str):
context = self.upsert_context(context_id)
return JSONResponse(context["tools"])
@self.app.get('/api/health')
async def health_check():
return JSONResponse({"status": "healthy"})
@self.app.get('/{path:path}')
async def serve_static(path: str):
full_path = os.path.join('/opt/airc/src/ketr-chat/build', path)
if os.path.exists(full_path) and os.path.isfile(full_path):
self.logging.info(f"Serve static request for {full_path}")
return FileResponse(full_path)
self.logging.info(f"Serve index.html for {path}")
return FileResponse('/opt/airc/src/ketr-chat/build/index.html')
def create_context(self):
context_id = str(uuid.uuid4())
context = {
"id": context_id,
"system": [{"role": "system", "content": system_message}],
"history": [],
"tools": []
}
logging.info(f"{context_id} created and added to sessions.")
self.contexts[context_id] = context
return context
def upsert_context(self, context_id):
if context_id in self.contexts:
return self.contexts[context_id]
return self.create_context()
async def chat(self, context, content):
content = content.strip()
if not content:
yield {"status": "error", "message": "Invalid request"}
return
if self.processing:
yield {"status": "error", "message": "Busy"}
return
self.processing = True
try:
history = context["history"]
history.append({"role": "user", "content": content})
messages = context["system"] + history[-1:]
#logging.info(messages)
yield {"status": "processing", "message": "Processing request..."}
# Ask the model for an initial response (synchronous call; it may request tool calls)
response = self.client.chat(model=self.model, messages=messages, tools=tools)
tools_used = []
yield {"status": "processing", "message": "Initial response received"}
if 'tool_calls' in response.get('message', {}):
yield {"status": "processing", "message": "Processing tool calls..."}
message = response['message']
tool_result, tools_used = await handle_tool_calls(message)
message_dict = {
'role': message.get('role', 'assistant'),
'content': message.get('content', '')
}
if 'tool_calls' in message:
message_dict['tool_calls'] = [
{'function': {'name': tc['function']['name'], 'arguments': tc['function']['arguments']}}
for tc in message['tool_calls']
]
messages.append(message_dict)
if isinstance(tool_result, list):
messages.extend(tool_result)
else:
messages.append(tool_result)
yield {"status": "processing", "message": "Generating final response..."}
response = self.client.chat(model=self.model, messages=messages, stream=False)
reply = response['message']['content']
if len(tools_used):
final_message = {"role": "assistant", "content": reply, 'metadata': {"title": f"🛠️ Tool(s) used: {','.join(tools_used)}"}}
else:
final_message = {"role": "assistant", "content": reply}
yield {"status": "done", "message": final_message}
except Exception as e:
logging.exception({ 'model': self.model, 'messages': messages, 'error': str(e) })
yield {"status": "error", "message": f"An error occurred: {str(e)}"}
finally:
self.processing = False
def run(self, host='0.0.0.0', port=5000, **kwargs):
import uvicorn
uvicorn.run(self.app, host=host, port=port)
# %%
# Main function to run everything
def main():
global client, model, web_server
# Parse command-line arguments
args = parse_args()
# Setup logging based on the provided level
setup_logging(args.level)
client = ollama.Client(host=args.ollama_server)
model = args.ollama_model
web_server = WebServer(logging, client, model)
logging.info(f"Starting web server at http://{args.web_host}:{args.web_port}")
web_server.run(host=args.web_host, port=args.web_port, use_reloader=False)
# Run the main function
main()
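
The `/api/chat/{context_id}` route above streams one JSON document per line, each carrying a `status` of `processing`, `done`, or `error`. The sketch below shows that serialization step in isolation, without FastAPI or Ollama. `fake_chat` is a hypothetical stand-in for `WebServer.chat()`, and `ndjson_lines` and `collect` are illustrative names that do not appear in the commit.

```python
import asyncio
import json


async def fake_chat(content):
    """Hypothetical stand-in for WebServer.chat(): yields status dicts."""
    yield {"status": "processing", "message": "Processing request..."}
    yield {"status": "done", "message": {"role": "assistant", "content": content.upper()}}


async def ndjson_lines(updates):
    """Serialize each update as one JSON document terminated by a newline."""
    async for update in updates:
        yield json.dumps(update) + "\n"
        # Hand control back to the event loop between messages.
        await asyncio.sleep(0)


async def collect(content):
    """Gather the streamed lines into a list (for demonstration only)."""
    return [line async for line in ndjson_lines(fake_chat(content))]


lines = asyncio.run(collect("hello"))
```

In the real server the generator is handed to `StreamingResponse` instead of being collected into a list, so each line can reach the client as soon as it is produced.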

368
src/tools.py Normal file

@@ -0,0 +1,368 @@
# %%
# Imports [standard]
# Standard library modules (no try-except needed)
import argparse
import asyncio
import anyio
import json
import logging
import os
import queue
import re
import time
from datetime import datetime
def try_import(module_name, pip_name=None):
try:
__import__(module_name)
except ImportError:
print(f"Module '{module_name}' not found. Install it using:")
print(f" pip install {pip_name or module_name}")
# Third-party modules with import checks
try_import('gradio')
try_import('ollama')
try_import('openai')
try_import('pydle')
try_import('pytz')
try_import('requests')
try_import('yfinance', 'yfinance')
try_import('dotenv', 'python-dotenv')
try_import('geopy', 'geopy')
from dotenv import load_dotenv
from geopy.geocoders import Nominatim
import gradio as gr
import ollama
import openai
import pydle
import pytz
import requests
import yfinance as yf
# %%
def get_weather_by_location(city, state, country="USA"):
    """
    Get weather information from weather.gov based on city, state, and country.
    Args:
        city (str): City name
        state (str): State name or abbreviation
        country (str): Country name (defaults to "USA" as weather.gov is for US locations)
    Returns:
        dict: Weather forecast information, or a dict with an "error" key on failure
    """
    # Step 1: Get coordinates for the location using geocoding
    location = f"{city}, {state}, {country}"
    coordinates = get_coordinates(location)
    if not coordinates:
        return {"error": f"Could not find coordinates for {location}"}
    # Step 2: Get the forecast grid endpoint for the coordinates
    grid_endpoint = get_grid_endpoint(coordinates)
    if not grid_endpoint:
        return {"error": f"Could not find weather grid for coordinates {coordinates}"}
    # Step 3: Get the forecast data from the grid endpoint
    forecast = get_forecast(grid_endpoint)
    # get_forecast returns {"error": ...} on failure, so use .get() to avoid a KeyError
    if not forecast.get('location'):
        forecast['location'] = location
    return forecast
def get_coordinates(location):
    """Convert a location string to latitude and longitude using Nominatim geocoder."""
    try:
        # Create a geocoder with a meaningful user agent
        geolocator = Nominatim(user_agent="weather_app_example")
        # Get the location
        location_data = geolocator.geocode(location)
        if location_data:
            return {
                "latitude": location_data.latitude,
                "longitude": location_data.longitude
            }
        else:
            print(f"Location not found: {location}")
            return None
    except Exception as e:
        print(f"Error getting coordinates: {e}")
        return None
def get_grid_endpoint(coordinates):
    """Get the grid endpoint from weather.gov based on coordinates."""
    try:
        lat = coordinates["latitude"]
        lon = coordinates["longitude"]
        # Define headers for the API request
        headers = {
            "User-Agent": "WeatherAppExample/1.0 (your_email@example.com)",
            "Accept": "application/geo+json"
        }
        # Make the request to get the grid endpoint
        url = f"https://api.weather.gov/points/{lat},{lon}"
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data = response.json()
            return data["properties"]["forecast"]
        else:
            print(f"Error getting grid: {response.status_code} - {response.text}")
            return None
    except Exception as e:
        print(f"Error in get_grid_endpoint: {e}")
        return None
# Weather related function
def get_forecast(grid_endpoint):
    """Get the forecast data from the grid endpoint."""
    try:
        # Define headers for the API request
        headers = {
            "User-Agent": "WeatherAppExample/1.0 (your_email@example.com)",
            "Accept": "application/geo+json"
        }
        # Make the request to get the forecast
        response = requests.get(grid_endpoint, headers=headers)
        if response.status_code == 200:
            data = response.json()
            # Extract the relevant forecast information
            periods = data["properties"]["periods"]
            # Process the forecast data into a simpler format
            forecast = {
                "location": data["properties"].get("relativeLocation", {}).get("properties", {}),
                "updated": data["properties"].get("updated", ""),
                "periods": []
            }
            for period in periods:
                forecast["periods"].append({
                    "name": period.get("name", ""),
                    "temperature": period.get("temperature", ""),
                    "temperatureUnit": period.get("temperatureUnit", ""),
                    "windSpeed": period.get("windSpeed", ""),
                    "windDirection": period.get("windDirection", ""),
                    "shortForecast": period.get("shortForecast", ""),
                    "detailedForecast": period.get("detailedForecast", "")
                })
            return forecast
        else:
            print(f"Error getting forecast: {response.status_code} - {response.text}")
            return {"error": f"API Error: {response.status_code}"}
    except Exception as e:
        print(f"Error in get_forecast: {e}")
        return {"error": f"Exception: {str(e)}"}
# Example usage
def do_weather():
    city = input("Enter city: ")
    state = input("Enter state: ")
    country = input("Enter country (default USA): ") or "USA"
    print(f"Getting weather for {city}, {state}, {country}...")
    weather_data = get_weather_by_location(city, state, country)
    if "error" in weather_data:
        print(f"Error: {weather_data['error']}")
    else:
        print("\nWeather Forecast:")
        # 'location' may be a dict from the API or the plain-string fallback
        loc = weather_data.get('location')
        if not isinstance(loc, dict):
            loc = {}
        print(f"Location: {loc.get('city', city)}, {loc.get('state', state)}")
        print(f"Last Updated: {weather_data.get('updated', 'N/A')}")
        print("\nForecast Periods:")
        for period in weather_data.get("periods", []):
            print(f"\n{period['name']}:")
            print(f" Temperature: {period['temperature']}{period['temperatureUnit']}")
            print(f" Wind: {period['windSpeed']} {period['windDirection']}")
            print(f" Forecast: {period['shortForecast']}")
            print(f" Details: {period['detailedForecast']}")
# %%
# Stock related function
def get_ticker_price(ticker_symbols):
    """
    Look up the current price of one or more stocks by ticker symbol.
    Args:
        ticker_symbols (str): One ticker symbol or a comma-separated list (e.g., 'AAPL' or 'AAPL,MSFT')
    Returns:
        dict or list: A single result dict for one symbol, otherwise a list of result dicts.
            Each result holds 'symbol' and 'price', or 'error' on failure.
    """
    results = []
    print(f"get_ticker_price('{ticker_symbols}')")
    for ticker_symbol in ticker_symbols.split(','):
        ticker_symbol = ticker_symbol.strip()
        if ticker_symbol == "":
            continue
        # Create a Ticker object
        try:
            ticker = yf.Ticker(ticker_symbol)
            print(ticker)
            # Get the latest market data
            ticker_data = ticker.history(period="1d")
            if ticker_data.empty:
                results.append({"error": f"No data found for ticker {ticker_symbol}"})
                continue
            # Get the latest closing price
            latest_price = ticker_data['Close'].iloc[-1]
            # Get some additional info (currently unused; see the commented-out fields below)
            info = ticker.info
            results.append({ 'symbol': ticker_symbol, 'price': latest_price })
        except Exception as e:
            results.append({"error": f"Error fetching data for {ticker_symbol}: {str(e)}"})
    return results[0] if len(results) == 1 else results
    #{
    #    "symbol": ticker_symbol,
    #    "price": latest_price,
    #    "currency": info.get("currency", "Unknown"),
    #    "company_name": info.get("shortName", "Unknown"),
    #    "previous_close": info.get("previousClose", "Unknown"),
    #    "market_cap": info.get("marketCap", "Unknown"),
    #}
# %%
def get_current_datetime(timezone="America/Los_Angeles"):
    """
    Returns the current date and time in the specified timezone in ISO 8601 format.
    Args:
        timezone (str): Timezone name (e.g., "UTC", "America/New_York", "Europe/London")
            Default is "America/Los_Angeles"
    Returns:
        str: Current date and time with timezone in the format YYYY-MM-DDTHH:MM:SS+HH:MM,
            or a dict with an 'error' key if the timezone is invalid
    """
    try:
        if timezone == 'system' or timezone == '' or not timezone:
            timezone = 'America/Los_Angeles'
        # Get the current time as a timezone-aware datetime in the default zone
        local_tz = pytz.timezone("America/Los_Angeles")
        local_now = datetime.now(tz=local_tz)
        # Convert to target timezone
        target_tz = pytz.timezone(timezone)
        target_time = local_now.astimezone(target_tz)
        return target_time.isoformat()
    except Exception as e:
        return {'error': f"Invalid timezone {timezone}: {str(e)}"}
# %%
tools = [ {
    "type": "function",
    "function": {
        "name": "get_ticker_price",
        "description": "Get the current stock price of one or more ticker symbols. Returns an array of objects with 'symbol' and 'price' fields. Call this whenever you need to know the latest value of stock ticker symbols, for example when a user asks 'How much is Intel trading at?' or 'What are the prices of AAPL and MSFT?'",
        "parameters": {
            "type": "object",
            "properties": {
                "ticker": {
                    "type": "string",
                    "description": "The company stock ticker symbol. For multiple tickers, provide a comma-separated list (e.g., 'AAPL,MSFT,GOOGL').",
                },
            },
            "required": ["ticker"],
            "additionalProperties": False
        }
    }
}, {
    "type": "function",
    "function": {
        "name": "summarize_site",
        "description": "Requests a second LLM agent to download the requested site and answer a question about the site. For example if the user says 'What are the top headlines on cnn.com?' you would use summarize_site to get the answer.",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The website URL to download and process",
                },
                "question": {
                    "type": "string",
                    "description": "The question to ask the second LLM about the content",
                },
            },
            "required": ["url", "question"],
            "additionalProperties": False
        },
        "returns": {
            "type": "object",
            "properties": {
                "source": {
                    "type": "string",
                    "description": "Identifier for the source LLM"
                },
                "content": {
                    "type": "string",
                    "description": "The complete response from the second LLM"
                },
                "metadata": {
                    "type": "object",
                    "description": "Additional information about the response"
                }
            }
        }
    }
}, {
    "type": "function",
    "function": {
        "name": "get_current_datetime",
        "description": "Get the current date and time in a specified timezone",
        "parameters": {
            "type": "object",
            "properties": {
                "timezone": {
                    "type": "string",
                    "description": "Timezone name (e.g., 'UTC', 'America/New_York', 'Europe/London', 'America/Los_Angeles'). Default is 'America/Los_Angeles'."
                }
            },
            "required": []
        }
    }
}, {
    "type": "function",
    "function": {
        "name": "get_weather_by_location",
        "description": "Get the full weather forecast as structured data for a given CITY and STATE location in the United States. For example, if the user asks 'What is the weather in Portland?' or 'What is the forecast for tomorrow?' use the provided data to answer the question.",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "City to find the weather forecast (e.g., 'Portland', 'Seattle')."
                },
                "state": {
                    "type": "string",
                    "description": "State to find the weather forecast (e.g., 'OR', 'WA')."
                }
            },
            "required": [ "city", "state" ],
            "additionalProperties": False
        }
    }
}]
__all__ = [ 'tools', 'get_current_datetime', 'get_weather_by_location', 'get_ticker_price' ]
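
The schemas in `tools` only describe the functions; something still has to map a tool name returned by the model onto a Python callable. The following is a minimal sketch of one way that could look, not the dispatch code this commit actually uses. `TOOL_REGISTRY` and `dispatch_tool_call` are hypothetical names, and the `get_current_datetime` here is an offline stand-in so the example runs without `pytz` or network access. It also assumes the schema's parameter names match the function's keyword arguments.

```python
import json

def get_current_datetime(timezone="UTC"):
    """Offline stand-in for the real tool (assumption: same keyword argument)."""
    return f"2025-03-30T00:00:00 ({timezone})"

# Hypothetical registry: schema "name" field -> callable
TOOL_REGISTRY = {"get_current_datetime": get_current_datetime}

def dispatch_tool_call(name, arguments, registry=TOOL_REGISTRY):
    """Call the named tool with keyword arguments.

    Returns a dict with an 'error' key instead of raising, mirroring the
    error convention used by the functions in tools.py.
    """
    func = registry.get(name)
    if func is None:
        return {"error": f"Unknown tool: {name}"}
    # Some clients deliver arguments as a JSON string rather than a dict
    if isinstance(arguments, str):
        try:
            arguments = json.loads(arguments)
        except json.JSONDecodeError as e:
            return {"error": f"Could not parse arguments for {name}: {e}"}
    try:
        return func(**(arguments or {}))
    except TypeError as e:
        # Raised for unexpected or missing keyword arguments
        # (and for any TypeError raised inside the tool itself)
        return {"error": f"Bad arguments for {name}: {e}"}

if __name__ == "__main__":
    print(dispatch_tool_call("get_current_datetime", {"timezone": "UTC"}))
    print(dispatch_tool_call("get_current_datetime", '{"timezone": "Europe/London"}'))
    print(dispatch_tool_call("no_such_tool", {}))
```

With keyword dispatch like this, a schema parameter whose name differs from the function's argument name (for example `ticker` in the schema versus `ticker_symbols` in `get_ticker_price`) would surface as a "Bad arguments" error rather than an unhandled exception.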