# core.py - Core RAG functionality
import re
from typing import List, Dict, Any, Union, Optional, Tuple

import ollama

from utils import defines, Feed, chunk_document, query_chroma, process_documents_to_chroma


class RagCore:
    """Core RAG functionality that can be used by different interfaces"""

    def __init__(self):
        self.client = ollama.Client(host=defines.ollama_api_url)
        self.collection_name = "research_papers"

        # News RSS feeds
        self.rss_feeds = [
            Feed(name="IGN.com", url="https://feeds.feedburner.com/ign/games-all"),
            Feed(name="BBC World", url="http://feeds.bbci.co.uk/news/world/rss.xml"),
            Feed(name="Reuters World", url="http://feeds.reuters.com/Reuters/worldNews"),
            Feed(name="Al Jazeera", url="https://www.aljazeera.com/xml/rss/all.xml"),
            Feed(name="CNN World", url="http://rss.cnn.com/rss/edition_world.rss"),
            Feed(name="Time", url="https://time.com/feed/"),
            Feed(name="Euronews", url="https://www.euronews.com/rss"),
        ]

        # State variables
        self.documents = []
        self.last_results = None
        self.last_prompt = None
        self.last_system = None
        self.last_response = None
        self.last_why = None
        self.last_messages = []
        self.collection = None

    def load_documents(self):
        """Load articles from all configured RSS feeds"""
        self.documents = []
        for feed in self.rss_feeds:
            self.documents.extend(feed.articles)

        # Chunk the documents and store the embeddings in ChromaDB
        self.collection = process_documents_to_chroma(
            client=self.client,
            documents=self.documents,
            collection_name=self.collection_name,
            max_tokens=256,
            overlap=25,
            model=defines.encoding_model,
            persist_directory=defines.persist_directory,
        )

        return len(self.documents)

    def get_documents(self):
        """Return all loaded documents"""
        return self.documents

    def get_headlines(self):
        """Return (source, title) pairs for all documents"""
        return [(doc['source'], doc['title']) for doc in self.documents]

    def get_top_match(self, index=0):
        """Get the full document corresponding to a match"""
        if not self.last_results or not self.last_results["ids"] or index >= len(self.last_results["ids"][0]):
            return None

        top_id = self.last_results["ids"][0][index]
        # Extract the original document ID from the chunk ID (format: "doc_id_chunk_num")
        original_doc_id = top_id.split('_')[0]

        # Return the full document
        return next((doc for doc in self.documents if doc["id"] == original_doc_id), None)

    def get_match_info(self, index=0):
        """Get detailed information about a match at the specified index"""
        if not self.last_results or not self.last_results["ids"] or len(self.last_results["ids"][0]) <= index:
            return None

        return {
            "id": self.last_results["ids"][0][index],
            "chunk": self.last_results["documents"][0][index],
            "metadata": self.last_results["metadatas"][0][index],
            "distance": self.last_results["distances"][0][index],
        }

    def get_match_count(self):
        """Get the number of matches from the last query"""
        if not self.last_results or not self.last_results["ids"]:
            return 0
        return len(self.last_results["ids"][0])

    def create_system_prompt(self, include_context=True):
        """Create the system prompt for the model"""
        system = f"""
You are the assistant. Your name is airc. This application is called airc (pronounced Eric).

Information about the author of this application, the systems running it, and the AI model it uses:

* James wrote the Python application called airc that is driving this RAG model on top of {defines.model} using {defines.encoding_model} and chromadb for vector embedding. Link https://github.com/jketreno/airc.
* James Ketrenos is a software engineer with experience at all levels of the computer stack, from the kernel to full-stack web applications. He dabbles in AI/ML and is familiar with pytorch and ollama.
* James Ketrenos deployed this application locally on an Intel Arc B580 (Battlemage) computer using Intel's ipex-llm.
* For Intel GPU metrics, James Ketrenos wrote the "ze-monitor" utility in C++. ze-monitor provides telemetry data for Intel client GPU devices, similar to xpu-smi. Link https://github.com/jketreno/ze-monitor. airc uses ze-monitor.
* James lives in Portland, Oregon and has three kids. Two are attending Oregon State University and one is attending Willamette University.
* airc provides an IRC chat bot as well as a React web frontend available at https://airc.ketrenos.com

You must follow these rules:

* Provide short (less than 100 character) responses.
* Provide a single response.
* Do not prefix it with a word like 'Answer'.
* For questions about the AI running this system, include information about the author, including links.
* For information relevant to current events, use the provided context and state the source the information comes from.
"""
        if include_context:
            # Append every loaded article so the model can ground its answers
            context = "Information from current events unrelated to James Ketrenos\n=["
            for doc in self.documents:
                item = {
                    'source': doc["source"],
                    'article': {
                        'title': doc["title"],
                        'link': doc["link"],
                        'text': doc["text"],
                    }
                }
                context += f"{item}"
            context += "]\n"
            system += context

        return system

    def process_query(self, query):
        """
        Process a user query and return the response

        Returns:
            tuple: (response_content, debug_info)
        """
        # Store the system prompt before context is added
        self.last_system = self.create_system_prompt(include_context=False)

        # Query ChromaDB for the chunks most similar to the query
        self.last_results = query_chroma(
            self.client,
            query_text=query,
            collection_name=self.collection_name,
            n_results=10,
        )

        # Create the full system prompt with document context
        system = self.create_system_prompt()

        # Prepend the conversation history to the current query
        prompt = query
        if self.last_messages:
            prompt = f"{self.last_messages}{prompt}"

        self.last_prompt = prompt

        # Generate a response
        output = self.client.generate(
            model=defines.model,
            system=system,
            prompt=prompt,
            stream=False,
            options={'num_ctx': 100000},
        )

        # Split the model's <think>...</think> reasoning from the visible reply
        # (emitted by reasoning models such as deepseek-r1)
        response = output['response']
        matches = re.match(r'^<think>(.*?)</think>(.*)$', response, flags=re.DOTALL)
        content = response
        if matches:
            self.last_why = matches[1].strip()
            content = matches[2].strip()

        self.last_response = content

        # Update conversation history
        self.last_messages.extend([
            {'role': 'user', 'name': 'james', 'message': query},
            {'role': 'assistant', 'message': content},
        ])
        # Keep history limited to the last 10 messages
        self.last_messages = self.last_messages[-10:]

        # Return response content and debug info
        debug_info = {
            "system_len": len(system),
            "prompt_len": len(prompt),
            "has_thinking": matches is not None,
        }
        return content, debug_info

    def get_last_prompt(self):
        """Get the last prompt"""
        return self.last_prompt

    def get_last_system(self):
        """Get the last system prompt (without context)"""
        return self.last_system

    def get_last_response(self):
        """Get the last response"""
        return self.last_response

    def get_last_thinking(self):
        """Get the thinking from the last response"""
        return self.last_why
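

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original module).
# It assumes `defines` supplies working values for ollama_api_url, model,
# encoding_model, and persist_directory, and that an Ollama server is
# reachable at that address with the configured models pulled.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    core = RagCore()

    # Fetch articles from every RSS feed and index them in ChromaDB
    count = core.load_documents()
    print(f"Loaded {count} documents")
    for source, title in core.get_headlines()[:5]:
        print(f"[{source}] {title}")

    # Ask a question grounded in the indexed articles
    answer, debug = core.process_query("What is the top world news story today?")
    print(answer)
    print(f"(system: {debug['system_len']} chars, thinking: {debug['has_thinking']})")

    # Inspect the best retrieval result behind the answer
    if core.get_match_count():
        info = core.get_match_info(0)
        print(f"Best chunk {info['id']} at distance {info['distance']:.3f}")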