From dc551963114250051999d38496b82372237983c5 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Mon, 28 Apr 2025 23:12:16 -0700 Subject: [PATCH] Rework in progress --- Dockerfile | 2 +- frontend/src/App.tsx | 1 + frontend/src/Controls.tsx | 346 ++++---- frontend/src/Conversation.tsx | 9 +- frontend/src/DeleteConfirmation.tsx | 24 +- frontend/src/Message.tsx | 118 ++- frontend/src/ResumeBuilder.tsx | 18 +- src/server.py | 1236 ++++++++++++++------------- src/utils/__init__.py | 8 +- src/utils/context.py | 98 +++ src/utils/conversation.py | 23 + src/utils/message.py | 31 + src/utils/session.py | 78 ++ 13 files changed, 1131 insertions(+), 861 deletions(-) create mode 100644 src/utils/context.py create mode 100644 src/utils/conversation.py create mode 100644 src/utils/message.py create mode 100644 src/utils/session.py diff --git a/Dockerfile b/Dockerfile index b7516ac..d3ae56f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -257,7 +257,7 @@ FROM llm-base AS backstory COPY /src/requirements.txt /opt/backstory/src/requirements.txt RUN pip install -r /opt/backstory/src/requirements.txt -RUN pip install 'markitdown[all]' +RUN pip install 'markitdown[all]' pydantic SHELL [ "/bin/bash", "-c" ] diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index c4c323b..113dbe6 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -281,6 +281,7 @@ const App = () => { throw Error("Server is temporarily down."); } const data = await response.json(); + console.log(`Session created: ${data.id}`); setSessionId(data.id); const newPath = `/${data.id}`; diff --git a/frontend/src/Controls.tsx b/frontend/src/Controls.tsx index f74d1bf..0695fe6 100644 --- a/frontend/src/Controls.tsx +++ b/frontend/src/Controls.tsx @@ -16,17 +16,20 @@ import ExpandMoreIcon from '@mui/icons-material/ExpandMore'; import { SetSnackType } from './Snack'; +interface ServerTunables { + system_prompt: string, + message_history_length: number, + tools: Tool[], + rags: Tool[] +}; + type Tool = { type: string, - function?: { - name: string, - description: string, - parameters?: any, - returns?: any - }, - name?: string, - description?: string, enabled: boolean + name: string, + description: string, + parameters?: any, + returns?: any }; interface ControlsParams { @@ -41,7 +44,6 @@ type GPUInfo = { discrete: boolean } - type SystemInfo = { "Installed RAM": string, "Graphics Card": GPUInfo[], @@ -94,116 +96,111 @@ const Controls = ({ sessionId, setSnack, connectionBase }: ControlsParams) => { const [tools, setTools] = useState([]); const [rags, setRags] = useState([]); const [systemPrompt, setSystemPrompt] = useState(""); - const [serverSystemPrompt, setServerSystemPrompt] = useState(""); const [messageHistoryLength, setMessageHistoryLength] = useState(5); + const [serverTunables, setServerTunables] = useState(undefined); - useEffect(() => { - if (systemPrompt === serverSystemPrompt || !systemPrompt.trim() || sessionId === undefined) { - return; - } - const sendSystemPrompt = async (prompt: string) => { - try { - const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { - method: 'PUT', - headers: { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - body: JSON.stringify({ "system_prompt": prompt }), - }); - - const data = await response.json(); - const newPrompt = data["system_prompt"]; - if (newPrompt !== serverSystemPrompt) { - setServerSystemPrompt(newPrompt); - setSystemPrompt(newPrompt) - setSnack("System prompt updated", "success"); - } - } catch (error) { - console.error('Fetch 
error:', error); - setSnack("System prompt update failed", "error"); - } - }; - - sendSystemPrompt(systemPrompt); - - }, [systemPrompt, setServerSystemPrompt, serverSystemPrompt, connectionBase, sessionId, setSnack]); - - useEffect(() => { - if (sessionId === undefined) { - return; - } - const sendMessageHistoryLength = async (length: number) => { - try { - const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { - method: 'PUT', - headers: { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - body: JSON.stringify({ "message_history_length": length }), - }); - - const data = await response.json(); - const newLength = data["message_history_length"]; - if (newLength !== messageHistoryLength) { - setMessageHistoryLength(newLength); - setSnack("Message history length updated", "success"); - } - } catch (error) { - console.error('Fetch error:', error); - setSnack("Message history length update failed", "error"); - } - }; - - sendMessageHistoryLength(messageHistoryLength); - - }, [messageHistoryLength, setMessageHistoryLength, connectionBase, sessionId, setSnack]); - - const reset = async (types: ("rags" | "tools" | "history" | "system_prompt" | "message_history_length")[], message: string = "Update successful.") => { + useEffect(() => { + if (serverTunables === undefined || systemPrompt === serverTunables.system_prompt || !systemPrompt.trim() || sessionId === undefined) { + return; + } + const sendSystemPrompt = async (prompt: string) => { try { - const response = await fetch(connectionBase + `/api/reset/${sessionId}`, { + const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { method: 'PUT', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json', }, - body: JSON.stringify({ "reset": types }), + body: JSON.stringify({ "system_prompt": prompt }), }); - - if (response.ok) { - const data = await response.json(); - if (data.error) { - throw Error() - } - for (const [key, value] of Object.entries(data)) { - switch (key) { - case "rags": - setRags(value as Tool[]); - break; - case "tools": - setTools(value as Tool[]); - break; - case "system_prompt": - setServerSystemPrompt((value as any)["system_prompt"].trim()); - setSystemPrompt((value as any)["system_prompt"].trim()); - break; - case "history": - console.log('TODO: handle history reset'); - break; - } - } - setSnack(message, "success"); - } else { - throw Error(`${{ status: response.status, message: response.statusText }}`); + + const tunables = await response.json(); + serverTunables.system_prompt = tunables.system_prompt; + setSystemPrompt(tunables.system_prompt) + setSnack("System prompt updated", "success"); + } catch (error) { + console.error('Fetch error:', error); + setSnack("System prompt update failed", "error"); + } + }; + + sendSystemPrompt(systemPrompt); + + }, [systemPrompt, connectionBase, sessionId, setSnack, serverTunables]); + + useEffect(() => { + if (serverTunables === undefined || messageHistoryLength === serverTunables.message_history_length || !messageHistoryLength || sessionId === undefined) { + return; + } + const sendMessageHistoryLength = async (length: number) => { + try { + const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + }, + body: JSON.stringify({ "message_history_length": length }), + }); + + const data = await response.json(); + const newLength = data["message_history_length"]; + if (newLength !== 
messageHistoryLength) { + setMessageHistoryLength(newLength); + setSnack("Message history length updated", "success"); } } catch (error) { console.error('Fetch error:', error); - setSnack("Unable to restore defaults", "error"); + setSnack("Message history length update failed", "error"); } }; - - + + sendMessageHistoryLength(messageHistoryLength); + + }, [messageHistoryLength, setMessageHistoryLength, connectionBase, sessionId, setSnack, serverTunables]); + + const reset = async (types: ("rags" | "tools" | "history" | "system_prompt" | "message_history_length")[], message: string = "Update successful.") => { + try { + const response = await fetch(connectionBase + `/api/reset/${sessionId}`, { + method: 'PUT', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + }, + body: JSON.stringify({ "reset": types }), + }); + + if (response.ok) { + const data = await response.json(); + if (data.error) { + throw Error() + } + for (const [key, value] of Object.entries(data)) { + switch (key) { + case "rags": + setRags(value as Tool[]); + break; + case "tools": + setTools(value as Tool[]); + break; + case "system_prompt": + setSystemPrompt((value as ServerTunables)["system_prompt"].trim()); + break; + case "history": + console.log('TODO: handle history reset'); + break; + } + } + setSnack(message, "success"); + } else { + throw Error(`${{ status: response.status, message: response.statusText }}`); + } + } catch (error) { + console.error('Fetch error:', error); + setSnack("Unable to restore defaults", "error"); + } + }; + // Get the system information useEffect(() => { if (systemInfo !== undefined || sessionId === undefined) { @@ -229,21 +226,20 @@ const Controls = ({ sessionId, setSnack, connectionBase }: ControlsParams) => { setEditSystemPrompt(systemPrompt); }, [systemPrompt, setEditSystemPrompt]); - const toggleRag = async (tool: Tool) => { tool.enabled = !tool.enabled try { - const response = await fetch(connectionBase + `/api/rags/${sessionId}`, { + const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { method: 'PUT', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json', }, - body: JSON.stringify({ "tool": tool?.name, "enabled": tool.enabled }), + body: JSON.stringify({ "rags": [{ "name": tool?.name, "enabled": tool.enabled }] }), }); - const rags = await response.json(); - setRags([...rags]) + const tunables: ServerTunables = await response.json(); + setRags(tunables.rags) setSnack(`${tool?.name} ${tool.enabled ? "enabled" : "disabled"}`); } catch (error) { console.error('Fetch error:', error); @@ -255,117 +251,63 @@ const Controls = ({ sessionId, setSnack, connectionBase }: ControlsParams) => { const toggleTool = async (tool: Tool) => { tool.enabled = !tool.enabled try { - const response = await fetch(connectionBase + `/api/tools/${sessionId}`, { + const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { method: 'PUT', headers: { 'Content-Type': 'application/json', 'Accept': 'application/json', }, - body: JSON.stringify({ "tool": tool?.function?.name, "enabled": tool.enabled }), + body: JSON.stringify({ "tools": [{ "name": tool.name, "enabled": tool.enabled }] }), }); - const tools = await response.json(); - setTools([...tools]) - setSnack(`${tool?.function?.name} ${tool.enabled ? "enabled" : "disabled"}`); + const tunables: ServerTunables = await response.json(); + setTools(tunables.tools) + setSnack(`${tool.name} ${tool.enabled ? 
"enabled" : "disabled"}`); } catch (error) { console.error('Fetch error:', error); - setSnack(`${tool?.function?.name} ${tool.enabled ? "enabling" : "disabling"} failed.`, "error"); + setSnack(`${tool.name} ${tool.enabled ? "enabling" : "disabling"} failed.`, "error"); tool.enabled = !tool.enabled } }; - // If the tools have not been set, fetch them from the server + // If the systemPrompt has not been set, fetch it from the server useEffect(() => { - if (tools.length || sessionId === undefined) { + if (serverTunables !== undefined || sessionId === undefined) { return; } - const fetchTools = async () => { - try { - // Make the fetch request with proper headers - const response = await fetch(connectionBase + `/api/tools/${sessionId}`, { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - }); - if (!response.ok) { - throw Error(); - } - const tools = await response.json(); - setTools(tools); - } catch (error: any) { - setSnack("Unable to fetch tools", "error"); - console.error(error); - } + const fetchTunables = async () => { + // Make the fetch request with proper headers + const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + }, + }); + const data = await response.json(); + console.log("Server tunables: ", data); + setServerTunables(data); + setSystemPrompt(data["system_prompt"]); + setMessageHistoryLength(data["message_history_length"]); + setTools(data["tools"]); + setRags(data["rags"]); } - fetchTools(); - }, [sessionId, tools, setTools, setSnack, connectionBase]); - - // If the RAGs have not been set, fetch them from the server - useEffect(() => { - if (rags.length || sessionId === undefined) { - return; - } - const fetchRags = async () => { - try { - // Make the fetch request with proper headers - const response = await fetch(connectionBase + `/api/rags/${sessionId}`, { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - }); - if (!response.ok) { - throw Error(); - } - const rags = await response.json(); - setRags(rags); - } catch (error: any) { - setSnack("Unable to fetch RAGs", "error"); - console.error(error); - } - } - - fetchRags(); - }, [sessionId, rags, setRags, setSnack, connectionBase]); - - // If the systemPrompt has not been set, fetch it from the server - useEffect(() => { - if (serverSystemPrompt !== "" || sessionId === undefined) { - return; - } - const fetchTunables = async () => { - // Make the fetch request with proper headers - const response = await fetch(connectionBase + `/api/tunables/${sessionId}`, { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - }, - }); - const data = await response.json(); - const serverSystemPrompt = data["system_prompt"].trim(); - setServerSystemPrompt(serverSystemPrompt); - setSystemPrompt(serverSystemPrompt); - setMessageHistoryLength(data["message_history_length"]); - } - - fetchTunables(); - }, [sessionId, serverSystemPrompt, setServerSystemPrompt, connectionBase]); - - - - + fetchTunables(); + }, [sessionId, connectionBase, setServerTunables, setSystemPrompt, setMessageHistoryLength, serverTunables, setTools, setRags]); const toggle = async (type: string, index: number) => { switch (type) { case "rag": + if (rags === undefined) { + return; + } toggleRag(rags[index]) break; case "tool": + if (tools === undefined) { + return; + } 
toggleTool(tools[index]); } }; @@ -442,11 +384,11 @@ const Controls = ({ sessionId, setSnack, connectionBase }: ControlsParams) => { { - tools.map((tool, index) => + (tools || []).map((tool, index) => - } onChange={() => toggle("tool", index)} label={tool?.function?.name} /> - {tool?.function?.description} + } onChange={() => toggle("tool", index)} label={tool.name} /> + {tool.description} ) } @@ -463,14 +405,14 @@ const Controls = ({ sessionId, setSnack, connectionBase }: ControlsParams) => { { - rags.map((rag, index) => + (rags || []).map((rag, index) => } - onChange={() => toggle("rag", index)} label={rag?.name} + onChange={() => toggle("rag", index)} label={rag.name} /> - {rag?.description} + {rag.description} ) } diff --git a/frontend/src/Conversation.tsx b/frontend/src/Conversation.tsx index 7db7954..ce544a8 100644 --- a/frontend/src/Conversation.tsx +++ b/frontend/src/Conversation.tsx @@ -160,14 +160,15 @@ const Conversation = forwardRef(({ throw new Error(`Server responded with ${response.status}: ${response.statusText}`); } - const data = await response.json(); + const { messages } = await response.json(); - console.log(`History returned for ${type} from server with ${data.length} entries`) - if (data.length === 0) { + if (messages === undefined || messages.length === 0) { + console.log(`History returned for ${type} from server with 0 entries`) setConversation([]) setNoInteractions(true); } else { - setConversation(data); + console.log(`History returned for ${type} from server with ${messages.length} entries:`, messages) + setConversation(messages); setNoInteractions(false); } setProcessingMessage(undefined); diff --git a/frontend/src/DeleteConfirmation.tsx b/frontend/src/DeleteConfirmation.tsx index 84e0900..9fdcad7 100644 --- a/frontend/src/DeleteConfirmation.tsx +++ b/frontend/src/DeleteConfirmation.tsx @@ -42,17 +42,19 @@ const DeleteConfirmation = (props : DeleteConfirmationProps) => { return ( <> - - - + { /* This span is used to wrap the IconButton to ensure Tooltip works even when disabled */} + + + + , - message?: MessageData, + message: MessageData, isFullWidth?: boolean, submitQuery?: (text: string) => void, sessionId?: string, @@ -78,8 +78,25 @@ interface ChatQueryInterface { submitQuery?: (text: string) => void } +interface MessageMetaProps { + metadata: MessageMetaData, + messageProps: MessageProps +}; + + +const MessageMeta = (props: MessageMetaProps) => { + const { + /* MessageData */ + full_query, + rag, + tools, + eval_count, + eval_duration, + prompt_eval_count, + prompt_eval_duration, + } = props.metadata || {}; + const messageProps = props.messageProps; -const MessageMeta = ({ ...props }: MessageMetaProps) => { return (<> Below is the LLM performance of this query. 
Note that if tools are called, the @@ -99,28 +116,28 @@ const MessageMeta = ({ ...props }: MessageMetaProps) => { Prompt - {props.prompt_eval_count} - {Math.round(props.prompt_eval_duration / 10 ** 7) / 100} - {Math.round(props.prompt_eval_count * 10 ** 9 / props.prompt_eval_duration)} + {prompt_eval_count} + {Math.round(prompt_eval_duration / 10 ** 7) / 100} + {Math.round(prompt_eval_count * 10 ** 9 / prompt_eval_duration)} Response - {props.eval_count} - {Math.round(props.eval_duration / 10 ** 7) / 100} - {Math.round(props.eval_count * 10 ** 9 / props.eval_duration)} + {eval_count} + {Math.round(eval_duration / 10 ** 7) / 100} + {Math.round(eval_count * 10 ** 9 / eval_duration)} Total - {props.prompt_eval_count + props.eval_count} - {Math.round((props.prompt_eval_duration + props.eval_duration) / 10 ** 7) / 100} - {Math.round((props.prompt_eval_count + props.eval_count) * 10 ** 9 / (props.prompt_eval_duration + props.eval_duration))} + {prompt_eval_count + eval_count} + {Math.round((prompt_eval_duration + eval_duration) / 10 ** 7) / 100} + {Math.round((prompt_eval_count + eval_count) * 10 ** 9 / (prompt_eval_duration + eval_duration))} { - props?.full_query !== undefined && + full_query !== undefined && }> @@ -128,12 +145,12 @@ const MessageMeta = ({ ...props }: MessageMetaProps) => { -
{props.full_query.trim()}
+
{full_query?.trim()}
} { - props.tools !== undefined && props.tools.length !== 0 && + tools !== undefined && tools.length !== 0 && }> @@ -141,7 +158,7 @@ const MessageMeta = ({ ...props }: MessageMetaProps) => { - {props.tools.map((tool: any, index: number) => + {tools.map((tool: any, index: number) => {index !== 0 && }
@@ -165,24 +182,24 @@ const MessageMeta = ({ ...props }: MessageMetaProps) => { } { - props?.rag?.name !== undefined && <> + rag?.name !== undefined && <> }> - Top RAG {props.rag.ids.length} matches from '{props.rag.name}' collection against embedding vector of {props.rag.query_embedding.length} dimensions + Top RAG {rag.ids.length} matches from '{rag.name}' collection against embedding vector of {rag.query_embedding.length} dimensions - {props.rag.ids.map((id: number, index: number) => + {rag.ids.map((id: number, index: number) => {index !== 0 && }
-
Doc ID: {props.rag.ids[index].slice(-10)}
-
Similarity: {Math.round(props.rag.distances[index] * 100) / 100}
-
Type: {props.rag.metadatas[index].doc_type}
-
Chunk Len: {props.rag.documents[index].length}
+
Doc ID: {rag.ids[index].slice(-10)}
+
Similarity: {Math.round(rag.distances[index] * 100) / 100}
+
Type: {rag.metadatas[index].doc_type}
+
Chunk Len: {rag.documents[index].length}
-
{props.rag.documents[index]}
+
{rag.documents[index]}
)} @@ -195,13 +212,53 @@ const MessageMeta = ({ ...props }: MessageMetaProps) => {
- +
+ + }> + + All response fields + + + + {Object.entries(props.messageProps.message) + .filter(([key, value]) => key !== undefined && value !== undefined) + .map(([key, value]) => (<> + {(typeof (value) !== "string" || value?.trim() !== "") && + + }> + {key} + + + {key === "metadata" && + Object.entries(value) + .filter(([key, value]) => key !== undefined && value !== undefined) + .map(([key, value]) => ( + + }> + {key} + + +
{`${typeof (value) !== "object" ? value : JSON.stringify(value)}`}
+
+
+ ))} + {key !== "metadata" && +
{typeof (value) !== "object" ? value : JSON.stringify(value)}
+ } +
+
+ } + + ) + )} +
+
+ } - - ); + ); }; const ChatQuery = ({ text, submitQuery }: ChatQueryInterface) => { @@ -221,7 +278,7 @@ const ChatQuery = ({ text, submitQuery }: ChatQueryInterface) => { } const Message = (props: MessageProps) => { - const { message, submitQuery, isFullWidth, sessionId, setSnack, connectionBase, sx, className } = props; + const { message, submitQuery, isFullWidth, sx, className } = props; const [expanded, setExpanded] = useState(false); const textFieldRef = useRef(null); @@ -293,7 +350,7 @@ const Message = (props: MessageProps) => { {message.metadata && <> - + } @@ -305,7 +362,6 @@ export type { MessageProps, MessageList, ChatQueryInterface, - MessageMetaProps, MessageData, MessageRoles }; diff --git a/frontend/src/ResumeBuilder.tsx b/frontend/src/ResumeBuilder.tsx index cf9587a..45f7ec1 100644 --- a/frontend/src/ResumeBuilder.tsx +++ b/frontend/src/ResumeBuilder.tsx @@ -112,6 +112,12 @@ const ResumeBuilder: React.FC = ({ return keep; }); + /* If Resume hasn't occurred yet and there is still more than one message, + * resume has been generated. */ + if (!hasResume && reduced.length > 1) { + setHasResume(true); + } + if (reduced.length > 0) { // First message is always 'content' reduced[0].title = 'Job Description'; @@ -123,7 +129,7 @@ const ResumeBuilder: React.FC = ({ reduced = reduced.filter(m => m.display !== "hide"); return reduced; - }, [setHasJobDescription, setHasResume]); + }, [setHasJobDescription, setHasResume, hasResume]); const filterResumeMessages = useCallback((messages: MessageList): MessageList => { if (messages === undefined || messages.length === 0) { @@ -135,11 +141,11 @@ const ResumeBuilder: React.FC = ({ if ((m.metadata?.origin || m.origin || "no origin") === 'fact_check') { setHasFacts(true); } - // if (!keep) { - // console.log(`filterResumeMessages: ${i + 1} filtered:`, m); - // } else { - // console.log(`filterResumeMessages: ${i + 1}:`, m); - // } + if (!keep) { + console.log(`filterResumeMessages: ${i + 1} filtered:`, m); + } else { + console.log(`filterResumeMessages: ${i + 1}:`, m); + } return keep; }); diff --git a/src/server.py b/src/server.py index 67784f3..3f7a5b7 100644 --- a/src/server.py +++ b/src/server.py @@ -11,6 +11,8 @@ import uuid import subprocess import re import math +import warnings +from typing import Any def try_import(module_name, pip_name=None): try: @@ -42,6 +44,7 @@ from sklearn.preprocessing import MinMaxScaler from utils import ( rag as Rag, + Context, Conversation, Message, Chat, Resume, JobDescription, FactCheck, defines ) @@ -59,6 +62,81 @@ rags = [ # { "name": "LKML", "enabled": False, "description": "Full associative data for entire LKML mailing list archive." }, ] +system_message = f""" +Launched on {DateTime()}. + +When answering queries, follow these steps: + +- First analyze the query to determine if real-time information might be helpful +- Even when <|context|> is provided, consider whether the tools would provide more current or comprehensive information +- Use the provided tools whenever they would enhance your response, regardless of whether context is also available +- When presenting weather forecasts, include relevant emojis immediately before the corresponding text. For example, for a sunny day, say \"☀️ Sunny\" or if the forecast says there will be \"rain showers, say \"🌧️ Rain showers\". 
Use this mapping for weather emojis: Sunny: ☀️, Cloudy: ☁️, Rainy: 🌧️, Snowy: ❄️
+- When both <|context|> and tool outputs are relevant, synthesize information from both sources to provide the most complete answer
+- Always prioritize the most up-to-date and relevant information, whether it comes from <|context|> or tools
+- If <|context|> and tool outputs contain conflicting information, prefer the tool outputs as they likely represent more current data
+- If there is information in the <|context|>, <|job_description|>, or <|work_history|> sections to enhance the answer, incorporate it seamlessly and refer to it as 'the latest information' or 'recent data' instead of mentioning '<|context|>' (etc.) or quoting it directly.
+- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, or <|work_history|> tags.
+
+Always use tools and <|context|> when possible. Be concise, and never make up information. If you do not know the answer, say so.
+"""
+
+system_generate_resume = f"""
+Launched on {DateTime()}.
+
+You are a professional resume writer. Your task is to write a concise, polished, and tailored resume for a specific job based only on the individual's <|work_history|>.
+
+When answering queries, follow these steps:
+
+- You must not invent or assume any information not explicitly present in the <|work_history|>.
+- Analyze the <|job_description|> to identify skills required for the job.
+- Use the <|job_description|> provided to guide the focus, tone, and relevant skills or experience to highlight from the <|work_history|>.
+- Identify and emphasize the experiences, achievements, and responsibilities from the <|work_history|> that best align with the <|job_description|>.
+- Only provide information from <|work_history|> items if it is relevant to the <|job_description|>.
+- Do not use the <|job_description|> skills unless listed in <|work_history|>.
+- Do not include any information unless it is provided in <|work_history|>.
+- Use the <|work_history|> to create a polished, professional resume.
+- Do not list any locations or mailing addresses in the resume.
+- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly.
+- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, or <|work_history|> tags.
+- Ensure the language is clear, concise, and aligned with industry standards for professional resumes.
+
+Structure the resume professionally with the following sections where applicable:
+
+* Name: Use full name
+* Professional Summary: A 2-4 sentence overview tailored to the job.
+* Skills: A bullet list of key skills derived from the work history and relevant to the job.
+* Professional Experience: A detailed list of roles, achievements, and responsibilities from <|work_history|> that relate to the <|job_description|>.
+* Education: Include only if available in the work history.
+* Notes: Indicate the initial draft of the resume was generated using the Backstory application.
+
+""".strip()
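+
+# Note: of these prompts, create_context() currently seeds only the "chat"
+# session (with system_message); the other session types are created on
+# demand by the chat endpoint via context.create_session().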
+
+system_fact_check = f"""
+Launched on {DateTime()}.
+
+You are a professional resume fact checker. Your task is to identify any inaccuracies in the <|resume|> based on the individual's <|work_history|>.
+
+If there are inaccuracies, list them in a bullet point format.
+
+When answering queries, follow these steps:
+- You must not invent or assume any information not explicitly present in the <|work_history|>.
+- Analyze the <|resume|> to identify any discrepancies or inaccuracies based on the <|work_history|>.
+- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly.
+- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, <|resume|>, or <|work_history|> tags.
+""".strip()
+
+system_job_description = f"""
+Launched on {DateTime()}.
+
+You are a hiring and job placing specialist. Your task is to answer questions about a job description.
+
+When answering queries, follow these steps:
+- Analyze the <|job_description|> to provide insights for the question asked.
+- If any financial information is requested, be sure to account for inflation.
+- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly.
+- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, <|resume|>, or <|work_history|> tags.
+""".strip()
+
 def get_installed_ram():
     try:
         with open("/proc/meminfo", "r") as f:
@@ -140,80 +218,6 @@ DEFAULT_HISTORY_LENGTH=5
 
 # %%
 # Globals
-system_message = f"""
-Launched on {DateTime()}.
-
-When answering queries, follow these steps:
-
-- First analyze the query to determine if real-time information might be helpful
-- Even when <|context|> is provided, consider whether the tools would provide more current or comprehensive information
-- Use the provided tools whenever they would enhance your response, regardless of whether context is also available
-- When presenting weather forecasts, include relevant emojis immediately before the corresponding text. For example, for a sunny day, say \"☀️ Sunny\" or if the forecast says there will be \"rain showers, say \"🌧️ Rain showers\". Use this mapping for weather emojis: Sunny: ☀️, Cloudy: ☁️, Rainy: 🌧️, Snowy: ❄️
-- When both <|context|> and tool outputs are relevant, synthesize information from both sources to provide the most complete answer
-- Always prioritize the most up-to-date and relevant information, whether it comes from <|context|> or tools
-- If <|context|> and tool outputs contain conflicting information, prefer the tool outputs as they likely represent more current data
-- If there is information in the <|context|>, <|job_description|>, or <|work_history|> sections to enhance the answer, incorporate it seamlessly and refer to it as 'the latest information' or 'recent data' instead of mentioning '<|context|>' (etc.) or quoting it directly.
-- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, or <|work_history|> tags.
-
-Always use tools and <|context|> when possible. Be concise, and never make up information. If you do not know the answer, say so.
-"""
-
-system_generate_resume = f"""
-Launched on {DateTime()}.
-
-You are a professional resume writer. Your task is to write a concise, polished, and tailored resume for a specific job based only on the individual's <|work_history|>.
- -When answering queries, follow these steps: - -- You must not invent or assume any inforation not explicitly present in the <|work_history|>. -- Analyze the <|job_description|> to identify skills required for the job. -- Use the <|job_description|> provided to guide the focus, tone, and relevant skills or experience to highlight from the <|work_history|>. -- Identify and emphasize the experiences, achievements, and responsibilities from the <|work_history|> that best align with the <|job_description|>. -- Only provide information from <|work_history|> items if it is relevant to the <|job_description|>. -- Do not use the <|job_description|> skills unless listed in <|work_history|>. -- Do not include any information unless it is provided in <|work_history|>. -- Use the <|work_history|> to create a polished, professional resume. -- Do not list any locations or mailing addresses in the resume. -- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly. -- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, or <|work_history|> tags. -- Ensure the langauge is clear, concise, and aligned with industry standards for professional resumes. - -Structure the resume professionally with the following sections where applicable: - -* Name: Use full name -* Professional Summary: A 2-4 sentence overview tailored to the job. -* Skills: A bullet list of key skills derived from the work history and relevant to the job. -* Professional Experience: A detailed list of roles, achievements, and responsibilities from <|work_history|> that relate to the <|job_description|>. -* Education: Include only if available in the work history. -* Notes: Indicate the initial draft of the resume was generated using the Backstory application. - -""".strip() - -system_fact_check = f""" -Launched on {DateTime()}. - -You are a professional resume fact checker. Your task is to identify any inaccuracies in the <|resume|> based on the individual's <|work_history|>. - -If there are inaccuracies, list them in a bullet point format. - -When answering queries, follow these steps: -- You must not invent or assume any information not explicitly present in the <|work_history|>. -- Analyze the <|resume|> to identify any discrepancies or inaccuracies based on the <|work_history|>. -- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly. -- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, <|resume|>, or <|work_history|> tags. -""".strip() - -system_job_description = f""" -Launched on {DateTime()}. - -You are a hiring and job placing specialist. Your task is to answers about a job description. - -When answering queries, follow these steps: -- Analyze the <|job_description|> to provide insights for the asked question. -- If any financial information is requested, be sure to account for inflation. 
-- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly. -- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, <|resume|>, or <|work_history|> tags. -""".strip() def create_system_message(prompt): return [{"role": "system", "content": prompt}] @@ -237,11 +241,22 @@ def parse_args(): return parser.parse_args() def setup_logging(level): + global logging + numeric_level = getattr(logging, level.upper(), None) if not isinstance(numeric_level, int): raise ValueError(f"Invalid log level: {level}") - logging.basicConfig(level=numeric_level, format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s") + logging.basicConfig( + level=numeric_level, + format="%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + force=True + ) + + # Now reduce verbosity for FastAPI, Uvicorn, Starlette + for noisy_logger in ("uvicorn", "uvicorn.error", "uvicorn.access", "fastapi", "starlette"): + logging.getLogger(noisy_logger).setLevel(logging.WARNING) logging.info(f"Logging is set to {level} level.") @@ -320,7 +335,7 @@ def is_valid_uuid(value): except (ValueError, TypeError): return False -def default_tools(tools): +def default_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]: return [{**tool, "enabled": True} for tool in tools] def find_summarize_tool(tools): @@ -397,11 +412,10 @@ async def handle_tool_calls(message): # %% class WebServer: - def __init__(self, logging, client, model=MODEL_NAME): - self.logging = logging + def __init__(self, llm, model=MODEL_NAME): self.app = FastAPI() self.contexts = {} - self.client = client + self.llm = llm self.model = model self.processing = False self.file_watcher = None @@ -428,7 +442,7 @@ class WebServer: async def startup_event(): # Start the file watcher self.observer, self.file_watcher = Rag.start_file_watcher( - llm=client, + llm=llm, watch_directory=defines.doc_dir, recreate=False # Don't recreate if exists ) @@ -448,29 +462,25 @@ class WebServer: @self.app.get("/") async def root(): context = self.create_context() - self.logging.info(f"Redirecting non-session to {context['id']}") - return RedirectResponse(url=f"/{context['id']}", status_code=307) - #return JSONResponse({"redirect": f"/{context['id']}"}) + logging.info(f"Redirecting non-session to {context.id}") + return RedirectResponse(url=f"/{context.id}", status_code=307) + #return JSONResponse({"redirect": f"/{context.id}"}) @self.app.put("/api/umap/{context_id}") async def put_umap(context_id: str, request: Request): - if not self.file_watcher: - return - - if not is_valid_uuid(context_id): - logging.warning(f"Invalid context_id: {context_id}") - return JSONResponse({"error": "Invalid context_id"}, status_code=400) - - context = self.upsert_context(context_id) - + logging.info(f"{request.method} {request.url.path}") try: + if not self.file_watcher: + raise Exception("File watcher not initialized") + + context = self.upsert_context(context_id) + if not context: + return JSONResponse({"error": f"Invalid context: {context_id}"}, status_code=400) + data = await request.json() - dimensions = data.get("dimensions", 2) - except: - dimensions = 2 - try: + dimensions = data.get("dimensions", 2) result = self.file_watcher.umap_collection if dimensions == 2: 
             logging.info("Returning 2D UMAP")
@@ -489,6 +499,7 @@ class WebServer:
 
         @self.app.put("/api/similarity/{context_id}")
         async def put_similarity(context_id: str, request: Request):
+            logging.info(f"{request.method} {request.url.path}")
             if not self.file_watcher:
                 return
 
@@ -531,16 +542,17 @@ class WebServer:
 
         @self.app.put("/api/reset/{context_id}/{session_type}")
        async def put_reset(context_id: str, session_type: str, request: Request):
+            logging.info(f"{request.method} {request.url.path}")
             if not is_valid_uuid(context_id):
                 logging.warning(f"Invalid context_id: {context_id}")
                 return JSONResponse({"error": "Invalid context_id"}, status_code=400)
             context = self.upsert_context(context_id)
-            if session_type not in context["sessions"]:
-                return JSONResponse({ "error": f"{session_type} is not recognized", "context": context }, status_code=404)
+            session = context.get_session(session_type)
+            if not session:
+                return JSONResponse({ "error": f"{session_type} is not recognized", "context": context.id }, status_code=404)
             data = await request.json()
             try:
-                session = context["sessions"][session_type]
                 response = {}
                 for reset_operation in data["reset"]:
                     match reset_operation:
@@ -556,16 +568,16 @@
                             case "fact_check":
                                 prompt = system_fact_check
-                            session["system_prompt"] = prompt
+                            session.system_prompt = prompt
                             response["system_prompt"] = { "system_prompt": prompt }
                         case "rags":
                             logging.info(f"Resetting {reset_operation}")
-                            context["rags"] = rags.copy()
-                            response["rags"] = context["rags"]
+                            context.rags = rags.copy()
+                            response["rags"] = context.rags
                         case "tools":
                             logging.info(f"Resetting {reset_operation}")
-                            context["tools"] = default_tools(tools)
-                            response["tools"] = context["tools"]
+                            context.tools = default_tools(tools)
+                            response["tools"] = context.tools
                         case "history":
                             reset_map = {
                                 "job_description": ("job_description", "resume", "fact_check"),
@@ -576,15 +588,17 @@
                             resets = reset_map.get(session_type, ())
                             for mode in resets:
+                                tmp = context.get_session(mode)
+                                if not tmp:
+                                    continue
                                 logging.info(f"Resetting {reset_operation} for {mode}")
-                                context["sessions"][mode]["llm_history"] = []
-                                context["sessions"][mode]["user_history"] = []
-                                context["sessions"][mode]["context_tokens"] = round(len(str(context["sessions"][mode]["system_prompt"])) * 3 / 4) # Estimate context usage
+                                tmp.conversation.messages = []
+                                tmp.context_tokens = round(len(str(tmp.system_prompt)) * 3 / 4) # Estimate context usage
                             response["history"] = []
-                            response["context_used"] = session["context_tokens"]
+                            response["context_used"] = session.context_tokens
                         case "message_history_length":
                             logging.info(f"Resetting {reset_operation}")
-                            context["message_history_length"] = DEFAULT_HISTORY_LENGTH
+                            context.message_history_length = DEFAULT_HISTORY_LENGTH
                             response["message_history_length"] = DEFAULT_HISTORY_LENGTH
 
                 if not response:
@@ -598,59 +612,106 @@ class WebServer:
 
         @self.app.put("/api/tunables/{context_id}")
         async def put_tunables(context_id: str, request: Request):
-            if not is_valid_uuid(context_id):
-                logging.warning(f"Invalid context_id: {context_id}")
-                return JSONResponse({"error": "Invalid context_id"}, status_code=400)
-            context = self.upsert_context(context_id)
-            data = await request.json()
-            session = context["sessions"]["chat"]
-            for k in data.keys():
-                match k:
-                    case "system_prompt":
-                        system_prompt = data[k].strip()
-                        if not system_prompt:
-                            return JSONResponse({ "status": "error", "message": "System prompt can not be empty."
}) - session["system_prompt"] = system_prompt - self.save_context(context_id) - return JSONResponse({ "system_prompt": system_prompt }) - case "message_history_length": - value = max(0, int(data[k])) - context["message_history_length"] = value - self.save_context(context_id) - return JSONResponse({ "message_history_length": value }) - case _: - return JSONResponse({ "error": f"Unrecognized tunable {k}"}, status_code=404) + logging.info(f"{request.method} {request.url.path}") + try: + context = self.upsert_context(context_id) + + data = await request.json() + session = context.get_session("chat") + if not session: + return JSONResponse({ "error": f"chat is not recognized", "context": context.id }, status_code=404) + for k in data.keys(): + match k: + case "tools": + # { "tools": [{ "tool": tool?.name, "enabled": tool.enabled }] } + tools : list[dict[str, Any]] = data[k] + if not tools: + return JSONResponse({ "status": "error", "message": "Tools can not be empty." }) + for tool in tools: + for context_tool in context.tools: + if context_tool["function"]["name"] == tool["name"]: + context_tool["enabled"] = tool["enabled"] + self.save_context(context_id) + return JSONResponse({ "tools": [ { + **t["function"], + "enabled": t["enabled"], + } for t in context.tools] }) + + case "rags": + # { "rags": [{ "tool": tool?.name, "enabled": tool.enabled }] } + rags : list[dict[str, Any]] = data[k] + if not rags: + return JSONResponse({ "status": "error", "message": "RAGs can not be empty." }) + for rag in rags: + for context_rag in context.rags: + if context_rag["name"] == rag["name"]: + context_rag["enabled"] = rag["enabled"] + self.save_context(context_id) + return JSONResponse({ "rags": context.rags }) + + case "system_prompt": + system_prompt = data[k].strip() + if not system_prompt: + return JSONResponse({ "status": "error", "message": "System prompt can not be empty." 
}) + session.system_prompt = system_prompt + self.save_context(context_id) + return JSONResponse({ "system_prompt": system_prompt }) + case "message_history_length": + value = max(0, int(data[k])) + context.message_history_length = value + self.save_context(context_id) + return JSONResponse({ "message_history_length": value }) + case _: + return JSONResponse({ "error": f"Unrecognized tunable {k}"}, status_code=404) + except Exception as e: + logging.error(f"Error in put_tunables: {e}") + return JSONResponse({"error": str(e)}, status_code=500) @self.app.get("/api/tunables/{context_id}") - async def get_tunables(context_id: str): + async def get_tunables(context_id: str, request: Request): + logging.info(f"{request.method} {request.url.path}") if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) + session = context.get_session("chat") + if not session: + return JSONResponse({ "error": f"chat is not recognized", "context": context.id }, status_code=404) return JSONResponse({ - "system_prompt": context["sessions"]["chat"]["system_prompt"], - "message_history_length": context["message_history_length"] + "system_prompt": session.system_prompt, + "message_history_length": context.message_history_length, + "rags": context.rags, + "tools": [ { + **t["function"], + "enabled": t["enabled"], + } for t in context.tools ] }) @self.app.get("/api/system-info/{context_id}") - async def get_system_info(context_id: str): + async def get_system_info(context_id: str, request: Request): + logging.info(f"{request.method} {request.url.path}") return JSONResponse(system_info(self.model)) @self.app.post("/api/chat/{context_id}/{session_type}") - async def chat_endpoint(context_id: str, session_type: str, request: Request): + async def post_chat_endpoint(context_id: str, session_type: str, request: Request): + logging.info(f"{request.method} {request.url.path}") if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) - if session_type not in context["sessions"]: - return JSONResponse({ "error": f"{session_type} is not recognized", "context": context }, status_code=404) + session = context.get_session(session_type) + if not session: + try: + session = context.create_session(session_type) + except Exception as e: + return JSONResponse({ "error": f"{session_type} is not recognized", "context": context.id }, status_code=404) data = await request.json() # Create a custom generator that ensures flushing async def flush_generator(): - async for message in self.chat(context=context, session_type=session_type, content=data["content"]): + async for message in self.generate_response(context=context, session_type=session_type, content=data["content"]): # Convert to JSON and add newline yield json.dumps(message) + "\n" # Save the history as its generated @@ -672,23 +733,33 @@ class WebServer: @self.app.post("/api/context") async def create_context(): context = self.create_context() - self.logging.info(f"Generated new session as {context['id']}") - return JSONResponse(context) + logging.info(f"Generated new session as {context.id}") + return JSONResponse({ "id": context.id }) @self.app.get("/api/history/{context_id}/{session_type}") - async def get_history(context_id: str, session_type: str): - context = self.upsert_context(context_id) - if 
session_type not in context["sessions"]: - return JSONResponse({ "error": f"{session_type} is not recognized", "context": context }, status_code=404) - return JSONResponse(context["sessions"][session_type]["user_history"]) - + async def get_history(context_id: str, session_type: str, request: Request): + logging.info(f"{request.method} {request.url.path}") + try: + context = self.upsert_context(context_id) + session = context.get_session(session_type) + if not session: + logging.info(f"Session {session_type} not found. Returning empty history.") + return JSONResponse({ "messages": [] }) + logging.info(f"History for {session_type} contains {len(session.conversation.messages)} entries.") + return session.conversation + except Exception as e: + logging.error(f"Error in get_history: {e}") + return JSONResponse({"error": str(e)}, status_code=404) + @self.app.get("/api/tools/{context_id}") - async def get_tools(context_id: str): + async def get_tools(context_id: str, request: Request): + logging.info(f"{request.method} {request.url.path}") context = self.upsert_context(context_id) - return JSONResponse(context["tools"]) + return JSONResponse(context.tools) @self.app.put("/api/tools/{context_id}") async def put_tools(context_id: str, request: Request): + logging.info(f"{request.method} {request.url.path}") if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) @@ -697,48 +768,27 @@ class WebServer: data = await request.json() modify = data["tool"] enabled = data["enabled"] - for tool in context["tools"]: + for tool in context.tools: if modify == tool["function"]["name"]: tool["enabled"] = enabled self.save_context(context_id) - return JSONResponse(context["tools"]) + return JSONResponse(context.tools) return JSONResponse({ "status": f"{modify} not found in tools." }, status_code=404) except: - return JSONResponse({ "status": "error" }), 405 + return JSONResponse({ "status": "error" }, 405) - @self.app.get("/api/rags/{context_id}") - async def get_rags(context_id: str): - context = self.upsert_context(context_id) - return JSONResponse(context["rags"]) - - @self.app.put("/api/rags/{context_id}") - async def put_rags(context_id: str, request: Request): - if not is_valid_uuid(context_id): - logging.warning(f"Invalid context_id: {context_id}") - return JSONResponse({"error": "Invalid context_id"}, status_code=400) - context = self.upsert_context(context_id) - try: - data = await request.json() - modify = data["tool"] - enabled = data["enabled"] - for tool in context["rags"]: - if modify == tool["name"]: - tool["enabled"] = enabled - self.save_context(context_id) - return JSONResponse(context["rags"]) - return JSONResponse({ "status": f"{modify} not found in tools." 
}, status_code=404) - except: - return JSONResponse({ "status": "error" }), 405 @self.app.get("/api/context-status/{context_id}/{session_type}") - async def get_context_status(context_id, session_type: str): + async def get_context_status(context_id, session_type: str, request: Request): + logging.info(f"{request.method} {request.url.path}") if not is_valid_uuid(context_id): logging.warning(f"Invalid context_id: {context_id}") return JSONResponse({"error": "Invalid context_id"}, status_code=400) context = self.upsert_context(context_id) - if session_type not in context["sessions"]: - return JSONResponse({ "error": f"{session_type} is not recognized", "context": context }, status_code=404) - return JSONResponse({"context_used": context["sessions"][session_type]["context_tokens"], "max_context": defines.max_context}) + session = context.get_session(session_type) + if not session: + return JSONResponse({"context_used": 0, "max_context": defines.max_context}) + return JSONResponse({"context_used": session.context_tokens, "max_context": defines.max_context}) @self.app.get("/api/health") async def health_check(): @@ -748,9 +798,9 @@ class WebServer: async def serve_static(path: str): full_path = os.path.join(defines.static_content, path) if os.path.exists(full_path) and os.path.isfile(full_path): - self.logging.info(f"Serve static request for {full_path}") + logging.info(f"Serve static request for {full_path}") return FileResponse(full_path) - self.logging.info(f"Serve index.html for {path}") + logging.info(f"Serve index.html for {path}") return FileResponse(os.path.join(defines.static_content, "index.html")) def save_context(self, session_id): @@ -775,158 +825,79 @@ class WebServer: # Serialize the data to JSON and write to file with open(file_path, "w") as f: - json.dump(context, f) + f.write(context.model_dump_json()) return session_id - - - def migrate_context(self, context): - # No version - # context = { - # "id": context_id, - # "tools": default_tools(tools), - # "rags": rags.copy(), - # "context_tokens": round(len(str(system_context)) * 3 / 4), # Estimate context usage - # "message_history_length": 5, # Number of messages to supply in context - # "system": system_context, - # "system_generate_resume": system_generate_resume, - # "llm_history": [], - # "user_history": [], - # "resume_history": [], - # } - # Version 2: - # context = { - # "version": 2, - # "id": context_id, - # "sessions": { - # **session_type**: { # chat, job-description, resume, fact-check - # "system_prompt": **SYSTEM_MESSAGE**, - # "llm_history": [], - # "user_history": [], - # "context_tokens": round(len(str(**SYSTEM_MESSAGE**)) * 3 / 4), - # } - # }, - # "tools": default_tools(tools), - # "rags": rags.copy(), - # "message_history_length": 5 # Number of messages to supply in context - # } - if "version" not in context: - logging.info(f"Migrating {context['id']}") - context["version"] = CONTEXT_VERSION - context["sessions"] = { - "chat": { - "system_prompt": system_message, - "content_seed": None, - "llm_history": context["llm_history"], - "user_history": context["user_history"], - "context_tokens": round(len(str(create_system_message(system_message)))) - }, - "job_description": { - "system_prompt": system_job_description, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(create_system_message(system_job_description)))) - }, - "resume": { - "system_prompt": system_generate_resume, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": 
round(len(str(create_system_message(system_generate_resume)))) - }, - "fact_check": { - "system_prompt": system_fact_check, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(create_system_message(system_fact_check)))) - }, - } - del context["system"] - del context["system_generate_resume"] - del context["llm_history"] - del context["user_history"] - del context["resume_history"] - - return context - def load_context(self, session_id): + def load_context(self, session_id) -> Context: """ - Load a serialized Python dictionary from a file in the sessions directory. - + Load a context from a file in the sessions directory. Args: - session_id: UUID string for the filename - + session_id: UUID string for the context. If it doesn't exist, a new context is created. Returns: - The deserialized dictionary, or a new context if it doesn't exist on disk. + A Context object with the specified ID and default settings. """ + file_path = os.path.join(defines.session_dir, session_id) # Check if the file exists if not os.path.exists(file_path): - return self.create_context(session_id) - - # Read and deserialize the data - with open(file_path, "r") as f: - self.contexts[session_id] = json.load(f) + self.contexts[session_id] = self.create_context(session_id) + else: + # Read and deserialize the data + with open(file_path, "r") as f: + self.contexts[session_id] = Context.model_validate_json(f.read()) - return self.migrate_context(self.contexts[session_id]) + return self.contexts[session_id] - def create_context(self, context_id = None): - if not context_id: - context_id = str(uuid.uuid4()) - context = { - "id": context_id, - "version": CONTEXT_VERSION, - "sessions": { - "chat": { - "system_prompt": system_message, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(system_message)) * 3 / 4), # Estimate context usage - }, - "job_description": { - "system_prompt": system_job_description, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(system_job_description)) * 3 / 4), # Estimate context usage - }, - "resume": { - "system_prompt": system_generate_resume, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(system_generate_resume)) * 3 / 4), # Estimate context usage - }, - "fact_check": { - "system_prompt": system_fact_check, - "content_seed": None, - "llm_history": [], - "user_history": [], - "context_tokens": round(len(str(system_fact_check)) * 3 / 4), # Estimate context usage - }, - }, - "tools": default_tools(tools), - "rags": rags.copy(), - "message_history_length": 5 # Number of messages to supply in context - } - logging.info(f"{context_id} created and added to sessions.") - self.contexts[context_id] = context + def create_context(self, context_id = None) -> Context: + """ + Create a new context with a unique ID and default settings. + Args: + context_id: Optional UUID string for the context. If not provided, a new UUID is generated. + Returns: + A Context object with the specified ID and default settings. 
+        """
+        context = Context(id=context_id)
+
+        if os.path.exists(defines.resume_doc):
+            context.user_resume = open(defines.resume_doc, "r").read()
+        context.add_session(Chat(system_prompt = system_message))
+        # context.add_session(Resume(system_prompt = system_generate_resume))
+        # context.add_session(JobDescription(system_prompt = system_job_description))
+        # context.add_session(FactCheck(system_prompt = system_fact_check))
+        context.tools = default_tools(tools)
+        context.rags = rags.copy()
+
+        logging.info(f"{context.id} created and added to sessions.")
         self.contexts[context.id] = context
+        self.save_context(context.id)
         return context
 
     def get_optimal_ctx_size(self, context, messages, ctx_buffer = 4096):
         ctx = round(context + len(str(messages)) * 3 / 4)
         return max(defines.max_context, min(2048, ctx + ctx_buffer))
 
-    def upsert_context(self, context_id):
+    def upsert_context(self, context_id = None) -> Context:
+        """
+        Upsert a context based on the provided context_id.
+        Args:
+            context_id: UUID string for the context. If it doesn't exist, a new context is created.
+        Returns:
+            A Context object with the specified ID and default settings.
+        """
+
         if not context_id:
             logging.warning("No context ID provided. Creating a new context.")
             return self.create_context()
+
+        if not is_valid_uuid(context_id):
+            logging.info(f"User requested invalid context_id: {context_id}")
+            raise ValueError(f"Invalid context_id: {context_id}")
+
         if context_id in self.contexts:
            return self.contexts[context_id]
+        logging.info(f"Context {context_id} not found. Creating new context.")
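+        # load_context() creates a fresh context when no session file exists
+        # on disk, so this call always returns a usable Context.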
yield {"status": "processing", "message": f"Checking RAG context {rag['name']}..."} chroma_results = self.file_watcher.find_similar(query=content, top_k=10) @@ -985,354 +956,405 @@ class WebServer: # * First message sets Fact Check and is Q&A # * Has content # * Then Q&A of Fact Check - - async def chat(self, context, session_type, content): + async def generate_response(self, context : Context, session_type : str, content : str): if not self.file_watcher: return - - content = content.strip() if self.processing: - yield {"status": "error", "message": "Busy"} + logging.info("TODO: Implement delay queing; busy for same session, otherwise return queue size and estimated wait time") + yield {"status": "error", "message": "Busy processing another request."} return self.processing = True - try: - session = context["sessions"][session_type] - llm_history = session["llm_history"] - user_history = session["user_history"] - metadata = { - "origin": session_type, - "rag": { "documents": [] }, - "tools": [], - "eval_count": 0, - "eval_duration": 0, - "prompt_eval_count": 0, - "prompt_eval_duration": 0, - } + # Check if the session_type is valid + if not context.is_valid_session_type(session_type): + yield {"status": "error", "message": f"Session type {session_type} is not invalid."} + self.processing = False + return - # Default to not using tools - enable_tools = False + session = context.get_session(session_type) + if session is None: + session = context.create_session(session_type) - # Default to using RAG if there is content to check - if content: - enable_rag = True - else: - enable_rag = False + if session is None: + yield {"status": "error", "message": f"Session type {session_type} is not invalid."} + self.processing = False + return + + conversation : Conversation = session.conversation - # RAG is disabled when asking questions about the resume - if session_type == "resume": - enable_rag = False + message = Message(prompt=content) + del content # Prevent accidental use of content - # The first time through each session session_type a content_seed may be set for - # future chat sessions; use it once, then clear it - if session["content_seed"]: - preamble = f"{session['content_seed']}" - session["content_seed"] = None - else: - preamble = "" + # Default to not using tools + enable_tools = False - # After the first time a particular session session_type is used, it is handled as a chat. 
-            # The number of messages indicating the session is ready for chat varies based on
-            # the session_type of session
-            process_type = session_type
-            match process_type:
-                case "job_description":
-                    logging.info(f"job_description user_history len: {len(user_history)}")
-                    if len(user_history) >= 2: # USER, ASSISTANT
-                        process_type = "chat"
-                case "resume":
-                    logging.info(f"resume user_history len: {len(user_history)}")
-                    if len(user_history) >= 3: # USER, ASSISTANT, FACT_CHECK
-                        process_type = "chat"
-                case "fact_check":
-                    process_type = "chat" # Fact Check is always a chat session
+        # Default to using RAG if there is content to check
+        if message.prompt:
+            enable_rag = True
+        else:
+            enable_rag = False

-            match process_type:
-                # Normal chat interactions with context history
-                case "chat":
-                    if not content:
-                        yield {"status": "error", "message": "No query provided for chat."}
-                        logging.info(f"user_history len: {len(user_history)}")
-                        return
+        # RAG is disabled when asking questions about the resume
+        if session_type == "resume":
+            enable_rag = False

-                    enable_tools = True
+        # The first time through each session session_type a content_seed may be set for
+        # future chat sessions; use it once, then clear it
+        message.preamble = session.get_and_reset_content_seed()

-                    # Generate RAG content if enabled, based on the content
-                    rag_context = ""
-                    if enable_rag:
-                        # Initialize metadata["rag"] to None or a default value
-                        metadata["rag"] = None
+        # After the first time a particular session session_type is used, it is handled as a chat.
+        # The number of messages indicating the session is ready for chat varies based on
+        # the session_type of session
+        process_type = session_type
+        match process_type:
+            case "job_description":
+                logging.info(f"job_description user_history len: {len(conversation.messages)}")
+                if len(conversation.messages) >= 2: # USER, ASSISTANT
+                    process_type = "chat"
+            case "resume":
+                logging.info(f"resume user_history len: {len(conversation.messages)}")
+                if len(conversation.messages) >= 3: # USER, ASSISTANT, FACT_CHECK
+                    process_type = "chat"
+            case "fact_check":
+                process_type = "chat" # Fact Check is always a chat session

-                    for value in self.generate_rag_results(context, content):
-                        if "status" in value:
-                            yield value
-                        else:
-                            if value.get("documents") or value.get("rag") is not None:
-                                metadata["rag"] = value
+        match process_type:
+            # Normal chat interactions with context history
+            case "chat":
+                if not message.prompt:
+                    yield {"status": "error", "message": "No query provided for chat."}
+                    logging.info(f"user_history len: {len(conversation.messages)}")
+                    self.processing = False
+                    return

-                    if metadata["rag"]:
-                        for doc in metadata["rag"]["documents"]:
-                            rag_context += f"{doc}\n"
+                logging.info("TODO: Re-enable tools...")
+                #enable_tools = True

-                    if rag_context:
-                        preamble = f"""
+                # Generate RAG content if enabled, based on the content
+                rag_context = ""
+                if enable_rag:
+                    # Initialize metadata["rag"] to None or a default value
+                    message.metadata["rag"] = None

-<|rules|>
-- If there is information in the <|context|>, <|job_description|>, <|work_history|>, or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|job_description|>' (etc.) or quoting it directly.
-- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|>, <|job_description|>, or <|work_history|> tags.
+                    for value in self.generate_rag_results(context, message.prompt):
+                        if "status" in value:
+                            yield value
+                        else:
+                            if value.get("documents") or value.get("rag") is not None:
+                                message.metadata["rag"] = value

+                if message.metadata["rag"]:
+                    for doc in message.metadata["rag"]["documents"]:
+                        rag_context += f"{doc}\n"
+
+                if rag_context:
+                    message.preamble = f"""
<|context|>
{rag_context}
+"""
+                if context.user_resume:
+                    message.preamble += f"""
+<|resume|>
+{context.user_resume}
+"""
+
+                message.preamble += """
+<|rules|>
+- If there is information in the <|context|> or <|resume|> sections to enhance the answer, incorporate it seamlessly and refer to it using natural language instead of mentioning '<|context|>' or '<|resume|>' or quoting it directly.
+- Avoid phrases like 'According to the <|context|>' or similar references to the <|context|> or <|resume|>.

<|question|>
Use that information to respond to:"""

-                # Use the mode specific system_prompt instead of 'chat'
-                system_prompt = context["sessions"][session_type]["system_prompt"]
+                # Use the mode specific system_prompt instead of 'chat'
+                logging.info("Fix this... reimplement")
+                #system_prompt = context.get_session(session_type).system_prompt

-            # On first entry, a single job_description is provided ("user")
-            # Generate a resume to append to RESUME history
-            case "job_description":
-                # Generate RAG content if enabled, based on the content
-                # Always force the full resume to be in context
-                resume_doc = open(defines.resume_doc, "r").read()
-                rag_context = f"{resume_doc}\n"
-                if enable_rag:
-                    # Initialize metadata["rag"] to None or a default value
-                    metadata["rag"] = None
+            # On first entry, a single job_description is provided ("user")
+            # Generate a resume to append to RESUME history
+            case "job_description":
+                # Generate RAG content if enabled, based on the content
+                rag_context = ""
+                if enable_rag:
+                    # Initialize metadata["rag"] to None or a default value
+                    message.metadata["rag"] = None

-                    for value in self.generate_rag_results(context, content):
-                        if "status" in value:
-                            yield value
-                        else:
-                            if value.get("documents") or value.get("rag") is not None:
-                                metadata["rag"] = value
+                    for value in self.generate_rag_results(context, message.prompt):
+                        if "status" in value:
+                            yield value
+                        else:
+                            if value.get("documents") or value.get("rag") is not None:
+                                message.metadata["rag"] = value

-                if metadata["rag"]:
-                    for doc in metadata["rag"]["documents"]:
-                        rag_context += f"{doc}\n"
+                    if message.metadata["rag"]:
+                        for doc in message.metadata["rag"]["documents"]:
+                            rag_context += f"{doc}\n"

-                preamble = f"""
-<|work_history|>
+                message.preamble = ""
+                if rag_context:
+                    message.preamble += f"""
+<|context|>
{rag_context}
-
-<|job_description|>
-{content}
-                """
-                context["sessions"]["job_description"]["content_seed"] = preamble + "<|question|>\nUse the above information to answer this query: "
-
-                preamble += f"""
-
-<|rules|>
-1. Use the above <|<|work_history|> to create the resume for the <|job_description|>.
-2. Do not use content from the <|job_description|> in the response unless the <|work_history|> mentions them.
-
-<|question|>
+        if context.user_resume:
+            message.preamble += f"""
+<|resume|>
+{context.user_resume}
"""
-                # Seed the history for job_description
-                messages = [ {
-                    "role": "user", "content": content
-                }, {
-                    "role": "assistant", "content": "Job description stored to use in future queries.", "display": "hide"
-                } ]
-                # Strip out the 'display' key when adding to llm_history
-                llm_history.extend([{k: v for k, v in m.items() if k != 'display'} for m in messages])
-                user_history.extend([{**m, "origin": "job_description"} for m in messages])
+                message.preamble += f"""
+<|job_description|>
+{message.prompt}
+"""
+
+                context.get_or_create_session("job_description").set_content_seed(message.preamble + "<|question|>\nUse the above information to respond to this prompt: ")

-                # Switch to resume session for LLM responses
-                metadata["origin"] = "resume"
-                session = context["sessions"]["resume"]
-                system_prompt = session["system_prompt"]
-                llm_history = session["llm_history"] = []
-                user_history = session["user_history"] = []
+                message.preamble += f"""
+<|rules|>
+1. Use the above <|resume|> and <|context|> to create the resume for the <|job_description|>.
+2. Do not use content from the <|job_description|> in the response unless the <|context|> or <|resume|> mentions them.

-            # Ignore the passed in content and invoke Fact Check
-            case "resume":
-                if len(context["sessions"]["resume"]["user_history"]) < 2: # USER, **ASSISTANT**
-                    raise Exception(f"No resume found in user history.")
-                resume = context["sessions"]["resume"]["user_history"][1]
+<|question|>
+Use the above information to respond to this prompt:
+"""

-                # Generate RAG content if enabled, based on the content
-                # Always force the full resume to be in context
-                resume_doc = open(defines.resume_doc, "r").read()
-                rag_context = f"{resume_doc}\n"
-                if enable_rag:
-                    # Initialize metadata["rag"] to None or a default value
-                    metadata["rag"] = None
+                # Seed the history for job_description
+                messages = [ {
+                    "role": "user", "content": message.prompt
+                }, {
+                    "role": "assistant", "content": "Job description stored to use in future queries.", "display": "hide"
+                } ]
+                # TODO: Conversation stores Message objects and has no extend();
+                # convert these seed dicts and add them via conversation.add_message()
+                conversation.extend([{**m, "origin": "job_description"} for m in messages])

-                    for value in self.generate_rag_results(context, resume["content"]):
-                        if "status" in value:
-                            yield value
-                        else:
-                            if value.get("documents") or value.get("rag") is not None:
-                                metadata["rag"] = value
+                # Switch to resume session for LLM responses
+                message.metadata["origin"] = "resume"
+                session = context.get_or_create_session("resume")
+                system_prompt = session.system_prompt
+                # NOTE: llm_history/user_history no longer exist on Session; the
+                # session's Conversation now tracks message history

-                if metadata["rag"]:
-                    for doc in metadata["rag"]["documents"]:
-                        rag_context += f"{doc}\n"
+            # Ignore the passed in content and invoke Fact Check
+            case "resume":
+                if len(context.get_or_create_session("resume").conversation.messages) < 2: # USER, **ASSISTANT**
+                    raise Exception(f"No resume found in user history.")
+                resume = context.get_or_create_session("resume").conversation.messages[1]

-                preamble = f"""
-<|work_history|>
+                # Generate RAG content if enabled, based on the content
+                rag_context = ""
+                if enable_rag:
+                    # Initialize metadata["rag"] to None or a default value
+                    message.metadata["rag"] = None
+
+                    for value in self.generate_rag_results(context, resume["content"]):
+                        if "status" in value:
+                            yield value
+                        else:
+                            if value.get("documents") or value.get("rag") is not None:
message.metadata["rag"] = value + + if message.metadata["rag"]: + for doc in message.metadata["rag"]["documents"]: + rag_context += f"{doc}\n" + + + # This is being passed to Fact Check, so do not provide the <|job_description|> + message.preamble = f"" + + if rag_context: + message.preamble += f""" +<|context|> {rag_context} +""" + if context.user_resume: + # Do not prefix the resume with <|resume|>; just add to the <|context|> + message.preamble += f""" +{context.user_resume} +""" + message.preamble += f""" <|resume|> {resume['content']} - + <|rules|> -1. Do not invent or assume any information not explicitly present in the <|work_history|>. -2. Analyze the <|resume|> to identify any discrepancies or inaccuracies based on the <|work_history|>. +1. Do not invent or assume any information not explicitly present in the <|context|>. +2. Analyze the <|resume|> to identify any discrepancies or inaccuracies based on the <|context|>. <|question|> """ - - context["sessions"]["resume"]["content_seed"] = f""" + + context.get_or_create_session("resume").set_content_seed(f""" <|resume|> {resume["content"]} <|question|> -Use the above <|resume|> to answer this query: -""" +Use the above <|resume|> and <|job_description|> to answer this query: +""") - content = "Fact check the resume and report discrepancies." + message.prompt = "Fact check the resume and report discrepancies." - # Seed the history for resume - messages = [ { - "role": "user", "content": "Fact check resume", "origin": "resume", "display": "hide" - }, { - "role": "assistant", "content": "Resume fact checked.", "origin": "resume", "display": "hide" - } ] - # Do not add this to the LLM history; it is only used for UI presentation - user_history.extend(messages) - - # Switch to fact_check session for LLM responses - metadata["origin"] = "fact_check" - session = context["sessions"]["fact_check"] - system_prompt = session["system_prompt"] - llm_history = session["llm_history"] = [] - user_history = session["user_history"] = [] + # Seed the history for resume + messages = [ { + "role": "user", "content": "Fact check resume", "origin": "resume", "display": "hide" + }, { + "role": "assistant", "content": "Resume fact checked.", "origin": "resume", "display": "hide" + } ] + # Do not add this to the LLM history; it is only used for UI presentation + conversation.extend(messages) + + # Switch to fact_check session for LLM responses + message.metadata["origin"] = "fact_check" + session = context.get_or_create_session("fact_check") + system_prompt = session.system_prompt + llm_history = session.llm_history = [] + user_history = session.user_history = [] + case _: + raise Exception(f"Invalid chat session_type: {session_type}") + + conversation.add_message(message) + # llm_history.append({"role": "user", "content": message.preamble + content}) + # user_history.append({"role": "user", "content": content, "origin": message.metadata["origin"]}) + # message.metadata["full_query"] = llm_history[-1]["content"] + + messages = create_system_message(session.system_prompt) + if context.message_history_length: + to_add = conversation.messages[-context.message_history_length:] + else: + to_add = conversation.messages + for m in to_add: + messages.extend([ { + "role": "user", + "content": m.content, + }, { + "role": "assistant", + "content": m.response, + } ]) + messages.append({ + "role": "user", + "content": message.preamble + message.prompt, + }) + + # Estimate token length of new messages + ctx_size = 
+
+        if len(conversation.messages) > 2:
+            processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
+        else:
+            match session_type:
+                case "job_description":
+                    processing_message = f"Generating {'RAG augmented ' if enable_rag else ''}resume..."
+                case "resume":
+                    processing_message = f"Fact Checking {'RAG augmented ' if enable_rag else ''}resume..."
                case _:
-                    raise Exception(f"Invalid chat session_type: {session_type}")
-
-            llm_history.append({"role": "user", "content": preamble + content})
-            user_history.append({"role": "user", "content": content, "origin": metadata["origin"]})
-            metadata["full_query"] = llm_history[-1]["content"]
-
-            if context["message_history_length"]:
-                messages = create_system_message(system_prompt) + llm_history[-context["message_history_length"]:]
+                    processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
+
+        yield {"status": "processing", "message": processing_message, "num_ctx": ctx_size}
+
+        # Use the async generator in an async for loop
+        try:
+            if enable_tools:
+                response = self.llm.chat(model=self.model, messages=messages, tools=llm_tools(context.tools), options={ "num_ctx": ctx_size })
             else:
-                messages = create_system_message(system_prompt) + llm_history
+                response = self.llm.chat(model=self.model, messages=messages, options={ "num_ctx": ctx_size })
+        except Exception as e:
+            logging.exception({ "model": self.model, "error": str(e) })
+            yield {"status": "error", "message": f"An error occurred communicating with LLM"}
+            self.processing = False
+            return
+
+        message.metadata["eval_count"] += response["eval_count"]
+        message.metadata["eval_duration"] += response["eval_duration"]
+        message.metadata["prompt_eval_count"] += response["prompt_eval_count"]
+        message.metadata["prompt_eval_duration"] += response["prompt_eval_duration"]
+        session.context_tokens = response["prompt_eval_count"] + response["eval_count"]
+
+        tools_used = []
+
+        yield {"status": "processing", "message": "Initial response received..."}
+
+        if "tool_calls" in response.get("message", {}):
+            yield {"status": "processing", "message": "Processing tool calls..."}
+
+            tool_message = response["message"]
+            tool_result = None
+
+            # Process all yielded items from the handler
+            async for item in handle_tool_calls(tool_message):
+                if isinstance(item, tuple) and len(item) == 2:
+                    # This is the final result tuple (tool_result, tools_used)
+                    tool_result, tools_used = item
+                else:
+                    # This is a status update, forward it
+                    yield item
+
+            message_dict = {
+                "role": tool_message.get("role", "assistant"),
+                "content": tool_message.get("content", "")
+            }
+
+            if "tool_calls" in tool_message:
+                message_dict["tool_calls"] = [
+                    {"function": {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}}
+                    for tc in tool_message["tool_calls"]
+                ]
+
+            pre_add_index = len(messages)
+            messages.append(message_dict)
+
+            if isinstance(tool_result, list):
+                messages.extend(tool_result)
+            else:
+                if tool_result:
+                    messages.append(tool_result)
+
+            message.metadata["tools"] = tools_used

             # Estimate token length of new messages
-            ctx_size = self.get_optimal_ctx_size(context["sessions"][process_type]["context_tokens"], messages=llm_history[-1]["content"])
-
-            if len(user_history) > 2:
-                processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
-            else:
-                match session_type:
-                    case "job_description":
-                        processing_message = f"Generating {'RAG augmented ' if enable_rag else ''}resume..."
-                    case "resume":
-                        processing_message = f"Fact Checking {'RAG augmented ' if enable_rag else ''}resume..."
-                    case _:
-                        processing_message = f"Processing {'RAG augmented ' if enable_rag else ''}query..."
-
-            yield {"status": "processing", "message": processing_message, "num_ctx": ctx_size}
-
-            # Use the async generator in an async for loop
-            try:
-                if enable_tools:
-                    response = self.client.chat(model=self.model, messages=messages, tools=llm_tools(context["tools"]), options={ "num_ctx": ctx_size })
-                else:
-                    response = self.client.chat(model=self.model, messages=messages, options={ "num_ctx": ctx_size })
-            except Exception as e:
-                logging.exception({ "model": self.model, "error": str(e) })
-                yield {"status": "error", "message": f"An error occurred communicating with LLM"}
-                return
+            ctx_size = self.get_optimal_ctx_size(session.context_tokens, messages=messages[pre_add_index:])
+            yield {"status": "processing", "message": "Generating final response...", "num_ctx": ctx_size }
+            # Decrease creativity when processing tool call requests
+            response = self.llm.chat(model=self.model, messages=messages, stream=False, options={ "num_ctx": ctx_size }) #, "temperature": 0.5 })
+            message.metadata["eval_count"] += response["eval_count"]
+            message.metadata["eval_duration"] += response["eval_duration"]
+            message.metadata["prompt_eval_count"] += response["prompt_eval_count"]
+            message.metadata["prompt_eval_duration"] += response["prompt_eval_duration"]
+            session.context_tokens = response["prompt_eval_count"] + response["eval_count"]

-            metadata["eval_count"] += response["eval_count"]
-            metadata["eval_duration"] += response["eval_duration"]
-            metadata["prompt_eval_count"] += response["prompt_eval_count"]
-            metadata["prompt_eval_duration"] += response["prompt_eval_duration"]
-            session["context_tokens"] = response["prompt_eval_count"] + response["eval_count"]
+        reply = response["message"]["content"]
+        message.response = reply
+        message.metadata["origin"] = session_type
+        # final_message = {"role": "assistant", "content": reply }

-            tools_used = []
-
-            yield {"status": "processing", "message": "Initial response received..."}
+        # # history is provided to the LLM and should not have additional metadata
+        # llm_history.append(final_message)

-            if "tool_calls" in response.get("message", {}):
-                yield {"status": "processing", "message": "Processing tool calls..."}
-
-                message = response["message"]
-                tool_result = None
-
-                # Process all yielded items from the handler
-                async for item in handle_tool_calls(message):
-                    if isinstance(item, tuple) and len(item) == 2:
-                        # This is the final result tuple (tool_result, tools_used)
-                        tool_result, tools_used = item
-                    else:
-                        # This is a status update, forward it
-                        yield item
-
-                message_dict = {
-                    "role": message.get("role", "assistant"),
-                    "content": message.get("content", "")
-                }
-
-                if "tool_calls" in message:
-                    message_dict["tool_calls"] = [
-                        {"function": {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}}
-                        for tc in message["tool_calls"]
-                    ]
+        # user_history is provided to the REST API and does not include CONTEXT
+        # It does include metadata
+        # final_message["metadata"] = message.metadata
+        # user_history.append({**final_message, "origin": message.metadata["origin"]})

-                pre_add_index = len(messages)
-                messages.append(message_dict)
+        # Return the REST API with metadata
+        yield {
+            "status": "done",
+            "message": {
{ + "role": "assistant", + "content": message.response, + "metadata": message.metadata + } + } - if isinstance(tool_result, list): - messages.extend(tool_result) - else: - if tool_result: - messages.append(tool_result) + # except Exception as e: + # logging.exception({ "model": self.model, "origin": session_type, "content": content, "error": str(e) }) + # yield {"status": "error", "message": f"An error occurred: {str(e)}"} - metadata["tools"] = tools_used - - # Estimate token length of new messages - ctx_size = self.get_optimal_ctx_size(session["context_tokens"], messages=messages[pre_add_index:]) - yield {"status": "processing", "message": "Generating final response...", "num_ctx": ctx_size } - # Decrease creativity when processing tool call requests - response = self.client.chat(model=self.model, messages=messages, stream=False, options={ "num_ctx": ctx_size }) #, "temperature": 0.5 }) - metadata["eval_count"] += response["eval_count"] - metadata["eval_duration"] += response["eval_duration"] - metadata["prompt_eval_count"] += response["prompt_eval_count"] - metadata["prompt_eval_duration"] += response["prompt_eval_duration"] - session["context_tokens"] = response["prompt_eval_count"] + response["eval_count"] - - reply = response["message"]["content"] - final_message = {"role": "assistant", "content": reply } - - # history is provided to the LLM and should not have additional metadata - llm_history.append(final_message) - - # user_history is provided to the REST API and does not include CONTEXT - # It does include metadata - final_message["metadata"] = metadata - user_history.append({**final_message, "origin": metadata["origin"]}) - - # Return the REST API with metadata - yield {"status": "done", "message": final_message } - - except Exception as e: - logging.exception({ "model": self.model, "origin": session_type, "content": content, "error": str(e) }) - yield {"status": "error", "message": f"An error occurred: {str(e)}"} - - finally: - self.processing = False + # finally: + # self.processing = False + self.processing = False + return def run(self, host="0.0.0.0", port=WEB_PORT, **kwargs): try: @@ -1342,6 +1364,7 @@ Use the above <|resume|> to answer this query: self.app, host=host, port=port, + log_config=None, ssl_keyfile=defines.key_path, ssl_certfile=defines.cert_path ) @@ -1350,7 +1373,8 @@ Use the above <|resume|> to answer this query: uvicorn.run( self.app, host=host, - port=port + port=port, + log_config=None ) except KeyboardInterrupt: if self.observer: @@ -1362,24 +1386,30 @@ Use the above <|resume|> to answer this query: # Main function to run everything def main(): - global client, model, web_server + global model + # Parse command-line arguments args = parse_args() # Setup logging based on the provided level setup_logging(args.level) - client = ollama.Client(host=args.ollama_server) + warnings.filterwarnings( + "ignore", + category=FutureWarning, + module="sklearn.*" + ) + + warnings.filterwarnings( + "ignore", + category=UserWarning, + module="umap.*" + ) + + llm = ollama.Client(host=args.ollama_server) model = args.ollama_model -# documents = Rag.load_text_files(defines.doc_dir) -# print(f"Documents loaded {len(documents)}") -# chunks = Rag.create_chunks_from_documents(documents) -# doc_types = set(chunk.metadata["doc_type"] for chunk in chunks) -# print(f"Document types: {doc_types}") -# print(f"Vectorstore created with {collection.count()} documents") - - web_server = WebServer(logging, client, model) + web_server = WebServer(llm, model) 
     web_server.run(host=args.web_host, port=args.web_port, use_reloader=False)

diff --git a/src/utils/__init__.py b/src/utils/__init__.py
index 28953e0..3b6a2da 100644
--- a/src/utils/__init__.py
+++ b/src/utils/__init__.py
@@ -2,7 +2,9 @@
 from . import defines

 # Import rest as `utils.*` accessible
-from .rag import *
+from .rag import ChromaDBFileWatcher, start_file_watcher

-# Expose only public names (avoid importing hidden/internal names)
-__all__ = [name for name in dir() if not name.startswith("_")]
+from .message import Message
+from .conversation import Conversation
+from .session import Session, Chat, Resume, JobDescription, FactCheck
+from .context import Context
\ No newline at end of file
diff --git a/src/utils/context.py b/src/utils/context.py
new file mode 100644
index 0000000..6e01877
--- /dev/null
+++ b/src/utils/context.py
@@ -0,0 +1,98 @@
+from pydantic import BaseModel, Field, model_validator
+from uuid import uuid4
+from typing import List, Optional
+from typing_extensions import Annotated, Union
+from .session import AnySession, Session
+
+class Context(BaseModel):
+    id: str = Field(
+        default_factory=lambda: str(uuid4()),
+        pattern=r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
+    )
+
+    sessions: List[Annotated[Union[*Session.__subclasses__()], Field(discriminator="session_type")]] = Field(
+        default_factory=list
+    )
+
+    user_resume: Optional[str] = None
+    user_job_description: Optional[str] = None
+    user_facts: Optional[str] = None
+    tools: List[dict] = []
+    rags: List[dict] = []
+    message_history_length: int = 5
+    context_tokens: int = 0
+
+    def __init__(self, id: Optional[str] = None, **kwargs):
+        super().__init__(id=id if id is not None else str(uuid4()), **kwargs)
+
+    @model_validator(mode="after")
+    def validate_unique_session_types(self):
+        """Ensure at most one session per session_type."""
+        session_types = [session.session_type for session in self.sessions]
+        if len(session_types) != len(set(session_types)):
+            raise ValueError("Context cannot contain multiple sessions of the same session_type")
+        return self
+
+    def get_or_create_session(self, session_type: str, **kwargs) -> Session:
+        """
+        Get the session of the specified type, creating and appending it if it
+        does not already exist; at most one session per type is kept.
+
+        Args:
+            session_type: The type of session to get or create (e.g., 'chat', 'resume').
+            **kwargs: Additional fields required by the specific session subclass.
+
+        Returns:
+            The existing session for session_type, or the newly created instance.
+
+        Raises:
+            ValueError: If no session class matches the given session_type.
+ """ + # Check if a session with the given session_type already exists + for session in self.sessions: + if session.session_type == session_type: + return session + + # Find the matching subclass + for session_cls in Session.__subclasses__(): + if session_cls.__fields__["session_type"].default == session_type: + # Create the session instance with provided kwargs + session = session_cls(session_type=session_type, **kwargs) + self.sessions.append(session) + return session + + raise ValueError(f"No session class found for session_type: {session_type}") + + def add_session(self, session: AnySession) -> None: + """Add a Session to the context, ensuring no duplicate session_type.""" + if any(s.session_type == session.session_type for s in self.sessions): + raise ValueError(f"A session with session_type '{session.session_type}' already exists") + self.sessions.append(session) + + def get_session(self, session_type: str) -> Session | None: + """Return the Session with the given session_type, or None if not found.""" + for session in self.sessions: + if session.session_type == session_type: + return session + return None + + def is_valid_session_type(self, session_type: str) -> bool: + """Check if the given session_type is valid.""" + return session_type in Session.valid_session_types() + + def get_summary(self) -> str: + """Return a summary of the context.""" + if not self.sessions: + return f"Context {self.uuid}: No sessions." + summary = f"Context {self.uuid}:\n" + for i, session in enumerate(self.sessions, 1): + summary += f"\nSession {i} ({session.session_type}):\n" + summary += session.conversation.get_summary() + if session.session_type == "resume": + summary += f"\nResume: {session.get_resume()}\n" + elif session.session_type == "job_description": + summary += f"\nJob Description: {session.job_description}\n" + elif session.session_type == "fact_check": + summary += f"\nFacts: {session.facts}\n" + elif session.session_type == "chat": + summary += f"\nChat Name: {session.name}\n" + return summary \ No newline at end of file diff --git a/src/utils/conversation.py b/src/utils/conversation.py new file mode 100644 index 0000000..9d7d494 --- /dev/null +++ b/src/utils/conversation.py @@ -0,0 +1,23 @@ +from pydantic import BaseModel +from typing import List +from datetime import datetime, timezone +from .message import Message + +class Conversation(BaseModel): + messages: List[Message] = [] + + def add_message(self, message: Message | List[Message]) -> None: + """Add a Message(s) to the conversation.""" + if isinstance(message, Message): + self.messages.append(message) + else: + self.messages.extend(message) + + def get_summary(self) -> str: + """Return a summary of the conversation.""" + if not self.messages: + return "Conversation is empty." 
+ summary = f"Conversation:\n" + for i, message in enumerate(self.messages, 1): + summary += f"\nMessage {i}:\n{message.get_summary()}\n" + return summary \ No newline at end of file diff --git a/src/utils/message.py b/src/utils/message.py new file mode 100644 index 0000000..782948b --- /dev/null +++ b/src/utils/message.py @@ -0,0 +1,31 @@ +from pydantic import BaseModel, model_validator +from typing import Dict, List, Optional, Any +from datetime import datetime, timezone + +class Message(BaseModel): + prompt: str + preamble: str = "" + content: str = "" + response: str = "" + metadata: dict[str, Any] = { + "rag": { "documents": [] }, + "tools": [], + "eval_count": 0, + "eval_duration": 0, + "prompt_eval_count": 0, + "prompt_eval_duration": 0, + } + actions: List[str] = [] + timestamp: datetime = datetime.now(timezone.utc) + + def get_summary(self) -> str: + """Return a summary of the message.""" + response_summary = ( + f"Response: {self.response} (Actions: {', '.join(self.actions)})" + if self.response else "No response yet" + ) + return ( + f"Message at {self.timestamp}:\n" + f"Query: {self.preamble}{self.content}\n" + f"{response_summary}" + ) \ No newline at end of file diff --git a/src/utils/session.py b/src/utils/session.py new file mode 100644 index 0000000..e150dc9 --- /dev/null +++ b/src/utils/session.py @@ -0,0 +1,78 @@ +from pydantic import BaseModel, Field, model_validator, PrivateAttr +from typing import Literal, TypeAlias, get_args +from .conversation import Conversation + +class Session(BaseModel): + session_type: Literal["resume", "job_description", "fact_check", "chat"] + system_prompt: str = "You are a helpful assistant." + conversation: Conversation = Conversation() + context_tokens: int = 0 + + _content_seed: str = PrivateAttr(default="") + + def get_and_reset_content_seed(self): + tmp = self._content_seed + self._content_seed = "" + return tmp + + def set_content_seed(self, content: str) -> None: + """Set the content seed for the session.""" + self._content_seed = content + + def get_content_seed(self) -> str: + """Get the content seed for the session.""" + return self._content_seed + + @classmethod + def valid_session_types(cls) -> set[str]: + """Return the set of valid session_type values.""" + return set(get_args(cls.__annotations__["session_type"])) + + +# Type alias for Session or any subclass +AnySession: TypeAlias = Session # BaseModel covers Session and subclasses + +class Resume(Session): + session_type: Literal["resume"] = "resume" + resume: str = "" + + @model_validator(mode="after") + def validate_resume(self): + if not self.resume.strip(): + raise ValueError("Resume content cannot be empty") + return self + + def get_resume(self) -> str: + """Get the resume content.""" + return self.resume + + def set_resume(self, resume: str) -> None: + """Set the resume content.""" + self.resume = resume + +class JobDescription(Session): + session_type: Literal["job_description"] = "job_description" + job_description: str = "" + + @model_validator(mode="after") + def validate_job_description(self): + if not self.job_description.strip(): + raise ValueError("Job description cannot be empty") + return self + +class FactCheck(Session): + session_type: Literal["fact_check"] = "fact_check" + facts: str = "" + + @model_validator(mode="after") + def validate_facts(self): + if not self.facts.strip(): + raise ValueError("Facts cannot be empty") + return self + +class Chat(Session): + session_type: Literal["chat"] = "chat" + + @model_validator(mode="after") + def 
+    @model_validator(mode="after")
+    def validate_name(self):
+        return self
\ No newline at end of file
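
For reference, a minimal sketch of the round-trip that load_context/create_context rely on. Because Context.sessions is a discriminated union keyed on session_type, Pydantic rehydrates each stored session back into its concrete subclass. This assumes the utils package from this patch is importable; model_dump_json is the standard Pydantic v2 counterpart of the model_validate_json call used in load_context, and is presumably what save_context writes:

    from utils import Context, Chat

    ctx = Context()
    ctx.add_session(Chat(system_prompt="You are a helpful assistant."))

    serialized = ctx.model_dump_json()                 # what save_context would persist
    restored = Context.model_validate_json(serialized) # what load_context does

    assert restored.id == ctx.id
    assert type(restored.sessions[0]) is Chat          # discriminator restores the subclass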
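
One note on get_optimal_ctx_size: as written it returns max(defines.max_context, min(2048, ctx + ctx_buffer)), which can never drop below defines.max_context, so the per-request estimate is effectively ignored. If the intent is to clamp the estimate between a 2048-token floor and the model's maximum, the min/max appear swapped. A standalone sketch of the presumed intent (max_context stands in for defines.max_context; not the patch's code):

    def get_optimal_ctx_size(context_tokens: int, messages, ctx_buffer: int = 4096, max_context: int = 16384) -> int:
        # Rough estimate: ~3/4 token per character of message text
        ctx = round(context_tokens + len(str(messages)) * 3 / 4)
        # Apply the floor first (max), then the ceiling (min)
        return min(max_context, max(2048, ctx + ctx_buffer))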
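
Since generate_response is an async generator rather than a plain coroutine, callers consume a stream of status dictionaries, each carrying a "status" of "processing", "error", or "done" as yielded above. A hypothetical consumer (the server and context arguments are illustrative, not APIs defined by this patch):

    async def run_query(server, context, prompt: str) -> str:
        async for update in server.generate_response(context, "chat", prompt):
            match update["status"]:
                case "processing":
                    print("...", update["message"])
                case "error":
                    raise RuntimeError(update["message"])
                case "done":
                    # The terminal update carries the assistant reply and metadata
                    return update["message"]["content"]
        raise RuntimeError("generator ended without a 'done' update")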
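
The TODO about delay queueing (the global self.processing flag currently rejects any concurrent request) could be served by a per-context asyncio lock, so requests against the same context queue behind each other while other contexts proceed. A rough sketch only; none of these names exist in the patch:

    import asyncio
    from collections import defaultdict

    class PerContextGate:
        def __init__(self) -> None:
            # One lock per context id; defaultdict creates locks lazily
            self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

        def lock_for(self, context_id: str) -> asyncio.Lock:
            return self._locks[context_id]

    # Hypothetical usage inside generate_response, replacing the global flag:
    #     async with self.gate.lock_for(context.id):
    #         ... existing response generation ...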