from typing import Any, AsyncGenerator

# %%
# Imports [standard]
# Standard library modules (no try-except needed)
import argparse
import asyncio
import json
import logging
import os
import re
import uuid
import subprocess
import math
import warnings
from collections import deque
from datetime import datetime
from uuid import uuid4


def try_import(module_name, pip_name=None):
    try:
        __import__(module_name)
    except ImportError:
        print(f"Module '{module_name}' not found. Install it using:")
        print(f"  pip install {pip_name or module_name}")


# Third-party modules with import checks
try_import("ollama")
try_import("requests")
try_import("fastapi")
try_import("uvicorn")
try_import("numpy")
try_import("umap")
try_import("sklearn")
try_import("prometheus_client")
try_import("prometheus_fastapi_instrumentator")

import ollama
import requests
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, BackgroundTasks  # type: ignore
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse, RedirectResponse  # type: ignore
from fastapi.middleware.cors import CORSMiddleware  # type: ignore
import uvicorn  # type: ignore
import numpy as np  # type: ignore
import umap  # type: ignore
from sklearn.preprocessing import MinMaxScaler  # type: ignore
from prometheus_client import Summary  # type: ignore
from prometheus_fastapi_instrumentator import Instrumentator  # type: ignore

from utils import (
    rag as Rag,
    tools as Tools,
    Context,
    Conversation,
    Message,
    Agent,
    Tunables,
    defines,
    logger,
)

CONTEXT_VERSION = 2

rags = [
    {
        "name": "JPK",
        "enabled": True,
        "description": "Expert data about James Ketrenos, including work history, personal hobbies, and projects.",
    },
    # { "name": "LKML", "enabled": False, "description": "Full associative data for entire LKML mailing list archive." },
]

REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")

system_message_old = f"""
Launched on {datetime.now().isoformat()}.

When answering queries, follow these steps:

1. First analyze the query to determine if real-time information might be helpful
2. Even when <|context|> is provided, consider whether the tools would provide more current or comprehensive information
3. Use the provided tools whenever they would enhance your response, regardless of whether context is also available
4. When presenting weather forecasts, include relevant emojis immediately before the corresponding text. For example, for a sunny day, say "☀️ Sunny", or if the forecast says there will be "rain showers", say "🌧️ Rain showers". Use this mapping for weather emojis: Sunny: ☀️, Cloudy: ☁️, Rainy: 🌧️, Snowy: ❄️
5. When both <|context|> and tool outputs are relevant, synthesize information from both sources to provide the most complete answer
6. Always prioritize the most up-to-date and relevant information, whether it comes from <|context|> or tools
7. If <|context|> and tool outputs contain conflicting information, prefer the tool outputs as they likely represent more current data

Always use tools and <|context|> when possible. Be concise, and never make up information. If you do not know the answer, say so.
""".strip()

system_fact_check_QA = f"""
Launched on {datetime.now().isoformat()}.

You are a professional resume fact checker.
You are provided with a <|resume|> which was generated by you, the <|context|> you used to generate that <|resume|>, and a <|fact_check|> generated by you when you analyzed <|context|> against the <|resume|> to identify discrepancies.

Your task is to answer questions about the <|fact_check|> you generated based on the <|resume|> and <|context|>.
"""


def get_installed_ram():
    try:
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()
        match = re.search(r"MemTotal:\s+(\d+)", meminfo)
        if match:
            return f"{math.floor(int(match.group(1)) / 1000**2)}GB"  # Convert kB to GB
        return "Unknown"  # /proc/meminfo did not contain a MemTotal entry
    except Exception as e:
        return f"Error retrieving RAM: {e}"


def get_graphics_cards():
    gpus = []
    try:
        # Run the ze-monitor utility to enumerate devices
        result = subprocess.run(["ze-monitor"], capture_output=True, text=True, check=True)
        # Clean up the output (remove leading/trailing whitespace and newlines)
        output = result.stdout.strip()
        for index in range(len(output.splitlines())):
            result = subprocess.run(
                ["ze-monitor", "--device", f"{index + 1}", "--info"],
                capture_output=True, text=True, check=True,
            )
            gpu_info = result.stdout.strip().splitlines()
            gpu = {
                "discrete": True,  # Assume it's discrete initially
                "name": None,
                "memory": None,
            }
            gpus.append(gpu)
            for line in gpu_info:
                match = re.match(r"^Device: [^(]*\((.*)\)", line)
                if match:
                    gpu["name"] = match.group(1)
                    continue
                match = re.match(r"^\s*Memory: (.*)", line)
                if match:
                    gpu["memory"] = match.group(1)
                    continue
                match = re.match(r"^.*Is integrated with host: Yes.*", line)
                if match:
                    gpu["discrete"] = False
                    continue
        return gpus
    except Exception as e:
        return f"Error retrieving GPU info: {e}"


def get_cpu_info():
    try:
        with open("/proc/cpuinfo", "r") as f:
            cpuinfo = f.read()
        model_match = re.search(r"model name\s+:\s+(.+)", cpuinfo)
        cores_match = re.findall(r"processor\s+:\s+\d+", cpuinfo)
        if model_match and cores_match:
            return f"{model_match.group(1)} with {len(cores_match)} cores"
        return "Unknown"  # /proc/cpuinfo did not match the expected layout
    except Exception as e:
        return f"Error retrieving CPU info: {e}"


def system_info(model):
    return {
        "System RAM": get_installed_ram(),
        "Graphics Card": get_graphics_cards(),
        "CPU": get_cpu_info(),
        "LLM Model": model,
        "Embedding Model": defines.embedding_model,
        "Context length": defines.max_context,
    }
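# Illustrative shape of the system_info() payload, a sketch assuming a single
# discrete GPU (all values vary per host; the GPU/CPU names are made up):
#
# {
#     "System RAM": "64GB",
#     "Graphics Card": [{"discrete": True, "name": "Intel Arc A770", "memory": "16 GiB"}],
#     "CPU": "Intel(R) Core(TM) i9 with 32 cores",
#     "LLM Model": "<model name>",
#     "Embedding Model": "<defines.embedding_model>",
#     "Context length": "<defines.max_context>",
# }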
# %%
# Defaults
OLLAMA_API_URL = defines.ollama_api_url
MODEL_NAME = defines.model
LOG_LEVEL = "info"
USE_TLS = False
WEB_HOST = "0.0.0.0"
WEB_PORT = 8911
DEFAULT_HISTORY_LENGTH = 5


# %%
# Globals
def create_system_message(prompt):
    return [{"role": "system", "content": prompt}]


tool_log = []
command_log = []
model = None
client = None
web_server = None


# %%
# Cmd line overrides
def parse_args():
    parser = argparse.ArgumentParser(description="AI is Really Cool")
    parser.add_argument("--ollama-server", type=str, default=OLLAMA_API_URL,
                        help=f"Ollama API endpoint. default={OLLAMA_API_URL}")
    parser.add_argument("--ollama-model", type=str, default=MODEL_NAME,
                        help=f"LLM model to use. default={MODEL_NAME}")
    parser.add_argument("--web-host", type=str, default=WEB_HOST,
                        help=f"Host to bind the web server to. default={WEB_HOST}")
    parser.add_argument("--web-port", type=int, default=WEB_PORT,
                        help=f"Port to bind the web server to. default={WEB_PORT}")
    parser.add_argument("--level", type=str,
                        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
                        default=LOG_LEVEL,
                        help=f"Set the logging level. default={LOG_LEVEL}")
    return parser.parse_args()


# %%
def is_valid_uuid(value: str) -> bool:
    try:
        uuid_obj = uuid.UUID(value, version=4)
        return str(uuid_obj) == value
    except (ValueError, TypeError):
        return False


# %%
class WebServer:
    @asynccontextmanager
    async def lifespan(self, app: FastAPI):
        # Start the file watcher
        self.observer, self.file_watcher = Rag.start_file_watcher(
            llm=self.llm,
            watch_directory=defines.doc_dir,
            recreate=False,  # Don't recreate if exists
        )
        logger.info(f"API started with {self.file_watcher.collection.count()} documents in the collection")
        yield
        if self.observer:
            self.observer.stop()
            self.observer.join()
            logger.info("File watcher stopped")

    def __init__(self, llm, model=MODEL_NAME):
        self.app = FastAPI(lifespan=self.lifespan)
        Instrumentator().instrument(self.app)
        Instrumentator().expose(self.app)
        self.contexts = {}
        self.llm = llm
        self.model = model
        self.processing = False
        self.file_watcher = None
        self.observer = None
        self.ssl_enabled = os.path.exists(defines.key_path) and os.path.exists(defines.cert_path)
        if self.ssl_enabled:
            allow_origins = ["https://battle-linux.ketrenos.com:3000"]
        else:
            allow_origins = ["http://battle-linux.ketrenos.com:3000"]
        logger.info(f"Allowed origins: {allow_origins}")
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=allow_origins,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        self.setup_routes()

    def setup_routes(self):
        @self.app.get("/")
        async def root():
            context = self.create_context()
            logger.info(f"Redirecting non-context to {context.id}")
            return RedirectResponse(url=f"/{context.id}", status_code=307)
            # return JSONResponse({"redirect": f"/{context.id}"})

        @self.app.put("/api/umap/{context_id}")
        async def put_umap(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            try:
                if not self.file_watcher:
                    raise Exception("File watcher not initialized")
                context = self.upsert_context(context_id)
                if not context:
                    return JSONResponse({"error": f"Invalid context: {context_id}"}, status_code=400)
                data = await request.json()
                dimensions = data.get("dimensions", 2)
                result = self.file_watcher.umap_collection
                if not result:
                    return JSONResponse({"error": "No UMAP collection found"}, status_code=404)
                if dimensions == 2:
                    logger.info("Returning 2D UMAP")
                    umap_embedding = self.file_watcher.umap_embedding_2d
                else:
                    logger.info("Returning 3D UMAP")
                    umap_embedding = self.file_watcher.umap_embedding_3d
                if len(umap_embedding) == 0:
                    return JSONResponse({"error": "No UMAP embedding found"}, status_code=404)
                result["embeddings"] = umap_embedding.tolist()
                return JSONResponse(result)
            except Exception as e:
                logger.error(f"put_umap error: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return JSONResponse({"error": str(e)}, status_code=500)

        @self.app.put("/api/similarity/{context_id}")
        async def put_similarity(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not self.file_watcher:
                raise Exception("File watcher not initialized")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            try:
                data = await request.json()
                query = data.get("query", "")
            except Exception:
                query = ""
            if not query:
                return JSONResponse({"error": "No query provided for similarity search"}, status_code=400)
            try:
                chroma_results = self.file_watcher.find_similar(query=query, top_k=10)
                if not chroma_results:
                    return JSONResponse({"error": "No results found"}, status_code=404)
                chroma_embedding = np.array(chroma_results["query_embedding"]).flatten()  # Ensure correct shape
                logger.info(f"Chroma embedding shape: {chroma_embedding.shape}")
                umap_2d = self.file_watcher.umap_model_2d.transform([chroma_embedding])[0].tolist()
                logger.info(f"UMAP 2D output: {umap_2d}, length: {len(umap_2d)}")  # Debug output
                umap_3d = self.file_watcher.umap_model_3d.transform([chroma_embedding])[0].tolist()
                logger.info(f"UMAP 3D output: {umap_3d}, length: {len(umap_3d)}")  # Debug output
                return JSONResponse({
                    **chroma_results,
                    "query": query,
                    "umap_embedding_2d": umap_2d,
                    "umap_embedding_3d": umap_3d,
                })
            except Exception as e:
                logger.error(e)
                return JSONResponse({"error": str(e)}, status_code=500)
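        # Example request against the similarity endpoint (a sketch; the
        # context UUID and query text are illustrative, and the host/port
        # assume the defaults above):
        #
        #   curl -X PUT http://localhost:8911/api/similarity/<context-uuid> \
        #        -H "Content-Type: application/json" \
        #        -d '{"query": "embedded Linux experience"}'
        #
        # The response merges the chroma results with 2D and 3D UMAP
        # projections of the query embedding.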
        @self.app.put("/api/reset/{context_id}/{agent_type}")
        async def put_reset(context_id: str, agent_type: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            context = self.upsert_context(context_id)
            agent = context.get_agent(agent_type)
            if not agent:
                return JSONResponse({"error": f"{agent_type} is not recognized", "context": context.id}, status_code=404)
            data = await request.json()
            try:
                response = {}
                for reset_operation in data["reset"]:
                    match reset_operation:
                        case "system_prompt":
                            logger.info(f"Resetting {reset_operation}")
                            # match agent_type:
                            #     case "chat":
                            #         prompt = system_message
                            #     case "job_description":
                            #         prompt = system_generate_resume
                            #     case "resume":
                            #         prompt = system_generate_resume
                            #     case "fact_check":
                            #         prompt = system_message
                            #     case _:
                            #         prompt = system_message
                            # agent.system_prompt = prompt
                            # response["system_prompt"] = {"system_prompt": prompt}
                        case "rags":
                            logger.info(f"Resetting {reset_operation}")
                            context.rags = rags.copy()
                            response["rags"] = context.rags
                        case "tools":
                            logger.info(f"Resetting {reset_operation}")
                            context.tools = Tools.enabled_tools(Tools.tools)
                            response["tools"] = context.tools
                        case "history":
                            reset_map = {
                                "job_description": ("job_description", "resume", "fact_check"),
                                "resume": ("job_description", "resume", "fact_check"),
                                "fact_check": ("job_description", "resume", "fact_check"),
                                "chat": ("chat",),
                            }
                            resets = reset_map.get(agent_type, ())
                            for mode in resets:
                                tmp = context.get_agent(mode)
                                if not tmp:
                                    continue
                                logger.info(f"Resetting {reset_operation} for {mode}")
                                tmp.conversation = Conversation()
                                # Estimate context usage from the system prompt alone
                                tmp.context_tokens = round(len(str(agent.system_prompt)) * 3 / 4)
                            response["history"] = []
                            response["context_used"] = agent.context_tokens
                        case "message_history_length":
                            logger.info(f"Resetting {reset_operation}")
                            context.message_history_length = DEFAULT_HISTORY_LENGTH
                            response["message_history_length"] = DEFAULT_HISTORY_LENGTH
                if not response:
                    return JSONResponse({"error": "Usage: { reset: rags|tools|history|system_prompt }"})
                else:
                    self.save_context(context_id)
                    return JSONResponse(response)
            except Exception:
                return JSONResponse({"error": "Usage: { reset: rags|tools|history|system_prompt }"})

        @self.app.put("/api/tunables/{context_id}")
        async def put_tunables(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            try:
                context = self.upsert_context(context_id)
                data = await request.json()
                agent = context.get_agent("chat")
                if not agent:
                    return JSONResponse({"error": "chat is not recognized", "context": context.id}, status_code=404)
                for k in data.keys():
                    match k:
                        case "tools":
                            # { "tools": [{ "name": tool.name, "enabled": tool.enabled }] }
                            tools: list[dict[str, Any]] = data[k]
                            if not tools:
                                return JSONResponse({"status": "error", "message": "Tools cannot be empty."})
                            for tool in tools:
                                for context_tool in context.tools:
                                    if context_tool["function"]["name"] == tool["name"]:
                                        context_tool["enabled"] = tool["enabled"]
                            self.save_context(context_id)
                            return JSONResponse({
                                "tools": [{
                                    **t["function"],
                                    "enabled": t["enabled"],
                                } for t in context.tools]
                            })
                        case "rags":
                            # { "rags": [{ "name": rag.name, "enabled": rag.enabled }] }
                            rags_update: list[dict[str, Any]] = data[k]
                            if not rags_update:
                                return JSONResponse({"status": "error", "message": "RAGs cannot be empty."})
                            for rag in rags_update:
                                for context_rag in context.rags:
                                    if context_rag["name"] == rag["name"]:
                                        context_rag["enabled"] = rag["enabled"]
                            self.save_context(context_id)
                            return JSONResponse({"rags": context.rags})
                        case "system_prompt":
                            system_prompt = data[k].strip()
                            if not system_prompt:
                                return JSONResponse({"status": "error", "message": "System prompt cannot be empty."})
                            agent.system_prompt = system_prompt
                            self.save_context(context_id)
                            return JSONResponse({"system_prompt": system_prompt})
                        case "message_history_length":
                            value = max(0, int(data[k]))
                            context.message_history_length = value
                            self.save_context(context_id)
                            return JSONResponse({"message_history_length": value})
                        case _:
                            return JSONResponse({"error": f"Unrecognized tunable {k}"}, status_code=404)
            except Exception as e:
                logger.error(f"Error in put_tunables: {e}")
                return JSONResponse({"error": str(e)}, status_code=500)
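        # Example payloads, a sketch derived from the handlers above (the tool
        # name "weather_forecast" is illustrative, not a real registered tool):
        #
        #   PUT /api/reset/{context_id}/{agent_type}
        #     {"reset": ["history", "rags"]}
        #
        #   PUT /api/tunables/{context_id}   (one key per request)
        #     {"system_prompt": "You are a helpful assistant."}
        #     {"message_history_length": 10}
        #     {"rags": [{"name": "JPK", "enabled": false}]}
        #     {"tools": [{"name": "weather_forecast", "enabled": true}]}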
        @self.app.get("/api/tunables/{context_id}")
        async def get_tunables(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            context = self.upsert_context(context_id)
            agent = context.get_agent("chat")
            if not agent:
                return JSONResponse({"error": "chat is not recognized", "context": context.id}, status_code=404)
            return JSONResponse({
                "system_prompt": agent.system_prompt,
                "message_history_length": context.message_history_length,
                "rags": context.rags,
                "tools": [{
                    **t["function"],
                    "enabled": t["enabled"],
                } for t in context.tools],
            })

        @self.app.get("/api/system-info/{context_id}")
        async def get_system_info(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            return JSONResponse(system_info(self.model))

        @self.app.post("/api/chat/{context_id}/{agent_type}")
        async def post_chat_endpoint(context_id: str, agent_type: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            try:
                context = self.upsert_context(context_id)
                try:
                    agent = context.get_agent(agent_type)
                except Exception as e:
                    logger.info(f"Attempt to create agent type: {agent_type} failed: {e}")
                    return JSONResponse({"error": f"{agent_type} is not recognized", "context": context.id}, status_code=404)
                query = await request.json()
                prompt = query.get("prompt", "")
                if not isinstance(prompt, str) or len(prompt) == 0:
                    logger.info("Prompt is empty")
                    return JSONResponse({"error": "Prompt cannot be empty"}, status_code=400)
                try:
                    options = Tunables(**query["options"]) if "options" in query else None
                except Exception as e:
                    logger.info(f"Attempt to set tunables failed: {query['options']}: {e}")
                    return JSONResponse({"error": f"Invalid options: {query['options']}"}, status_code=400)
                if not agent:
                    # 'job_description' is the only agent that is dynamically generated from a
                    # REST API endpoint.
                    # - 'chat' is created on context creation.
                    # - 'resume' is created on actions by 'job_description'
                    # - 'fact_check' is created on actions by 'resume'
                    match agent_type:
                        case "job_description":
                            logger.info(f"Agent {agent_type} not found. Creating it from the prompt.")
                            agent = context.get_or_create_agent("job_description", job_description=prompt)
                        case _:
                            logger.info(f"Invalid agent creation sequence for {agent_type}. Returning error.")
                            return JSONResponse({"error": f"{agent_type} is not recognized", "context": context.id}, status_code=404)

                # Create a custom generator that ensures flushing
                async def flush_generator():
                    logger.info("Message starting. Streaming partial results.")
                    async for message in self.generate_response(context=context, agent=agent, prompt=prompt, options=options):
                        if message.status != "done":
                            result = {"status": message.status, "response": message.response}
                        else:
                            logger.info("Message complete. Providing full response.")
                            try:
                                result = message.model_dump(by_alias=True, mode="json")
                            except Exception as e:
                                result = {"status": "error", "response": str(e)}
                        # Convert to JSON and add newline
                        result = json.dumps(result) + "\n"
                        message.network_packets += 1
                        message.network_bytes += len(result)
                        yield result
                        # Explicitly flush after each yield
                        await asyncio.sleep(0)  # Allow the event loop to process the write
                    # Save the history once completed
                    self.save_context(context_id)

                # Return StreamingResponse with appropriate headers
                return StreamingResponse(
                    flush_generator(),
                    media_type="application/json",
                    headers={
                        "Cache-Control": "no-cache",
                        "Connection": "keep-alive",
                        "X-Accel-Buffering": "no",  # Prevents Nginx buffering if you're using it
                    },
                )
            except Exception as e:
                logger.error(f"Error in post_chat_endpoint: {e}")
                return JSONResponse({"error": str(e)}, status_code=500)
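        # The chat endpoint streams newline-delimited JSON. A sketch of what a
        # client sees (field values and intermediate statuses are illustrative;
        # only "processing", "error", and "done" appear in this file):
        #
        #   {"status": "processing", "response": "Processing RAG augmented query..."}
        #   {"status": "processing", "response": "Initial response received..."}
        #   { ...full Message model dump with "status": "done"... }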
        @self.app.post("/api/context")
        async def create_context():
            try:
                context = self.create_context()
                logger.info(f"Generated new agent as {context.id}")
                return JSONResponse({"id": context.id})
            except Exception as e:
                logger.error(f"create_context error: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return JSONResponse({"error": str(e)}, status_code=404)

        @self.app.get("/api/history/{context_id}/{agent_type}")
        async def get_history(context_id: str, agent_type: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            try:
                context = self.upsert_context(context_id)
                agent = context.get_agent(agent_type)
                if not agent:
                    logger.info(f"Agent {agent_type} not found. Returning empty history.")
                    return JSONResponse({"messages": []})
                logger.info(f"History for {agent_type} contains {len(agent.conversation)} entries.")
                return agent.conversation
            except Exception as e:
                logger.error(f"get_history error: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                return JSONResponse({"error": str(e)}, status_code=404)
        @self.app.get("/api/tools/{context_id}")
        async def get_tools(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            context = self.upsert_context(context_id)
            return JSONResponse(context.tools)

        @self.app.put("/api/tools/{context_id}")
        async def put_tools(context_id: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            context = self.upsert_context(context_id)
            try:
                data = await request.json()
                modify = data["tool"]
                enabled = data["enabled"]
                for tool in context.tools:
                    if modify == tool["function"]["name"]:
                        tool["enabled"] = enabled
                        self.save_context(context_id)
                        return JSONResponse(context.tools)
                return JSONResponse({"status": f"{modify} not found in tools."}, status_code=404)
            except Exception:
                return JSONResponse({"status": "error"}, status_code=405)

        @self.app.get("/api/context-status/{context_id}/{agent_type}")
        async def get_context_status(context_id: str, agent_type: str, request: Request):
            logger.info(f"{request.method} {request.url.path}")
            if not is_valid_uuid(context_id):
                logger.warning(f"Invalid context_id: {context_id}")
                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
            context = self.upsert_context(context_id)
            agent = context.get_agent(agent_type)
            if not agent:
                return JSONResponse({"context_used": 0, "max_context": defines.max_context})
            return JSONResponse({"context_used": agent.context_tokens, "max_context": defines.max_context})

        @self.app.get("/api/health")
        async def health_check():
            return JSONResponse({"status": "healthy"})

        @self.app.get("/{path:path}")
        async def serve_static(path: str, request: Request):
            full_path = os.path.join(defines.static_content, path)
            if os.path.exists(full_path) and os.path.isfile(full_path):
                logger.info(f"Serve static request for {full_path}")
                return FileResponse(full_path)
            logger.info(f"Serve index.html for {path}")
            return FileResponse(os.path.join(defines.static_content, "index.html"))

    def save_context(self, context_id):
        """
        Serialize a context to a file in the context directory.

        Args:
            context_id: UUID string for the context. If the context doesn't exist, it is created.

        Returns:
            The context_id used for the file.
        """
        context = self.upsert_context(context_id)

        # Create the context directory if it doesn't exist
        if not os.path.exists(defines.context_dir):
            os.makedirs(defines.context_dir)

        # Create the full file path
        file_path = os.path.join(defines.context_dir, context_id)

        # Serialize the data to JSON and write to file
        with open(file_path, "w") as f:
            f.write(context.model_dump_json(by_alias=True))

        return context_id

    def load_or_create_context(self, context_id) -> Context:
        """
        Load a context from a file in the context directory or create a new one if it doesn't exist.

        Args:
            context_id: UUID string for the context.

        Returns:
            A Context object with the specified ID and default settings.
        """
        if not self.file_watcher:
            raise Exception("File watcher not initialized")

        file_path = os.path.join(defines.context_dir, context_id)

        # Check if the file exists
        if not os.path.exists(file_path):
            logger.info(f"Context file {file_path} not found. Creating new context.")
            self.contexts[context_id] = self.create_context(context_id)
        else:
            # Read and deserialize the data
            with open(file_path, "r") as f:
                content = f.read()
            logger.info(f"Loading context from {file_path}, content length: {len(content)}")
            try:
                # Try parsing as JSON first to ensure valid JSON
                json.loads(content)
                logger.info("JSON parsed successfully, attempting model validation")
                # Now try Pydantic validation
                self.contexts[context_id] = Context.model_validate_json(content)
                self.contexts[context_id].file_watcher = self.file_watcher
                logger.info(f"Successfully loaded context {context_id}")
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in file: {e}")
                # Fallback to creating a new context
                self.contexts[context_id] = Context(id=context_id, file_watcher=self.file_watcher)
            except Exception as e:
                logger.error(f"Error validating context: {str(e)}")
                import traceback
                logger.error(traceback.format_exc())
                # Fallback to creating a new context
                self.contexts[context_id] = Context(id=context_id, file_watcher=self.file_watcher)

        return self.contexts[context_id]

    def create_context(self, context_id=None) -> Context:
        """
        Create a new context with a unique ID and default settings.

        Args:
            context_id: Optional UUID string for the context. If not provided, a new UUID is generated.

        Returns:
            A Context object with the specified ID and default settings.
        """
        if not self.file_watcher:
            raise Exception("File watcher not initialized")

        if not context_id:
            context_id = str(uuid4())
        logger.info(f"Creating new context with ID: {context_id}")
        context = Context(id=context_id, file_watcher=self.file_watcher)
        if os.path.exists(defines.resume_doc):
            with open(defines.resume_doc, "r") as f:
                context.user_resume = f.read()
        context.get_or_create_agent(agent_type="chat")  # system_prompt=system_message)
        # context.add_agent(Resume(system_prompt=system_generate_resume))
        # context.add_agent(JobDescription(system_prompt=system_job_description))
        # context.add_agent(FactCheck(system_prompt=system_fact_check))
        context.tools = Tools.enabled_tools(Tools.tools)
        context.rags = rags.copy()
        logger.info(f"{context.id} created and added to contexts.")
        self.contexts[context.id] = context
        self.save_context(context.id)
        return context

    def upsert_context(self, context_id=None) -> Context:
        """
        Upsert a context based on the provided context_id.

        Args:
            context_id: UUID string for the context. If it doesn't exist, a new context is created.

        Returns:
            A Context object with the specified ID and default settings.
        """
        if not context_id:
            logger.warning("No context ID provided. Creating a new context.")
            return self.create_context()
        if context_id in self.contexts:
            return self.contexts[context_id]
        logger.info(f"Context {context_id} is not yet loaded.")
        return self.load_or_create_context(context_id)

    @REQUEST_TIME.time()
    async def generate_response(self, context: Context, agent: Agent, prompt: str, options: Tunables | None) -> AsyncGenerator[Message, None]:
        if not self.file_watcher:
            raise Exception("File watcher not initialized")

        agent_type = agent.get_agent_type()
        logger.info(f"generate_response: type - {agent_type}")

        message = Message(prompt=prompt, options=agent.tunables)
        if options:
            message.tunables = options
        async for message in agent.prepare_message(message):
            # logger.info(f"{agent_type}.prepare_message: {message.status} - {message.response}")
            if message.status == "error":
                yield message
                return
            if message.status != "done":
                yield message
        async for message in agent.process_message(self.llm, self.model, message):
            if message.status == "error":
                yield message
                return
            if message.status != "done":
                yield message
        logger.info(f"{agent_type}.process_message: {message.status} {f'...{message.response[-20:]}' if len(message.response) > 20 else message.response}")
        message.status = "done"
        yield message
        return

        # ------------------------------------------------------------------
        # NOTE: Everything below this `return` is unreachable legacy logic
        # from before the Agent prepare/process refactor; retained for
        # reference only. It references names (content, system_job_description,
        # system_fact_check, llm_tools, ...) that no longer exist here.
        # ------------------------------------------------------------------
        if self.processing:
            logger.info("TODO: Implement delay queueing; busy for same agent, otherwise return queue size and estimated wait time")
            yield {"status": "error", "message": "Busy processing another request."}
            return

        self.processing = True

        conversation: Conversation = agent.conversation
        message = Message(prompt=content)
        del content  # Prevent accidental use of content

        # Default to not using tools
        enable_tools = False

        # Default to using RAG if there is content to check
        enable_rag = bool(message.prompt)

        # RAG is disabled when asking questions about the resume
        if agent.get_agent_type() == "resume":
            enable_rag = False

        # The first time through each agent type a content_seed may be set for
        # future chat agents; use it once, then clear it
        message.preamble = agent.get_and_reset_content_seed()
        system_prompt = agent.system_prompt

        # After the first time a particular agent type is used, it is handled as a chat.
        # The number of messages indicating the agent is ready for chat varies based on
        # the type of agent
        process_type = agent.get_agent_type()
        match process_type:
            case "job_description":
                logger.info(f"job_description user_history len: {len(conversation.messages)}")
                if len(conversation.messages) >= 2:  # USER, ASSISTANT
                    process_type = "chat"
            case "resume":
                logger.info(f"resume user_history len: {len(conversation.messages)}")
                if len(conversation.messages) >= 3:  # USER, ASSISTANT, FACT_CHECK
                    process_type = "chat"
            case "fact_check":
                process_type = "chat"  # Fact Check is always a chat agent

        match process_type:
            # Normal chat interactions with context history
            case "chat":
                if not message.prompt:
                    yield {"status": "error", "message": "No query provided for chat."}
                    logger.info(f"user_history len: {len(conversation.messages)}")
                    self.processing = False
                    return

                enable_tools = True

                # Generate RAG content if enabled, based on the content
                rag_context = ""
                if enable_rag:
                    # Initialize metadata["rag"] to None or a default value
                    message.metadata["rag"] = None
                    for value in self.generate_rag_results(context, message.prompt):
                        if "status" in value:
                            yield value
                        else:
                            if value.get("documents") or value.get("rag") is not None:
                                message.metadata["rag"] = value
                    if message.metadata["rag"]:
                        for doc in message.metadata["rag"]["documents"]:
                            rag_context += f"{doc}\n"

                if rag_context:
                    message.preamble = f"""
<|context|>
{rag_context}
"""

                if context.user_resume:
                    message.preamble += f"""
<|resume|>
{context.user_resume}
"""

                message.preamble += """
<|rules|>
- If there is information in the <|context|> or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|context|>' or '<|resume|>' or quoting it directly.
- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|> or <|resume|>.

<|question|>
Use that information to respond to:"""

                # Use the mode specific system_prompt instead of 'chat'
                system_prompt = agent.system_prompt

            # On first entry, a single job_description is provided ("user")
            # Generate a resume to append to RESUME history
            case "job_description":
                # Generate RAG content if enabled, based on the content
                rag_context = ""
                if enable_rag:
                    # Initialize metadata["rag"] to None or a default value
                    message.metadata["rag"] = None
                    for value in self.generate_rag_results(context, message.prompt):
                        if "status" in value:
                            yield value
                        else:
                            if value.get("documents") or value.get("rag") is not None:
                                message.metadata["rag"] = value
                    if message.metadata["rag"]:
                        for doc in message.metadata["rag"]["documents"]:
                            rag_context += f"{doc}\n"

                message.preamble = ""
                if rag_context:
                    message.preamble += f"""
<|context|>
{rag_context}
"""

                if context.user_resume:
                    message.preamble += f"""
<|resume|>
{context.user_resume}
"""

                message.preamble += f"""
<|job_description|>
{message.prompt}
"""

                tmp = context.get_agent("job_description")
                if not tmp:
                    raise Exception("Job description agent not found.")
                # Set the content seed for the job_description agent
                tmp.set_content_seed(message.preamble + "<|question|>\nUse the above information to respond to this prompt: ")

                message.preamble += f"""
<|rules|>
1. Use the above <|resume|> and <|context|> to create the resume for the <|job_description|>.
2. Do not use content from the <|job_description|> in the response unless the <|context|> or <|resume|> mentions them.
<|question|>
Use the above information to respond to this prompt:
"""

                # For all future calls to job_description, use the system_job_description
                agent.system_prompt = system_job_description

                # Seed the history for job_description
                stuffingMessage = Message(prompt=message.prompt)
                stuffingMessage.response = "Job description stored to use in future queries."
                stuffingMessage.metadata["origin"] = "job_description"
                stuffingMessage.metadata["display"] = "hide"
                conversation.add(stuffingMessage)

                message.add_action("generate_resume")

                logger.info("TODO: Convert these to generators, e.g. generate_resume(), then manually add results into the agent's 'resume'")
                logger.info("TODO: For subsequent runs, have the Agent handler generate the follow up prompts so they can have correct context preamble")

                # Switch to resume agent for LLM responses
                # message.metadata["origin"] = "resume"
                # agent = context.get_or_create_agent("resume")
                # system_prompt = agent.system_prompt
                # llm_history = agent.llm_history = []
                # user_history = agent.user_history = []

            # Ignore the passed in content and invoke Fact Check
            case "resume":
                if len(context.get_or_create_agent("resume").conversation.messages) < 2:  # USER, **ASSISTANT**
                    raise Exception("No resume found in user history.")
                resume = context.get_or_create_agent("resume").conversation.messages[1]

                # Generate RAG content if enabled, based on the content
                rag_context = ""
                if enable_rag:
                    # Initialize metadata["rag"] to None or a default value
                    message.metadata["rag"] = None
                    for value in self.generate_rag_results(context, resume["content"]):
                        if "status" in value:
                            yield value
                        else:
                            if value.get("documents") or value.get("rag") is not None:
                                message.metadata["rag"] = value
                    if message.metadata["rag"]:
                        for doc in message.metadata["rag"]["documents"]:
                            rag_context += f"{doc}\n"

                # This is being passed to Fact Check, so do not provide the <|job_description|>
                message.preamble = ""
                if rag_context:
                    message.preamble += f"""
<|context|>
{rag_context}
"""

                if context.user_resume:
                    # Do not prefix the resume with <|resume|>; just add to the <|context|>
                    message.preamble += f"""
{context.user_resume}
"""

                message.preamble += f"""
<|resume|>
{resume['content']}

<|rules|>
1. Do not invent or assume any information not explicitly present in the <|context|>.
2. Analyze the <|resume|> to identify any discrepancies or inaccuracies based on the <|context|>.

<|question|>
"""

                context.get_or_create_agent("resume").set_content_seed(f"""
<|resume|>
{resume["content"]}

<|question|>
Use the above <|resume|> and <|job_description|> to answer this query:
""")

                message.prompt = "Fact check the resume and report discrepancies."

                # Seed the history for resume
                messages = [
                    {"role": "user", "content": "Fact check resume", "origin": "resume", "display": "hide"},
                    {"role": "assistant", "content": "Resume fact checked.", "origin": "resume", "display": "hide"},
                ]

                # Do not add this to the LLM history; it is only used for UI presentation
                stuffingMessage = Message(prompt="Fact check resume")
                stuffingMessage.response = "Resume fact checked."
                stuffingMessage.metadata["origin"] = "resume"
                stuffingMessage.metadata["display"] = "hide"
                stuffingMessage.actions = ["fact_check"]
                logger.info("TODO: Switch this to use actions to keep the UI from showing it")
                conversation.add(stuffingMessage)

                # For all future calls to job_description, use the system_job_description
                logger.info("TODO: Create a system_resume_QA prompt to use for the resume agent")
                agent.system_prompt = system_prompt

                # Switch to fact_check agent for LLM responses
                message.metadata["origin"] = "fact_check"
                agent = context.get_or_create_agent("fact_check", system_prompt=system_fact_check)
                llm_history = agent.llm_history = []
                user_history = agent.user_history = []

            case _:
                raise Exception(f"Invalid chat agent_type: {agent_type}")

        conversation.add(message)
        # llm_history.append({"role": "user", "content": message.preamble + content})
        # user_history.append({"role": "user", "content": content, "origin": message.metadata["origin"]})
        # message.metadata["full_query"] = llm_history[-1]["content"]

        # Uses cached system_prompt as agent.system_prompt may have been updated for follow up questions
        messages = create_system_message(system_prompt)
        if context.message_history_length:
            to_add = conversation.messages[-context.message_history_length:]
        else:
            to_add = conversation.messages
        for m in to_add:
            messages.extend([
                {"role": "user", "content": m.content},
                {"role": "assistant", "content": m.response},
            ])

        message.content = message.preamble + message.prompt  # To send to the LLM
        messages.append({"role": "user", "content": message.content})

        # Add the system message to the beginning of the messages list
        message.content = f"""
<|system_prompt|>
{system_prompt}

{message.preamble}
{message.prompt}"""

        # Estimate token length of new messages
        ctx_size = self.get_optimal_ctx_size(context.get_or_create_agent(process_type).context_tokens, messages=message.prompt)

        if len(conversation.messages) > 2:
            processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
        else:
            match agent.get_agent_type():
                case "job_description":
                    processing_message = f"Generating {'RAG augmented ' if enable_rag else ''}resume..."
                case "resume":
                    processing_message = f"Fact Checking {'RAG augmented ' if enable_rag else ''}resume..."
                case _:
                    processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
        yield {"status": "processing", "message": processing_message, "num_ctx": ctx_size}

        # Use the async generator in an async for loop
        try:
            if enable_tools:
                response = self.llm.chat(model=self.model, messages=messages, tools=llm_tools(context.tools), options={"num_ctx": ctx_size})
            else:
                response = self.llm.chat(model=self.model, messages=messages, options={"num_ctx": ctx_size})
        except Exception as e:
            logger.exception({"model": self.model, "error": str(e)})
            yield {"status": "error", "message": "An error occurred communicating with the LLM"}
            self.processing = False
            return

        message.metadata["eval_count"] += response["eval_count"]
        message.metadata["eval_duration"] += response["eval_duration"]
        message.metadata["prompt_eval_count"] += response["prompt_eval_count"]
        message.metadata["prompt_eval_duration"] += response["prompt_eval_duration"]
        agent.context_tokens = response["prompt_eval_count"] + response["eval_count"]

        tools_used = []

        yield {"status": "processing", "message": "Initial response received..."}

        if "tool_calls" in response.get("message", {}):
            yield {"status": "processing", "message": "Processing tool calls..."}

            tool_message = response["message"]
            tool_result = None

            # Process all yielded items from the handler
            async for item in self.handle_tool_calls(tool_message):
                if isinstance(item, tuple) and len(item) == 2:
                    # This is the final result tuple (tool_result, tools_used)
                    tool_result, tools_used = item
                else:
                    # This is a status update, forward it
                    yield item

            message_dict = {
                "role": tool_message.get("role", "assistant"),
                "content": tool_message.get("content", ""),
            }

            if "tool_calls" in tool_message:
                message_dict["tool_calls"] = [
                    {"function": {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}}
                    for tc in tool_message["tool_calls"]
                ]

            pre_add_index = len(messages)
            messages.append(message_dict)

            if isinstance(tool_result, list):
                messages.extend(tool_result)
            elif tool_result:
                messages.append(tool_result)

            message.metadata["tools"] = tools_used

            # Estimate token length of new messages
            ctx_size = self.get_optimal_ctx_size(agent.context_tokens, messages=messages[pre_add_index:])
            yield {"status": "processing", "message": "Generating final response...", "num_ctx": ctx_size}

            # Decrease creativity when processing tool call requests
            response = self.llm.chat(model=self.model, messages=messages, stream=False, options={"num_ctx": ctx_size})  # , "temperature": 0.5 })

            message.metadata["eval_count"] += response["eval_count"]
            message.metadata["eval_duration"] += response["eval_duration"]
            message.metadata["prompt_eval_count"] += response["prompt_eval_count"]
            message.metadata["prompt_eval_duration"] += response["prompt_eval_duration"]
            agent.context_tokens = response["prompt_eval_count"] + response["eval_count"]

        reply = response["message"]["content"]
        message.response = reply
        message.metadata["origin"] = agent.get_agent_type()

        # final_message = {"role": "assistant", "content": reply}
        # # history is provided to the LLM and should not have additional metadata
        # llm_history.append(final_message)

        # user_history is provided to the REST API and does not include CONTEXT
        # It does include metadata
        # final_message["metadata"] = message.metadata
        # user_history.append({**final_message, "origin": message.metadata["origin"]})

        # Return the REST API with metadata
        yield {
            "status": "done",
            "message": {
                **message.model_dump(mode="json"),
            },
        }

        # except Exception as e:
        #     logger.exception({"model": self.model, "origin": agent_type, "content": content, "error": str(e)})
        #     yield {"status": "error", "message": f"An
        #         error occurred: {str(e)}"}
        # finally:
        #     self.processing = False

        self.processing = False
        return

    def run(self, host="0.0.0.0", port=WEB_PORT, **kwargs):
        try:
            if self.ssl_enabled:
                logger.info(f"Starting web server at https://{host}:{port}")
                uvicorn.run(
                    self.app,
                    host=host,
                    port=port,
                    log_config=None,
                    ssl_keyfile=defines.key_path,
                    ssl_certfile=defines.cert_path,
                )
            else:
                logger.info(f"Starting web server at http://{host}:{port}")
                uvicorn.run(self.app, host=host, port=port, log_config=None)
        except KeyboardInterrupt:
            if self.observer:
                self.observer.stop()
                self.observer.join()


# %%
# Main function to run everything
def main():
    global model

    # Parse command-line arguments
    args = parse_args()

    # Setup logging based on the provided level
    logger.setLevel(args.level.upper())

    warnings.filterwarnings("ignore", category=FutureWarning, module="sklearn.*")
    warnings.filterwarnings("ignore", category=UserWarning, module="umap.*")

    llm = ollama.Client(host=args.ollama_server)  # type: ignore
    model = args.ollama_model

    web_server = WebServer(llm, model)
    web_server.run(host=args.web_host, port=args.web_port, use_reloader=False)


main()
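# Typical invocation, a sketch (the script name and model are illustrative;
# flags match parse_args() above):
#
#   python server.py --ollama-model llama3.2 --web-port 8911 --level DEBUG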