# %% # Imports [standard] # Standard library modules (no try-except needed) import argparse import asyncio import json import logging import os import re import uuid import subprocess import re import math def try_import(module_name, pip_name=None): try: __import__(module_name) except ImportError: print(f"Module '{module_name}' not found. Install it using:") print(f" pip install {pip_name or module_name}") # Third-party modules with import checks try_import('ollama') try_import('requests') try_import('bs4', 'beautifulsoup4') try_import('fastapi') try_import('uvicorn') try_import('numpy') try_import('umap') try_import('sklearn') import ollama import requests from bs4 import BeautifulSoup from fastapi import FastAPI, Request, BackgroundTasks from fastapi.responses import JSONResponse, StreamingResponse, FileResponse, RedirectResponse from fastapi.middleware.cors import CORSMiddleware import uvicorn import numpy as np import umap from sklearn.preprocessing import MinMaxScaler from utils import ( rag as Rag, defines ) from tools import ( DateTime, WeatherForecast, TickerValue, tools ) rags = [ { "name": "JPK", "enabled": True, "description": "Expert data about James Ketrenos, including work history, personal hobbies, and projects." }, # { "name": "LKML", "enabled": False, "description": "Full associative data for entire LKML mailing list archive." }, ] def get_installed_ram(): try: with open('/proc/meminfo', 'r') as f: meminfo = f.read() match = re.search(r'MemTotal:\s+(\d+)', meminfo) if match: return f"{math.floor(int(match.group(1)) / 1000**2)}GB" # Convert KB to GB except Exception as e: return f"Error retrieving RAM: {e}" def get_graphics_cards(): gpus = [] try: # Run the ze-monitor utility result = subprocess.run(['ze-monitor'], capture_output=True, text=True, check=True) # Clean up the output (remove leading/trailing whitespace and newlines) output = result.stdout.strip() for index in range(len(output.splitlines())): result = subprocess.run(['ze-monitor', '--device', f'{index+1}', '--info'], capture_output=True, text=True, check=True) gpu_info = result.stdout.strip().splitlines() gpu = { "discrete": True, # Assume it's discrete initially "name": None, "memory": None } gpus.append(gpu) for line in gpu_info: match = re.match(r'^Device: [^(]*\((.*)\)', line) if match: gpu["name"] = match.group(1) continue match = re.match(r'^\s*Memory: (.*)', line) if match: gpu["memory"] = match.group(1) continue match = re.match(r'^.*Is integrated with host: Yes.*', line) if match: gpu["discrete"] = False continue return gpus except Exception as e: return f"Error retrieving GPU info: {e}" def get_cpu_info(): try: with open('/proc/cpuinfo', 'r') as f: cpuinfo = f.read() model_match = re.search(r'model name\s+:\s+(.+)', cpuinfo) cores_match = re.findall(r'processor\s+:\s+\d+', cpuinfo) if model_match and cores_match: return f"{model_match.group(1)} with {len(cores_match)} cores" except Exception as e: return f"Error retrieving CPU info: {e}" def system_info(model): return { "System RAM": get_installed_ram(), "Graphics Card": get_graphics_cards(), "CPU": get_cpu_info(), "LLM Model": model, "Context length": defines.max_context } # %% # Defaults OLLAMA_API_URL = defines.ollama_api_url MODEL_NAME = defines.model LOG_LEVEL="info" USE_TLS=False WEB_HOST="0.0.0.0" WEB_PORT=8911 DEFAULT_HISTORY_LENGTH=5 # %% # Globals context_tag = "INFO" system_message = f""" Launched on {DateTime()}. When answering queries, follow these steps: 1. First analyze the query to determine if real-time information might be helpful 2. Even when [{context_tag}] is provided, consider whether the tools would provide more current or comprehensive information 3. Use the provided tools whenever they would enhance your response, regardless of whether context is also available 4. When presenting weather forecasts, include relevant emojis immediately before the corresponding text. For example, for a sunny day, say \"☀️ Sunny\" or if the forecast says there will be \"rain showers, say \"🌧️ Rain showers\". Use this mapping for weather emojis: Sunny: ☀️, Cloudy: ☁️, Rainy: 🌧️, Snowy: ❄️ 4. When both [{context_tag}] and tool outputs are relevant, synthesize information from both sources to provide the most complete answer 5. Always prioritize the most up-to-date and relevant information, whether it comes from [{context_tag}] or tools 6. If [{context_tag}] and tool outputs contain conflicting information, prefer the tool outputs as they likely represent more current data Always use tools and [{context_tag}] when possible. Be concise, and never make up information. If you do not know the answer, say so. """.strip() system_generate_resume = f""" You are a professional resume writer. Your task is to write a polished, tailored resume for a specific job based only on the individual's [WORK HISTORY]. When answering queries, follow these steps: 1. You must not invent or assume any inforation not explicitly present in the [WORK HISTORY]. 2. Analyze the [JOB DESCRIPTION] to identify skills required for the job. 3. Use the [JOB DESCRIPTION] provided to guide the focus, tone, and relevant skills or experience to highlight from the [WORK HISTORY]. 4. Identify and emphasisze the experiences, achievements, and responsibilities from the [WORK HISTORY] that best align with the [JOB DESCRIPTION]. 5. Do not use the [JOB DESCRIPTION] skills unless listed in [WORK HISTORY]. Structure the resume professionally with the following sections where applicable: * "Name: Use full name." * "Professional Summary: A 2-4 sentence overview tailored to the job." * "Skills: A bullet list of key skills derived from the work history and relevant to the job." * Professional Experience: A detailed list of roles, achievements, and responsibilities from the work history that relate to the job." * Education: Include only if available in the work history." Do not include any information unless it is provided in [WORK HISTORY]. Ensure the langauge is clear, concise, and aligned with industry standards for professional resumes. """ system_fact_check = f""" You are a professional resume fact checker. Your task is to identify any inaccuracies in the [RESUME] based on the individual's [WORK HISTORY]. If there are inaccuracies, list them in a bullet point format. When answering queries, follow these steps: 1. You must not invent or assume any information not explicitly present in the [WORK HISTORY]. 2. Analyze the [RESUME] to identify any discrepancies or inaccuracies based on the [WORK HISTORY]. """ tool_log = [] command_log = [] model = None client = None web_server = None # %% # Cmd line overrides def parse_args(): parser = argparse.ArgumentParser(description="AI is Really Cool") parser.add_argument("--ollama-server", type=str, default=OLLAMA_API_URL, help=f"Ollama API endpoint. default={OLLAMA_API_URL}") parser.add_argument("--ollama-model", type=str, default=MODEL_NAME, help=f"LLM model to use. default={MODEL_NAME}") parser.add_argument("--web-host", type=str, default=WEB_HOST, help=f"Host to launch Flask web server. default={WEB_HOST} only if --web-disable not specified.") parser.add_argument("--web-port", type=str, default=WEB_PORT, help=f"Port to launch Flask web server. default={WEB_PORT} only if --web-disable not specified.") parser.add_argument('--level', type=str, choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default=LOG_LEVEL, help=f'Set the logging level. default={LOG_LEVEL}') return parser.parse_args() def setup_logging(level): numeric_level = getattr(logging, level.upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Invalid log level: {level}") logging.basicConfig(level=numeric_level, format='%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s') logging.info(f"Logging is set to {level} level.") # %% async def AnalyzeSite(url, question): """ Fetches content from a URL, extracts the text, and uses Ollama to summarize it. Args: url (str): The URL of the website to summarize Returns: str: A summary of the website content """ global model, client try: # Fetch the webpage headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } logging.info(f"Fetching {url}") response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() logging.info(f"{url} returned. Processing...") # Parse the HTML soup = BeautifulSoup(response.text, 'html.parser') # Remove script and style elements for script in soup(["script", "style"]): script.extract() # Get text content text = soup.get_text(separator=' ', strip=True) # Clean up text (remove extra whitespace) lines = (line.strip() for line in text.splitlines()) chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) text = ' '.join(chunk for chunk in chunks if chunk) # Limit text length if needed (Ollama may have token limits) max_chars = 100000 if len(text) > max_chars: text = text[:max_chars] + "..." # Create Ollama client # logging.info(f"Requesting summary of: {text}") # Generate summary using Ollama prompt = f"CONTENTS:\n\n{text}\n\n{question}" response = client.generate(model=model, system="You are given the contents of {url}. Answer the question about the contents", prompt=prompt) #logging.info(response['response']) return { 'source': 'summarizer-llm', 'content': response['response'], 'metadata': DateTime() } except requests.exceptions.RequestException as e: return f"Error fetching the URL: {str(e)}" except Exception as e: return f"Error processing the website content: {str(e)}" # %% # %% def is_valid_uuid(value): try: uuid_obj = uuid.UUID(value, version=4) return str(uuid_obj) == value except (ValueError, TypeError): return False def default_tools(tools): return [{**tool, "enabled": True} for tool in tools] def find_summarize_tool(tools): return [{**tool, "enabled": True} for tool in tools if tool.get("name", "") == "AnalyzeSite"] def llm_tools(tools): return [tool for tool in tools if tool.get("enabled", False) == True] # %% async def handle_tool_calls(message): """ Process tool calls and yield status updates along the way. The last yielded item will be a tuple containing (tool_result, tools_used). """ tools_used = [] all_responses = [] for i, tool_call in enumerate(message['tool_calls']): arguments = tool_call['function']['arguments'] tool = tool_call['function']['name'] # Yield status update before processing each tool yield {"status": "processing", "message": f"Processing tool {i+1}/{len(message['tool_calls'])}: {tool}..."} # Process the tool based on its type match tool: case 'TickerValue': ticker = arguments.get('ticker') if not ticker: ret = None else: ret = TickerValue(ticker) tools_used.append({ "tool": f"{tool}({ticker})", "result": ret}) case 'AnalyzeSite': url = arguments.get('url') question = arguments.get('question', 'what is the summary of this content?') # Additional status update for long-running operations yield {"status": "processing", "message": f"Retrieving and summarizing content from {url}..."} ret = await AnalyzeSite(url, question) tools_used.append({ "tool": f"{tool}('{url}', '{question}')", "result": ret }) case 'DateTime': tz = arguments.get('timezone') ret = DateTime(tz) tools_used.append({ "tool": f"{tool}('{tz}')", "result": ret }) case 'WeatherForecast': city = arguments.get('city') state = arguments.get('state') yield {"status": "processing", "message": f"Fetching weather data for {city}, {state}..."} ret = WeatherForecast(city, state) tools_used.append({ "tool": f"{tool}('{city}', '{state}')", "result": ret }) case _: ret = None # Build response for this tool tool_response = { "role": "tool", "content": str(ret), "name": tool_call['function']['name'] } all_responses.append(tool_response) # Yield the final result as the last item final_result = all_responses[0] if len(all_responses) == 1 else all_responses yield (final_result, tools_used) # %% class WebServer: def __init__(self, logging, client, model=MODEL_NAME): self.logging = logging self.app = FastAPI() self.contexts = {} self.client = client self.model = model self.processing = False self.file_watcher = None self.observer = None self.app.add_middleware( CORSMiddleware, allow_origins=["http://battle-linux.ketrenos.com:3000"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @self.app.on_event("startup") async def startup_event(): # Start the file watcher self.observer, self.file_watcher = Rag.start_file_watcher( llm=client, watch_directory=defines.doc_dir, recreate=False # Don't recreate if exists ) print(f"API started with {self.file_watcher.collection.count()} documents in the collection") @self.app.on_event("shutdown") async def shutdown_event(): if self.observer: self.observer.stop() self.observer.join() print("File watcher stopped") self.setup_routes() def setup_routes(self): @self.app.get('/') async def root(): context = self.create_context() self.logging.info(f"Redirecting non-session to {context['id']}") return RedirectResponse(url=f"/{context['id']}", status_code=307) #return JSONResponse({"redirect": f"/{context['id']}"}) @self.app.get("/api/query") async def query_documents(query: str, top_k: int = 3): if not self.file_watcher: return """Query the RAG system with the given prompt.""" results = self.file_watcher.find_similar(query, top_k=top_k) return { "query": query, "results": [ { "content": doc, "metadata": meta, "distance": dist } for doc, meta, dist in zip( results["documents"], results["metadatas"], results["distances"] ) ] } @self.app.post("/api/refresh/{file_path:path}") async def refresh_document(file_path: str, background_tasks: BackgroundTasks): if not self.file_watcher: return """Manually refresh a specific document in the collection.""" full_path = os.path.join(defines.doc_dir, file_path) if not os.path.exists(full_path): return {"status": "error", "message": "File not found"} # Schedule the update in the background background_tasks.add_task( self.file_watcher.process_file_update, full_path ) return { "status": "success", "message": f"Document refresh scheduled for {file_path}" } # @self.app.post("/api/refresh-all") # async def refresh_all_documents(): # if not self.file_watcher: # return # """Refresh all documents in the collection.""" # # Re-initialize file hashes and process all files # self.file_watcher._initialize_file_hashes() # # Schedule updates for all files # file_paths = self.file_watcher.file_hashes.keys() # tasks = [self.file_watcher.process_file_update(path) for path in file_paths] # # Wait for all updates to complete # await asyncio.gather(*tasks) # return { # "status": "success", # "message": f"Refreshed {len(file_paths)} documents", # "document_count": file_watcher.collection.count() # } @self.app.put('/api/umap/{context_id}') async def put_umap(context_id: str, request: Request): if not self.file_watcher: return if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) try: data = await request.json() dimensions = data.get('dimensions', 2) except: dimensions = 2 try: result = self.file_watcher.collection.get(include=['embeddings', 'documents', 'metadatas']) vectors = np.array(result['embeddings']) umap_model = umap.UMAP(n_components=dimensions, random_state=42) #, n_neighbors=15, min_dist=0.1) embedding = umap_model.fit_transform(vectors) context['umap_model'] = umap_model result['embeddings'] = embedding.tolist() return JSONResponse(result) except Exception as e: logging.error(e) return JSONResponse({"error": str(e)}, 500) @self.app.put('/api/similarity/{context_id}') async def put_similarity(context_id: str, request: Request): if not self.file_watcher: return if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) if not context.get("umap_model"): return JSONResponse({"error": "No umap_model found in context"}, status_code=404) try: data = await request.json() query = data.get('query', '') except: query = '' if not query: return JSONResponse({"error": "No query provided"}, status_code=400) try: chroma_results = self.file_watcher.find_similar(query=query, top_k=10) if not chroma_results: return JSONResponse({"error": "No results found"}, status_code=404) chroma_embedding = chroma_results["query_embedding"] umap_embedding = context["umap_model"].transform([chroma_embedding])[0].tolist() return JSONResponse({ **chroma_results, "query": query, "vector_embedding": umap_embedding }) except Exception as e: logging.error(e) #return JSONResponse({"error": str(e)}, 500) @self.app.put('/api/reset/{context_id}') async def put_reset(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) data = await request.json() try: response = {} for reset in data["reset"]: match reset: case "system-prompt": context["system"] = [{"role": "system", "content": system_message}] response["system-prompt"] = { "system-prompt": system_message } case "rags": context["rags"] = rags.copy() response["rags"] = context["rags"] case "tools": context["tools"] = default_tools(tools) response["tools"] = context["tools"] case "history": context["llm_history"] = [] context["user_history"] = [] response["history"] = [] context["context_tokens"] = round(len(str(context["system"])) * 3 / 4) # Estimate context usage response["context_used"] = context["context_tokens"] case "message-history-length": context["message_history_length"] = DEFAULT_HISTORY_LENGTH response["message-history-length"] = DEFAULT_HISTORY_LENGTH if not response: return JSONResponse({ "error": "Usage: { reset: rags|tools|history|system-prompt}"}) else: self.save_context(context_id) return JSONResponse(response) except: return JSONResponse({ "error": "Usage: { reset: rags|tools|history|system-prompt}"}) @self.app.put('/api/tunables/{context_id}') async def put_tunables(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) data = await request.json() for k in data.keys(): match k: case "system-prompt": system_prompt = data[k].strip() if not system_prompt: return JSONResponse({ "status": "error", "message": "System prompt can not be empty." }) context["system"] = [{"role": "system", "content": system_prompt}] self.save_context(context_id) return JSONResponse({ "system-prompt": system_prompt }) case "message-history-length": value = max(0, int(data[k])) context["message_history_length"] = value self.save_context(context_id) return JSONResponse({ "message-history-length": value }) case _: return JSONResponse({ "error": f"Unrecognized tunable {k}"}, 404) @self.app.get('/api/tunables/{context_id}') async def get_tunables(context_id: str): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) return JSONResponse({ "system-prompt": context["system"][0]["content"], "message-history-length": context["message_history_length"] }) @self.app.get('/api/resume/{context_id}') async def get_resume(context_id: str): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) return JSONResponse(context["resume_history"]) @self.app.get('/api/system-info/{context_id}') async def get_system_info(context_id: str): return JSONResponse(system_info(self.model)) @self.app.post('/api/chat/{context_id}') async def chat_endpoint(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) data = await request.json() # Create a custom generator that ensures flushing async def flush_generator(): async for message in self.chat(context=context, content=data['content']): # Convert to JSON and add newline yield json.dumps(message) + "\n" # Save the history as its generated self.save_context(context_id) # Explicitly flush after each yield await asyncio.sleep(0) # Allow the event loop to process the write # Return StreamingResponse with appropriate headers return StreamingResponse( flush_generator(), media_type="application/json", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Prevents Nginx buffering if you're using it } ) @self.app.post('/api/generate-resume/{context_id}') async def post_generate_resume(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) data = await request.json() # Create a custom generator that ensures flushing async def flush_generator(): async for message in self.generate_resume(context=context, content=data['content']): # Convert to JSON and add newline yield json.dumps(message) + "\n" # Save the history as its generated self.save_context(context_id) # Explicitly flush after each yield await asyncio.sleep(0) # Allow the event loop to process the write # Return StreamingResponse with appropriate headers return StreamingResponse( flush_generator(), media_type="application/json", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Prevents Nginx buffering if you're using it } ) @self.app.post('/api/fact-check/{context_id}') async def post_fact_check(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) data = await request.json() # Create a custom generator that ensures flushing async def flush_generator(): async for message in self.fact_check(context=context, content=data['content']): # Convert to JSON and add newline yield json.dumps(message) + "\n" # Save the history as its generated self.save_context(context_id) # Explicitly flush after each yield await asyncio.sleep(0) # Allow the event loop to process the write # Return StreamingResponse with appropriate headers return StreamingResponse( flush_generator(), media_type="application/json", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Prevents Nginx buffering if you're using it } ) @self.app.post('/api/context') async def create_context(): context = self.create_context() self.logging.info(f"Generated new session as {context['id']}") return JSONResponse(context) @self.app.get('/api/history/{context_id}') async def get_history(context_id: str): context = self.upsert_context(context_id) return JSONResponse(context["user_history"]) @self.app.get('/api/tools/{context_id}') async def get_tools(context_id: str): context = self.upsert_context(context_id) return JSONResponse(context["tools"]) @self.app.put('/api/tools/{context_id}') async def put_tools(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) try: data = await request.json() modify = data["tool"] enabled = data["enabled"] for tool in context["tools"]: if modify == tool["function"]["name"]: tool["enabled"] = enabled self.save_context(context_id) return JSONResponse(context["tools"]) return JSONResponse({ "status": f"{modify} not found in tools." }), 404 except: return JSONResponse({ "status": "error" }), 405 @self.app.get('/api/rags/{context_id}') async def get_rags(context_id: str): context = self.upsert_context(context_id) return JSONResponse(context["rags"]) @self.app.put('/api/rags/{context_id}') async def put_rags(context_id: str, request: Request): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) try: data = await request.json() modify = data["tool"] enabled = data["enabled"] for tool in context["rags"]: if modify == tool["name"]: tool["enabled"] = enabled self.save_context(context_id) return JSONResponse(context["rags"]) return JSONResponse({ "status": f"{modify} not found in tools." }), 404 except: return JSONResponse({ "status": "error" }), 405 @self.app.get('/api/context-status/{context_id}') async def get_context_status(context_id): if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) return JSONResponse({"context_used": context["context_tokens"], "max_context": defines.max_context}) @self.app.get('/api/health') async def health_check(): return JSONResponse({"status": "healthy"}) @self.app.get('/{path:path}') async def serve_static(path: str): full_path = os.path.join(defines.static_content, path) if os.path.exists(full_path) and os.path.isfile(full_path): self.logging.info(f"Serve static request for {full_path}") return FileResponse(full_path) self.logging.info(f"Serve index.html for {path}") return FileResponse(os.path.join(defines.static_content, 'index.html')) def save_context(self, session_id): """ Serialize a Python dictionary to a file in the sessions directory. Args: data: Dictionary containing the session data session_id: UUID string for the context. If it doesn't exist, it is created Returns: The session_id used for the file """ context = self.upsert_context(session_id) # Create sessions directory if it doesn't exist if not os.path.exists(defines.session_dir): os.makedirs(defines.session_dir) # Create the full file path file_path = os.path.join(defines.session_dir, session_id) umap_model = context.get("umap_model") if umap_model: del context["umap_model"] # Serialize the data to JSON and write to file with open(file_path, 'w') as f: json.dump(context, f) if umap_model: context["umap_model"] = umap_model return session_id def load_context(self, session_id): """ Load a serialized Python dictionary from a file in the sessions directory. Args: session_id: UUID string for the filename Returns: The deserialized dictionary, or a new context if it doesn't exist on disk. """ file_path = os.path.join(defines.session_dir, session_id) # Check if the file exists if not os.path.exists(file_path): return self.create_context(session_id) # Read and deserialize the data with open(file_path, 'r') as f: self.contexts[session_id] = json.load(f) return self.contexts[session_id] def create_context(self, context_id = None): if not context_id: context_id = str(uuid.uuid4()) system_context = [{"role": "system", "content": system_message}]; context = { "id": context_id, "system": system_context, "system_generate_resume": system_generate_resume, "llm_history": [], "user_history": [], "tools": default_tools(tools), "resume_history": [], "rags": rags.copy(), "context_tokens": round(len(str(system_context)) * 3 / 4), # Estimate context usage "message_history_length": 5 # Number of messages to supply in context } logging.info(f"{context_id} created and added to sessions.") self.contexts[context_id] = context return context def get_optimal_ctx_size(self, context, messages, ctx_buffer = 4096): ctx = round(context + len(str(messages)) * 3 / 4) return max(defines.max_context, min(2048, ctx + ctx_buffer)) def upsert_context(self, context_id): if not context_id: logging.warning("No context ID provided. Creating a new context.") return self.create_context() if context_id in self.contexts: logging.info(f"Context {context_id} found.") return self.contexts[context_id] logging.info(f"Context {context_id} not found. Creating new context.") return self.load_context(context_id) async def chat(self, context, content): if not self.file_watcher: return content = content.strip() if not content: yield {"status": "error", "message": "Invalid request"} return if self.processing: yield {"status": "error", "message": "Busy"} return self.processing = True llm_history = context["llm_history"] user_history = context["user_history"] metadata = { "rag": {}, "tools": [], "eval_count": 0, "eval_duration": 0, "prompt_eval_count": 0, "prompt_eval_duration": 0, } rag_docs = [] for rag in context["rags"]: if rag["enabled"] and rag["name"] == "JPK": # Only support JPK rag right now... yield {"status": "processing", "message": f"Checking RAG context {rag['name']}..."} chroma_results = self.file_watcher.find_similar(query=content, top_k=10) if chroma_results: rag_docs.extend(chroma_results["documents"]) metadata["rag"] = { "name": rag["name"], **chroma_results } preamble = "" if len(rag_docs): preamble = f""" 1. Respond to this query: {content} 2. If there is information in this context to enhance the answer, do so: [{context_tag}]:\n""" for doc in rag_docs: preamble += doc preamble += f"\n[/{context_tag}]\nUse all of that information to respond to: " # Figure llm_history.append({"role": "user", "content": preamble + content}) user_history.append({"role": "user", "content": content}) if context["message_history_length"]: messages = context["system"] + llm_history[-context["message_history_length"]:] else: messages = context["system"] + llm_history try: # Estimate token length of new messages ctx_size = self.get_optimal_ctx_size(context["context_tokens"], messages=llm_history[-1]["content"]) yield {"status": "processing", "message": "Processing request...", "num_ctx": ctx_size} # Use the async generator in an async for loop response = self.client.chat(model=self.model, messages=messages, tools=llm_tools(context["tools"]), options={ 'num_ctx': ctx_size }) metadata["eval_count"] += response['eval_count'] metadata["eval_duration"] += response['eval_duration'] metadata["prompt_eval_count"] += response['prompt_eval_count'] metadata["prompt_eval_duration"] += response['prompt_eval_duration'] context["context_tokens"] = response['prompt_eval_count'] + response['eval_count'] tools_used = [] yield {"status": "processing", "message": "Initial response received..."} if 'tool_calls' in response.get('message', {}): yield {"status": "processing", "message": "Processing tool calls..."} message = response['message'] tool_result = None # Process all yielded items from the handler async for item in handle_tool_calls(message): if isinstance(item, tuple) and len(item) == 2: # This is the final result tuple (tool_result, tools_used) tool_result, tools_used = item else: # This is a status update, forward it yield item message_dict = { 'role': message.get('role', 'assistant'), 'content': message.get('content', '') } if 'tool_calls' in message: message_dict['tool_calls'] = [ {'function': {'name': tc['function']['name'], 'arguments': tc['function']['arguments']}} for tc in message['tool_calls'] ] pre_add_index = len(messages) messages.append(message_dict) if isinstance(tool_result, list): messages.extend(tool_result) else: messages.append(tool_result) metadata["tools"] = tools_used # Estimate token length of new messages ctx_size = self.get_optimal_ctx_size(context["context_tokens"], messages=messages[pre_add_index:]) yield {"status": "processing", "message": "Generating final response...", "num_ctx": ctx_size } # Decrease creativity when processing tool call requests response = self.client.chat(model=self.model, messages=messages, stream=False, options={ 'num_ctx': ctx_size }) #, "temperature": 0.5 }) metadata["eval_count"] += response['eval_count'] metadata["eval_duration"] += response['eval_duration'] metadata["prompt_eval_count"] += response['prompt_eval_count'] metadata["prompt_eval_duration"] += response['prompt_eval_duration'] context["context_tokens"] = response['prompt_eval_count'] + response['eval_count'] reply = response['message']['content'] final_message = {"role": "assistant", "content": reply } # history is provided to the LLM and should not have additional metadata llm_history.append(final_message) final_message["metadata"] = metadata # user_history is provided to the REST API and does not include CONTEXT or metadata user_history.append(final_message) # Return the REST API with metadata yield {"status": "done", "message": final_message } except Exception as e: logging.exception({ 'model': self.model, 'messages': messages, 'error': str(e) }) yield {"status": "error", "message": f"An error occurred: {str(e)}"} finally: self.processing = False async def generate_resume(self, context, content): if not self.file_watcher: return content = content.strip() if not content: yield {"status": "error", "message": "Invalid request"} return if self.processing: yield {"status": "error", "message": "Busy"} return self.processing = True resume_history = context["resume_history"] resume = { "job_description": content, "resume": "", "metadata": {}, "rag": "", "fact_check": "" } metadata = { "rag": {}, "tools": [], "eval_count": 0, "eval_duration": 0, "prompt_eval_count": 0, "prompt_eval_duration": 0, } rag_docs = [] resume_doc = open(defines.resume_doc, 'r').read() rag_docs.append(resume_doc) for rag in context["rags"]: if rag["enabled"] and rag["name"] == "JPK": # Only support JPK rag right now... yield {"status": "processing", "message": f"Checking RAG context {rag['name']}..."} chroma_results = self.file_watcher.find_similar(query=content, top_k=10) if chroma_results: rag_docs.extend(chroma_results["documents"]) metadata["rag"] = { "name": rag["name"], **chroma_results } preamble = f"The current time is {DateTime()}\n" preamble = f"""[WORK HISTORY]:\n""" for doc in rag_docs: preamble += f"{doc}\n" resume["rag"] += f"{doc}\n" preamble += f"\n[/WORK HISTORY]\n" content = f"{preamble}\nUse the above WORK HISTORY to create the resume for this JOB DESCRIPTION. Do not use the JOB DESCRIPTION skills as skills the user posseses unless listed in WORK HISTORY:\n[JOB DESCRIPTION]\n{content}\n[/JOB DESCRIPTION]\n" try: # Estimate token length of new messages ctx_size = self.get_optimal_ctx_size(context["context_tokens"], messages=[system_generate_resume, content]) yield {"status": "processing", "message": "Processing request...", "num_ctx": ctx_size} # Use the async generator in an async for loop # # To support URL lookup: # # 1. Enable tools in a call to chat() with a simple prompt to invoke the tool to generate the summary if requested. # 2. If not requested (no tool call,) abort the path # 3. Otherwise, we know the URL was good and can use that URLs fetched content as context. # response = self.client.generate(model=self.model, system=system_generate_resume, prompt=content, options={ 'num_ctx': ctx_size }) metadata["eval_count"] += response['eval_count'] metadata["eval_duration"] += response['eval_duration'] metadata["prompt_eval_count"] += response['prompt_eval_count'] metadata["prompt_eval_duration"] += response['prompt_eval_duration'] context["context_tokens"] = response['prompt_eval_count'] + response['eval_count'] reply = response['response'] final_message = {"role": "assistant", "content": reply, "metadata": metadata } resume['resume'] = final_message resume_history.append(resume) # Return the REST API with metadata yield {"status": "done", "message": final_message } except Exception as e: logging.exception({ 'model': self.model, 'content': content, 'error': str(e) }) yield {"status": "error", "message": f"An error occurred: {str(e)}"} finally: self.processing = False async def fact_check(self, context, content): content = content.strip() if not content: yield {"status": "error", "message": "Invalid request"} return if self.processing: yield {"status": "error", "message": "Busy"} return self.processing = True resume_history = context["resume_history"] if len(resume_history) == 0: yield {"status": "done", "message": "No resume history found." } return resume = resume_history[-1] metadata = resume["metadata"] metadata["eval_count"] = 0 metadata["eval_duration"] = 0 metadata["prompt_eval_count"] = 0 metadata["prompt_eval_duration"] = 0 content = f"[WORK HISTORY]:{resume['rag']}[/WORK HISTORY]\n\n[RESUME]\n{resume['resume']['content']}\n[/RESUME]\n\n" try: # Estimate token length of new messages ctx_size = self.get_optimal_ctx_size(context["context_tokens"], messages=[system_fact_check, content]) yield {"status": "processing", "message": "Processing request...", "num_ctx": ctx_size} response = self.client.generate(model=self.model, system=system_fact_check, prompt=content, options={ 'num_ctx': ctx_size }) logging.info(f"Fact checking {ctx_size} tokens.") metadata["eval_count"] += response['eval_count'] metadata["eval_duration"] += response['eval_duration'] metadata["prompt_eval_count"] += response['prompt_eval_count'] metadata["prompt_eval_duration"] += response['prompt_eval_duration'] context["context_tokens"] = response['prompt_eval_count'] + response['eval_count'] reply = response['response'] final_message = {"role": "assistant", "content": reply, "metadata": metadata } resume['fact_check'] = final_message # Return the REST API with metadata yield {"status": "done", "message": final_message } except Exception as e: logging.exception({ 'model': self.model, 'content': content, 'error': str(e) }) yield {"status": "error", "message": f"An error occurred: {str(e)}"} finally: self.processing = False def run(self, host='0.0.0.0', port=WEB_PORT, **kwargs): try: uvicorn.run(self.app, host=host, port=port) except KeyboardInterrupt: if self.observer: self.observer.stop() if self.observer: self.observer.join() # %% # Main function to run everything def main(): global client, model, web_server # Parse command-line arguments args = parse_args() # Setup logging based on the provided level setup_logging(args.level) client = ollama.Client(host=args.ollama_server) model = args.ollama_model # documents = Rag.load_text_files(defines.doc_dir) # print(f"Documents loaded {len(documents)}") # chunks = Rag.create_chunks_from_documents(documents) # doc_types = set(chunk.metadata['doc_type'] for chunk in chunks) # print(f"Document types: {doc_types}") # print(f"Vectorstore created with {collection.count()} documents") web_server = WebServer(logging, client, model) logging.info(f"Starting web server at http://{args.web_host}:{args.web_port}") web_server.run(host=args.web_host, port=args.web_port, use_reloader=False) main()