from typing import Any, Dict, List

import datetime
import logging as log
import re

import chromadb
import feedparser
import ollama
from bs4 import BeautifulSoup

# chunk_document is assumed to be provided by the sibling chunk module.
from .chunk import chunk_document

OLLAMA_API_URL = "http://ollama:11434"  # Ollama endpoint (Docker service name)
# MODEL_NAME = "deepseek-r1:1.5b"
MODEL_NAME = "deepseek-r1:7b"
EMBED_MODEL = "mxbai-embed-large"
PERSIST_DIRECTORY = "/root/.cache/chroma"

client = ollama.Client(host=OLLAMA_API_URL)
def extract_text_from_html_or_xml(content, is_xml=False):
    """Strip markup from an HTML or XML string and return the plain text."""
    if is_xml:
        soup = BeautifulSoup(content, 'xml')  # Use 'xml' parser for XML content
    else:
        soup = BeautifulSoup(content, 'html.parser')  # Default to 'html.parser' for HTML content

    # Extract and return just the text
    return soup.get_text()

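# Note: BeautifulSoup's 'xml' parser requires the lxml package to be installed,
# while 'html.parser' ships with the standard library. Illustrative example:
#   extract_text_from_html_or_xml("<p>Hello <b>world</b></p>")  # -> "Hello world"
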
class Feed:
    """An RSS feed that caches its most recent articles and rate-limits polling."""

    def __init__(self, name, url, poll_limit_min=30, max_articles=5):
        self.name = name
        self.url = url
        self.poll_limit_min = datetime.timedelta(minutes=poll_limit_min)
        self.last_poll = None
        self.articles = []
        self.max_articles = max_articles
        self.update()

    def update(self):
        """Re-fetch the feed if the poll limit has elapsed; return the cached articles."""
        now = datetime.datetime.now()
        if self.last_poll is None or (now - self.last_poll) >= self.poll_limit_min:
            log.info(f"Updating {self.name}")
            feed = feedparser.parse(self.url)
            self.articles = []
            self.last_poll = now

            if len(feed.entries) == 0:
                return self.articles

            for i, entry in enumerate(feed.entries[:self.max_articles]):
                content = {}
                content['source'] = self.name
                content['id'] = f"{self.name}{i}"
                title = entry.get("title")
                if title:
                    content['title'] = title
                link = entry.get("link")
                if link:
                    content['link'] = link
                text = entry.get("summary")
                if text:
                    content['text'] = extract_text_from_html_or_xml(text, is_xml=False)
                else:
                    # Skip entries with no summary text; there is nothing to index.
                    continue
                published = entry.get("published")
                if published:
                    content['published'] = published

                self.articles.append(content)
        else:
            remaining = self.poll_limit_min - (now - self.last_poll)
            log.info(f"Not updating {self.name} -- {remaining} remains until refresh.")
        return self.articles

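# Illustrative usage (note that constructing a Feed fetches the URL immediately):
#   feed = Feed(name="BBC World", url="http://feeds.bbci.co.uk/news/world/rss.xml")
#   for article in feed.articles:
#       print(article['title'])
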
# News RSS Feeds
rss_feeds = [
    Feed(name="IGN.com", url="https://feeds.feedburner.com/ign/games-all"),
    Feed(name="BBC World", url="http://feeds.bbci.co.uk/news/world/rss.xml"),
    Feed(name="Reuters World", url="http://feeds.reuters.com/Reuters/worldNews"),
    Feed(name="Al Jazeera", url="https://www.aljazeera.com/xml/rss/all.xml"),
    Feed(name="CNN World", url="http://rss.cnn.com/rss/edition_world.rss"),
    Feed(name="Time", url="https://time.com/feed/"),
    Feed(name="Euronews", url="https://www.euronews.com/rss"),
    # Feed(name="FeedX", url="https://feedx.net/rss/ap.xml")
]

def init_chroma_client(persist_directory: str = PERSIST_DIRECTORY):
    """Initialize and return a ChromaDB client.

    Note: the in-memory client is used for now; persist_directory is kept for
    when the persistent client below is re-enabled.
    """
    # return chromadb.PersistentClient(path=persist_directory)
    return chromadb.Client()

def create_or_get_collection(db, collection_name: str):
    """Get a ChromaDB collection by name, creating it if it does not exist."""
    try:
        return db.get_collection(
            name=collection_name
        )
    except Exception:
        return db.create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

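# A minimal alternative sketch: current chromadb releases also expose
# get_or_create_collection, which folds the try/except above into one call.
# This helper is illustrative and not used by the rest of this module.
def get_or_create_collection(db, collection_name: str):
    return db.get_or_create_collection(
        name=collection_name,
        metadata={"hnsw:space": "cosine"}
    )
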
def process_documents_to_chroma(
    documents: List[Dict[str, Any]],
    collection_name: str = "document_collection",
    text_key: str = "text",
    max_tokens: int = 512,
    overlap: int = 50,
    model: str = EMBED_MODEL,
    persist_directory: str = PERSIST_DIRECTORY
):
    """
    Process documents, chunk them, compute embeddings, and store in ChromaDB.

    Args:
        documents: List of document dictionaries
        collection_name: Name for the ChromaDB collection
        text_key: The key containing text content
        max_tokens: Maximum tokens per chunk
        overlap: Token overlap between chunks
        model: Ollama model for embeddings
        persist_directory: Directory to store ChromaDB data
    """
    # Initialize ChromaDB client and collection
    db = init_chroma_client(persist_directory)
    collection = create_or_get_collection(db, collection_name)

    # Process each document
    for doc in documents:
        # Chunk the document
        doc_chunks = chunk_document(doc, text_key, max_tokens, overlap)

        # Prepare data for ChromaDB
        ids = []
        texts = []
        metadatas = []
        embeddings = []

        for doc_chunk in doc_chunks:
            # Create a unique ID for the chunk
            chunk_id = f"{doc_chunk['id']}_{doc_chunk['chunk_id']}"

            # Extract text
            text = doc_chunk[text_key]

            # Create metadata (excluding text and embedding to avoid duplication)
            metadata = {k: v for k, v in doc_chunk.items() if k != text_key and k != "embedding"}

            response = client.embed(model=model, input=text)
            embedding = response["embeddings"][0]
            ids.append(chunk_id)
            texts.append(text)
            metadatas.append(metadata)
            embeddings.append(embedding)

        # Add chunks to ChromaDB collection (skip documents that produced no chunks)
        if ids:
            collection.add(
                ids=ids,
                documents=texts,
                embeddings=embeddings,
                metadatas=metadatas
            )

    return collection

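# A minimal batching sketch, assuming the installed ollama client accepts a
# list of strings as `input` and returns one vector per item under
# "embeddings" (recent releases do). Illustrative only; not wired in above.
def embed_texts(texts: List[str], model: str = EMBED_MODEL) -> List[List[float]]:
    response = client.embed(model=model, input=texts)
    return response["embeddings"]
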
def query_chroma(
    query_text: str,
    collection_name: str = "document_collection",
    n_results: int = 5,
    model: str = EMBED_MODEL,
    persist_directory: str = PERSIST_DIRECTORY
):
    """
    Query ChromaDB for similar documents.

    Args:
        query_text: The text to search for
        collection_name: Name of the ChromaDB collection
        n_results: Number of results to return
        model: Ollama model for embedding the query
        persist_directory: Directory where ChromaDB data is stored

    Returns:
        Query results from ChromaDB
    """
    # Initialize ChromaDB client and collection
    db = init_chroma_client(persist_directory)
    collection = create_or_get_collection(db, collection_name)

    query_response = client.embed(model=model, input=query_text)
    query_embeddings = query_response["embeddings"]

    # Query the collection
    results = collection.query(
        query_embeddings=query_embeddings,
        n_results=n_results
    )

    return results

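# For reference, a chromadb query result is a dict of parallel lists, one inner
# list per query embedding. Shape only; the values here are illustrative:
#   {
#       "ids":       [["BBC World0_0", ...]],
#       "documents": [["...chunk text...", ...]],
#       "metadatas": [[{"source": "BBC World", ...}, ...]],
#       "distances": [[0.21, ...]],
#   }
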
def print_top_match(query_results, index=0, documents=None):
    """
    Print detailed information about a matching document chunk.

    Args:
        query_results: Results from ChromaDB query
        index: Which result to print (0 is the top match)
        documents: Original documents list, kept for symmetry with get_top_match (optional)
    """
    if not query_results or not query_results["ids"] or len(query_results["ids"][0]) == 0:
        print("No matching documents found.")
        return

    # Get the requested result
    top_id = query_results["ids"][0][index]
    top_document_chunk = query_results["documents"][0][index]
    top_metadata = query_results["metadatas"][0][index]
    top_distance = query_results["distances"][0][index]

    print("="*50)
    print("MATCHING DOCUMENT")
    print("="*50)
    print(f"Chunk ID: {top_id}")
    print(f"Distance: {top_distance:.4f}")  # Cosine distance; lower is more similar

    print("\nCHUNK METADATA:")
    for key, value in top_metadata.items():
        print(f"  {key}: {value}")

    print("\nMATCHING CHUNK CONTENT:")
    print(top_document_chunk[:500].strip() + ("..." if len(top_document_chunk) > 500 else ""))

def get_top_match(query_results, index=0, documents=None):
    """Return the full original document behind a query result, or None."""
    top_id = query_results["ids"][0][index]
    # Extract the original document ID from the chunk ID
    # Chunk IDs are in format "doc_id_chunk_num"
    original_doc_id = top_id.split('_')[0]

    # Return the full document for further processing if needed
    if documents is not None:
        return next((doc for doc in documents if doc["id"] == original_doc_id), None)

    return None

def show_documents(documents=None):
    if not documents:
        return

    # Print a summary of every document
    for i, doc in enumerate(documents):
        print(f"Document {i+1}:")
        print(f"  Title: {doc.get('title', '(no title)')}")
        print(f"  Text: {doc['text'][:100]}...")
        print()

def show_headlines(documents=None):
    if not documents:
        return

    # Print one source/headline line per document
    for doc in documents:
        print(f"{doc['source']}: {doc.get('title', '(no title)')}")

def show_help():
    print("""help>
    docs           Show RAG docs
    full           Show last full top match
    headlines      Show the RAG headlines
    prompt         Show the last prompt
    response       Show the last response
    scores         Show last RAG scores
    why|think      Show last response's <think>
    context|match  Show RAG match info to last prompt
""")


# Example usage
if __name__ == "__main__":
    documents = []
    for feed in rss_feeds:
        documents.extend(feed.articles)

    show_documents(documents=documents)

    # Process documents and store in ChromaDB
    collection = process_documents_to_chroma(
        documents=documents,
        collection_name="research_papers",
        max_tokens=256,
        overlap=25,
        model=EMBED_MODEL,
        persist_directory=PERSIST_DIRECTORY
    )

    last_results = None
    last_prompt = None
    last_system = None
    last_response = None
    last_why = None
    last_messages = []
    while True:
        try:
            search_query = input("> ").strip()
        except KeyboardInterrupt:
            print("\nExiting.")
            break

        if search_query == "exit" or search_query == "quit":
            print("\nExiting.")
            break

        if search_query == "docs":
            show_documents(documents)
            continue

        if search_query == "prompt":
            if last_prompt:
                print(f"""last prompt>
{"="*10}system{"="*10}
{last_system}
{"="*10}prompt{"="*10}
{last_prompt}""")
            else:
                print("No prompts yet")
            continue

        if search_query == "response":
            if last_response:
                print(f"""last response>
{"="*10}response{"="*10}
{last_response}""")
            else:
                print("No responses yet")
            continue

        if search_query == "" or search_query == "help":
            show_help()
            continue

        if search_query == "headlines":
            show_headlines(documents)
            continue

        if search_query == "match" or search_query == "context":
            if last_results:
                print_top_match(last_results, documents=documents)
            else:
                print("No match to give info on")
            continue

        if search_query == "why" or search_query == "think":
            if last_why:
                print(f"""
why>
{last_why}
""")
            else:
                print("No processed prompts")
            continue

        if search_query == "scores":
            if last_results:
                # Iterate over the returned matches, not the result dict's keys
                for i in range(len(last_results["ids"][0])):
                    print_top_match(last_results, documents=documents, index=i)
            else:
                print("No match to give info on")
            continue

        if search_query == "full":
            if last_results:
                full = get_top_match(last_results, documents=documents)
                if full:
                    print(f"""Context:
Source: {full["source"]}
Title: {full.get("title", "(no title)")}
Link: {full.get("link", "(no link)")}
Distance: {last_results.get("distances", [[0]])[0][0]}
Full text:
{full["text"]}""")
                else:
                    print("No match to give info on")
            continue
        # Query ChromaDB
        results = query_chroma(
            query_text=search_query,
            collection_name="research_papers",
            n_results=10
        )
        last_results = results

        full = get_top_match(results, documents=documents)

        # Build a plain-text headline list (currently unused by the prompt below)
        headlines = ""
        for doc in documents:
            headlines += f"{doc['source']}: {doc.get('title', '(no title)')}\n"

        system = f"""
You are the assistant. Your name is airc. This application is called airc (pronounced Eric).

Information about the author of this program and the AI model it uses:

* James wrote the python application called airc that is driving this RAG model on top of {MODEL_NAME} using {EMBED_MODEL} and chromadb for vector embedding. Link https://github.com/jketreno/airc.
* James Ketrenos is a software engineer with a history in all levels of the computer stack, from the kernel to full-stack web applications. He dabbles in AI/ML and is familiar with pytorch and ollama.
* James Ketrenos deployed this application locally on an Intel Arc B580 (Battlemage) computer using Intel's ipex-llm.
* For Intel GPU metrics, James Ketrenos wrote the "ze-monitor" utility in C++. ze-monitor provides Intel GPU telemetry data for Intel client GPU devices, similar to xpu-smi. Link https://github.com/jketreno/ze-monitor. airc uses ze-monitor.
* James lives in Portland, Oregon and has three kids. Two are attending Oregon State University and one is attending Willamette University.
* airc provides an IRC chat bot as well as a React web frontend available at https://airc.ketrenos.com

You must follow these rules:

* Provide short (less than 100 character) responses.
* Provide a single response.
* Do not prefix it with a word like 'Answer'.
* For information about the AI running this system, include information about the author, including links.
* For information relevant to current events in the <input></input> tags, use that information and state the source it comes from.

"""
        # Note: the context includes every cached article, not just the query matches.
        context = "Information related to current events\n<input>=["
        for doc in documents:
            item = {'source': doc["source"], 'article': {'title': doc.get("title"), 'link': doc.get("link"), 'text': doc["text"]}}
            context += f"{item}"
        context += "\n</input>"

        prompt = f"{search_query}"
        last_prompt = prompt
        last_system = system  # cache it before news context is added
        system = f"{system}{context}"
        if len(last_messages) != 0:
            message_context = f"{last_messages}"
            prompt = f"{message_context}{prompt}"

        print(f"system len: {len(system)}")
        print(f"prompt len: {len(prompt)}")
        output = client.generate(
            model=MODEL_NAME,
            system=system,
            prompt=prompt,
            stream=False,
            options={'num_ctx': 100000}
        )
        # Prune off the <think>...</think> preamble emitted by the model
        matches = re.match(r'^<think>(.*?)</think>(.*)$', output['response'], flags=re.DOTALL)
        if matches:
            last_why = matches[1].strip()
            content = matches[2].strip()
        else:
            # Fall back to the raw response so `content` is always defined
            content = output['response']
            print(f"[garbled] response>\n{content}")
        print(f"Response>\n{content}")

        last_response = content
        last_messages.extend(({
            'role': 'user',
            'name': 'james',
            'message': search_query
        }, {
            'role': 'assistant',
            'message': content
        }))
        # Keep only the most recent ten messages of history
        last_messages = last_messages[-10:]