# backstory/src/utils/full.py
from typing import List, Dict, Any

import chromadb
import datetime
import feedparser
import logging as log
import ollama
import re
from bs4 import BeautifulSoup

# chunk_document is assumed to live in the sibling chunk module; the original
# imported the module itself but called chunk_document() unqualified.
from .chunk import chunk_document

OLLAMA_API_URL = "http://ollama:11434"  # Default Ollama local endpoint
# MODEL_NAME = "deepseek-r1:1.5b"
MODEL_NAME = "deepseek-r1:7b"
EMBED_MODEL = "mxbai-embed-large"
PERSIST_DIRECTORY = "/root/.cache/chroma"

client = ollama.Client(host=OLLAMA_API_URL)

def extract_text_from_html_or_xml(content, is_xml=False):
    # Parse the content
    if is_xml:
        soup = BeautifulSoup(content, 'xml')  # Use 'xml' parser for XML content
    else:
        soup = BeautifulSoup(content, 'html.parser')  # Default to 'html.parser' for HTML content
    # Extract and return just the text
    return soup.get_text()
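
# Example (sketch): strip markup from a snippet. Note that the 'xml' parser
# requires the lxml package to be installed alongside BeautifulSoup.
#
#     extract_text_from_html_or_xml("<p>Hello <b>world</b></p>")        # "Hello world"
#     extract_text_from_html_or_xml("<note><to>Tove</to></note>", True) # "Tove"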

class Feed:
    def __init__(self, name, url, poll_limit_min=30, max_articles=5):
        self.name = name
        self.url = url
        self.poll_limit_min = datetime.timedelta(minutes=poll_limit_min)
        self.last_poll = None
        self.articles = []
        self.max_articles = max_articles
        self.update()

    def update(self):
        now = datetime.datetime.now()
        if self.last_poll is None or (now - self.last_poll) >= self.poll_limit_min:
            log.info(f"Updating {self.name}")
            feed = feedparser.parse(self.url)
            self.articles = []
            self.last_poll = now
            if len(feed.entries) == 0:
                return self.articles
            for i, entry in enumerate(feed.entries[:self.max_articles]):
                content = {}
                content['source'] = self.name
                content['id'] = f"{self.name}{i}"
                title = entry.get("title")
                if title:
                    content['title'] = title
                link = entry.get("link")
                if link:
                    content['link'] = link
                text = entry.get("summary")
                if text:
                    content['text'] = extract_text_from_html_or_xml(text, False)
                else:
                    # Skip entries that have no summary text
                    continue
                published = entry.get("published")
                if published:
                    content['published'] = published
                self.articles.append(content)
        else:
            log.info(f"Not updating {self.name} -- {self.poll_limit_min - (now - self.last_poll)} remaining before refresh.")
        return self.articles
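
# Example (sketch, reusing the BBC feed defined below): a feed that refreshes
# at most once an hour and keeps the ten newest entries.
#
#     bbc = Feed(name="BBC World", url="http://feeds.bbci.co.uk/news/world/rss.xml",
#                poll_limit_min=60, max_articles=10)
#     articles = bbc.update()  # logs and returns cached articles until 60 minutes pass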

# News RSS Feeds
rss_feeds = [
    Feed(name="IGN.com", url="https://feeds.feedburner.com/ign/games-all"),
    Feed(name="BBC World", url="http://feeds.bbci.co.uk/news/world/rss.xml"),
    Feed(name="Reuters World", url="http://feeds.reuters.com/Reuters/worldNews"),
    Feed(name="Al Jazeera", url="https://www.aljazeera.com/xml/rss/all.xml"),
    Feed(name="CNN World", url="http://rss.cnn.com/rss/edition_world.rss"),
    Feed(name="Time", url="https://time.com/feed/"),
    Feed(name="Euronews", url="https://www.euronews.com/rss"),
    # Feed(name="FeedX", url="https://feedx.net/rss/ap.xml")
]

def init_chroma_client(persist_directory: str = PERSIST_DIRECTORY):
    """Initialize and return a ChromaDB client."""
    # NOTE: the persistent client is left disabled; an in-memory client is
    # returned, so persist_directory is currently unused.
    # return chromadb.PersistentClient(path=persist_directory)
    return chromadb.Client()

def create_or_get_collection(client, collection_name: str):
    """Create or get a ChromaDB collection."""
    try:
        return client.get_collection(
            name=collection_name
        )
    except Exception:
        return client.create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )
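
# A tighter equivalent (sketch): recent chromadb releases expose
# get_or_create_collection, which folds the try/except above into one call.
#
#     return client.get_or_create_collection(
#         name=collection_name,
#         metadata={"hnsw:space": "cosine"}
#     )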

def process_documents_to_chroma(
    documents: List[Dict[str, Any]],
    collection_name: str = "document_collection",
    text_key: str = "text",
    max_tokens: int = 512,
    overlap: int = 50,
    model: str = EMBED_MODEL,
    persist_directory: str = PERSIST_DIRECTORY
):
    """
    Process documents, chunk them, compute embeddings, and store in ChromaDB.

    Args:
        documents: List of document dictionaries
        collection_name: Name for the ChromaDB collection
        text_key: The key containing text content
        max_tokens: Maximum tokens per chunk
        overlap: Token overlap between chunks
        model: Ollama model for embeddings
        persist_directory: Directory to store ChromaDB data
    """
    # Initialize ChromaDB client and collection
    db = init_chroma_client(persist_directory)
    collection = create_or_get_collection(db, collection_name)

    # Process each document
    for doc in documents:
        # Chunk the document
        doc_chunks = chunk_document(doc, text_key, max_tokens, overlap)

        # Prepare data for ChromaDB
        ids = []
        texts = []
        metadatas = []
        embeddings = []
        for doc_chunk in doc_chunks:
            # Create a unique ID for the chunk
            chunk_id = f"{doc_chunk['id']}_{doc_chunk['chunk_id']}"
            # Extract text
            text = doc_chunk[text_key]
            # Create metadata (excluding text and embedding to avoid duplication)
            metadata = {k: v for k, v in doc_chunk.items() if k != text_key and k != "embedding"}
            response = client.embed(model=model, input=text)
            embedding = response["embeddings"][0]
            ids.append(chunk_id)
            texts.append(text)
            metadatas.append(metadata)
            embeddings.append(embedding)

        # Skip documents that produced no chunks; ChromaDB rejects empty batches
        if not ids:
            continue

        # Add chunks to ChromaDB collection
        collection.add(
            ids=ids,
            documents=texts,
            embeddings=embeddings,
            metadatas=metadatas
        )
    return collection
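
# Example usage (sketch, mirroring the __main__ block below): index the
# current RSS articles into a small collection.
#
#     docs = [article for feed in rss_feeds for article in feed.articles]
#     col = process_documents_to_chroma(docs, collection_name="news", max_tokens=256)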

def query_chroma(
    query_text: str,
    collection_name: str = "document_collection",
    n_results: int = 5,
    model: str = EMBED_MODEL,
    persist_directory: str = PERSIST_DIRECTORY
):
    """
    Query ChromaDB for similar documents.

    Args:
        query_text: The text to search for
        collection_name: Name of the ChromaDB collection
        n_results: Number of results to return
        model: Ollama model for embedding the query
        persist_directory: Directory where ChromaDB data is stored

    Returns:
        Query results from ChromaDB
    """
    # Initialize ChromaDB client and collection
    db = init_chroma_client(persist_directory)
    collection = create_or_get_collection(db, collection_name)
    query_response = client.embed(model=model, input=query_text)
    query_embeddings = query_response["embeddings"]
    # Query the collection
    results = collection.query(
        query_embeddings=query_embeddings,
        n_results=n_results
    )
    return results
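
# Example (sketch): fetch the three closest chunks for a free-text query.
# ChromaDB returns parallel lists keyed by "ids", "documents", "metadatas",
# and "distances", each wrapped in an outer per-query list.
#
#     hits = query_chroma("chip shortage", collection_name="news", n_results=3)
#     for chunk_id, distance in zip(hits["ids"][0], hits["distances"][0]):
#         print(chunk_id, distance)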

def print_top_match(query_results, index=0, documents=None):
    """
    Print detailed information about a matching document chunk.

    Args:
        query_results: Results from ChromaDB query
        index: Which result to print (0 is the top match)
        documents: Original documents list to look up full content (optional)
    """
    if not query_results or not query_results["ids"] or len(query_results["ids"][0]) == 0:
        print("No matching documents found.")
        return

    # Get the requested result
    top_id = query_results["ids"][0][index]
    top_document_chunk = query_results["documents"][0][index]
    top_metadata = query_results["metadatas"][0][index]
    top_distance = query_results["distances"][0][index]

    print("=" * 50)
    print("MATCHING DOCUMENT")
    print("=" * 50)
    print(f"Chunk ID: {top_id}")
    print(f"Distance (cosine): {top_distance:.4f}")  # Lower distance means more similar
    print("\nCHUNK METADATA:")
    for key, value in top_metadata.items():
        print(f"  {key}: {value}")
    print("\nMATCHING CHUNK CONTENT:")
    print(top_document_chunk[:500].strip() + ("..." if len(top_document_chunk) > 500 else ""))

def get_top_match(query_results, index=0, documents=None):
    top_id = query_results["ids"][0][index]
    # Extract the original document ID from the chunk ID.
    # Chunk IDs are in the format "doc_id_chunk_num", so split on the last '_'.
    original_doc_id = top_id.rsplit('_', 1)[0]
    # Return the full document for further processing if needed
    if documents is not None:
        return next((doc for doc in documents if doc["id"] == original_doc_id), None)
    return None
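
# Example (sketch, assuming documents is the article list built in the
# __main__ block below): map the best chunk back to its full source article.
#
#     results = query_chroma("some headline topic")
#     article = get_top_match(results, documents=documents)
#     if article:
#         print(article["title"], article["link"])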

def show_documents(documents=None):
    if not documents:
        return
    # Print every document with a short text preview
    for i, doc in enumerate(documents):
        print(f"Document {i+1}:")
        print(f"  Title: {doc['title']}")
        print(f"  Text: {doc['text'][:100]}...")
        print()

def show_headlines(documents=None):
    if not documents:
        return
    # Print one source/title line per document
    for doc in documents:
        print(f"{doc['source']}: {doc['title']}")

def show_help():
    print("""help>
  docs             Show RAG docs
  full             Show last full top match
  headlines        Show the RAG headlines
  prompt           Show the last prompt
  response         Show the last response
  scores           Show last RAG scores
  why|think        Show last response's <think>
  context|match    Show RAG match info to last prompt
""")

# Example usage
if __name__ == "__main__":
    documents = []
    for feed in rss_feeds:
        documents.extend(feed.articles)
    show_documents(documents=documents)

    # Process documents and store in ChromaDB
    collection = process_documents_to_chroma(
        documents=documents,
        collection_name="research_papers",
        max_tokens=256,
        overlap=25,
        model=EMBED_MODEL,
        persist_directory="/root/.cache/chroma"
    )

    last_results = None
    last_prompt = None
    last_system = None
    last_response = None
    last_why = None
    last_messages = []

    while True:
        try:
            search_query = input("> ").strip()
        except KeyboardInterrupt:
            print("\nExiting.")
            break
        if search_query == "exit" or search_query == "quit":
            print("\nExiting.")
            break
        if search_query == "docs":
            show_documents(documents)
            continue
        if search_query == "prompt":
            if last_prompt:
                print(f"""last prompt>
{"="*10}system{"="*10}
{last_system}
{"="*10}prompt{"="*10}
{last_prompt}""")
            else:
                print("No prompts yet")
            continue
        if search_query == "response":
            if last_response:
                print(f"""last response>
{"="*10}response{"="*10}
{last_response}""")
            else:
                print("No responses yet")
            continue
        if search_query == "" or search_query == "help":
            show_help()
            continue
        if search_query == "headlines":
            show_headlines(documents)
            continue
        if search_query == "match" or search_query == "context":
            if last_results:
                print_top_match(last_results, documents=documents)
            else:
                print("No match to give info on")
            continue
        if search_query == "why" or search_query == "think":
            if last_why:
                print(f"""
why>
{last_why}
""")
            else:
                print("No processed prompts")
            continue
        if search_query == "scores":
            if last_results:
                # Iterate over the results themselves, not the dict's keys
                for i in range(len(last_results["ids"][0])):
                    print_top_match(last_results, documents=documents, index=i)
            else:
                print("No match to give info on")
            continue
        if search_query == "full":
            if last_results:
                full = get_top_match(last_results, documents=documents)
                if full:
                    print(f"""Context:
Source: {full["source"]}
Title: {full["title"]}
Link: {full["link"]}
Distance: {last_results.get("distances", [[0]])[0][0]}
Full text:
{full["text"]}""")
                else:
                    print("No match to give info on")
            continue

        # Query ChromaDB
        results = query_chroma(
            query_text=search_query,
            collection_name="research_papers",
            n_results=10
        )
        last_results = results
        system = f"""
You are the assistant. Your name is airc. This application is called airc (pronounced Eric).
Information about the author of this program and the AI model it uses:
* James wrote the python application called airc that is driving this RAG model on top of {MODEL_NAME} using {EMBED_MODEL} and chromadb for vector embedding. Link https://github.com/jketreno/airc.
* James Ketrenos is a software engineer with a history in all levels of the computer stack, from the kernel to full-stack web applications. He dabbles in AI/ML and is familiar with pytorch and ollama.
* James Ketrenos deployed this application locally on an Intel Arc B580 (battlemage) computer using Intel's ipex-llm.
* For Intel GPU metrics, James Ketrenos wrote the "ze-monitor" utility in C++. ze-monitor provides Intel GPU telemetry data for Intel client GPU devices, similar to xpu-smi. Link https://github.com/jketreno/ze-monitor. airc uses ze-monitor.
* James lives in Portland, Oregon and has three kids. Two are attending Oregon State University and one is attending Willamette University.
* airc provides an IRC chat bot as well as a React web frontend available at https://airc.ketrenos.com
You must follow these rules:
* Provide short (less than 100 character) responses.
* Provide a single response.
* Do not prefix it with a word like 'Answer'.
* For information about the AI running this system, include information about the author, including links.
* For information relevant to current events in the <input></input> tags, use that information and state the source it comes from.
"""
        context = "Information related to current events\n<input>=["
        for doc in documents:
            item = {'source': doc["source"], 'article': {'title': doc["title"], 'link': doc["link"], 'text': doc["text"]}}
            context += f"{item}"
        context += "]\n</input>"
        prompt = f"{search_query}"
        last_prompt = prompt
        last_system = system  # cache it before news context is added
        system = f"{system}{context}"
        if len(last_messages) != 0:
            message_context = f"{last_messages}"
            prompt = f"{message_context}{prompt}"
        print(f"system len: {len(system)}")
        print(f"prompt len: {len(prompt)}")
        output = client.generate(
            model=MODEL_NAME,
            system=system,
            prompt=prompt,
            stream=False,
            options={'num_ctx': 100000}
        )
        # Prune off the <think>...</think> preamble emitted by the model
        matches = re.match(r'^<think>(.*?)</think>(.*)$', output['response'], flags=re.DOTALL)
        if matches:
            last_why = matches[1].strip()
            content = matches[2].strip()
        else:
            # Fall back to the raw response so content is always defined
            content = output['response'].strip()
            print(f"[garbled] response>\n{output['response']}")
        print(f"Response>\n{content}")
        last_response = content
        last_messages.extend(({
            'role': 'user',
            'name': 'james',
            'message': search_query
        }, {
            'role': 'assistant',
            'message': content
        }))
        # Keep only the ten most recent messages
        last_messages = last_messages[-10:]