# backstory/src/backend/utils/helpers.py
"""
Helper functions shared across routes
"""
import asyncio
import hashlib
import json
import pathlib
from datetime import datetime, UTC
from typing import Any, Dict, List, Optional, Tuple
from fastapi.responses import StreamingResponse
import defines
from logger import logger
from models import DocumentType
from models import Job, ChatMessage, ApiStatusType
import utils.llm_proxy as llm_manager
async def get_last_item(generator):
    """Exhaust an async generator and return its final item.

    Returns None when the generator yields nothing.
    """
    result = None
    async for value in generator:
        result = value
    return result
def filter_and_paginate(
    items: List[Any],
    page: int = 1,
    limit: int = 20,
    sort_by: Optional[str] = None,
    sort_order: str = "desc",
    filters: Optional[Dict] = None,
) -> Tuple[List[Any], int]:
    """Filter, sort, and paginate items.

    Args:
        items: Source list of dicts or objects.
        page: 1-based page number.
        limit: Maximum number of items per page.
        sort_by: Dict key or attribute to sort on; skipped if absent/uncomparable.
        sort_order: "desc" (default) or anything else for ascending.
        filters: Mapping of key -> value applied as equality filters.

    Returns:
        Tuple of (items for the requested page, total count after filtering).
    """
    filtered_items = list(items)

    # Apply filters (simplified equality filtering). Guard against an empty
    # list before probing filtered_items[0] — the original raised IndexError
    # when items was empty or a previous filter emptied the list.
    if filters:
        for key, value in filters.items():
            if not filtered_items:
                break
            sample = filtered_items[0]
            if isinstance(sample, dict) and key in sample:
                filtered_items = [item for item in filtered_items if item.get(key) == value]
            elif hasattr(sample, key):
                filtered_items = [item for item in filtered_items if getattr(item, key, None) == value]

    # Sort items
    if sort_by and filtered_items:
        reverse = sort_order.lower() == "desc"
        try:
            if isinstance(filtered_items[0], dict):
                filtered_items.sort(key=lambda x: x.get(sort_by, ""), reverse=reverse)
            else:
                filtered_items.sort(key=lambda x: getattr(x, sort_by, ""), reverse=reverse)
        except (AttributeError, TypeError):
            pass  # Skip sorting if attribute doesn't exist or isn't comparable

    # Paginate
    total = len(filtered_items)
    start = (page - 1) * limit
    return filtered_items[start:start + limit], total
async def stream_agent_response(chat_agent, user_message, chat_session_data=None, database=None) -> StreamingResponse:
    """Stream agent response with proper formatting.

    Wraps chat_agent.generate() in a Server-Sent-Events (SSE) StreamingResponse.
    Interim (non-DONE) messages are forwarded to the client as `data:` frames;
    the final DONE message is NOT streamed — it is persisted to `database`
    when both `database` and `chat_session_data` are provided.

    Args:
        chat_agent: Agent exposing generate(llm, model, session_id, prompt).
        user_message: Incoming message carrying session_id and content.
        chat_session_data: Mutable session dict; "lastActivity" is refreshed on save.
        database: Async store with add_chat_message() / set_chat_session().

    Returns:
        StreamingResponse with media type "text/event-stream".
    """

    async def message_stream_generator():
        """Generator to stream messages with persistence."""
        final_message = None
        import utils.llm_proxy as llm_manager

        async for generated_message in chat_agent.generate(
            llm=llm_manager.get_llm(),
            model=defines.model,
            session_id=user_message.session_id,
            prompt=user_message.content,
        ):
            # Abort the whole stream on the first error frame.
            if generated_message.status == ApiStatusType.ERROR:
                logger.error(f"❌ AI generation error: {generated_message.content}")
                yield f"data: {json.dumps({'status': 'error'})}\n\n"
                return

            # Store reference to the complete AI message
            if generated_message.status == ApiStatusType.DONE:
                final_message = generated_message

            # If the message is not done, convert it to a ChatMessageBase to remove
            # metadata and other unnecessary fields for streaming
            if generated_message.status != ApiStatusType.DONE:
                from models import ChatMessageStreaming, ChatMessageStatus

                # Guard: only streaming/status frames are expected before DONE.
                if not isinstance(generated_message, ChatMessageStreaming) and not isinstance(
                    generated_message, ChatMessageStatus
                ):
                    raise TypeError(
                        f"Expected ChatMessageStreaming or ChatMessageStatus, got {type(generated_message)}"
                    )

                json_data = generated_message.model_dump(mode="json", by_alias=True)
                json_str = json.dumps(json_data)
                yield f"data: {json_str}\n\n"

        # After streaming is complete, persist the final AI message to database
        if final_message and final_message.status == ApiStatusType.DONE:
            try:
                if database and chat_session_data:
                    await database.add_chat_message(final_message.session_id, final_message.model_dump())
                    logger.info(f"🤖 Message saved to database for session {final_message.session_id}")
                    # Update session last activity again
                    chat_session_data["lastActivity"] = datetime.now(UTC).isoformat()
                    await database.set_chat_session(final_message.session_id, chat_session_data)
            except Exception as e:
                # Persistence is best-effort: a failed save must not break the stream.
                logger.error(f"❌ Failed to save message to database: {e}")

    return StreamingResponse(
        message_stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Nginx
            "X-Content-Type-Options": "nosniff",
            "Access-Control-Allow-Origin": "*",  # Adjust for your CORS needs
            "Transfer-Encoding": "chunked",
        },
    )
def get_candidate_files_dir(username: str) -> pathlib.Path:
    """Return the candidate's files directory, creating it if it doesn't exist."""
    path = pathlib.Path(defines.user_dir, username, "files")
    path.mkdir(parents=True, exist_ok=True)
    return path
def get_document_type_from_filename(filename: str) -> DocumentType:
    """Determine document type from the filename's extension (TXT by default)."""
    ext = pathlib.Path(filename).suffix.lower()
    if ext == ".pdf":
        return DocumentType.PDF
    if ext in (".docx", ".doc"):
        return DocumentType.DOCX
    if ext in (".md", ".markdown"):
        return DocumentType.MARKDOWN
    if ext in (".png", ".jpg", ".jpeg", ".gif"):
        return DocumentType.IMAGE
    # .txt and any unknown extension fall back to plain text.
    return DocumentType.TXT
def get_skill_cache_key(candidate_id: str, skill: str) -> str:
    """Build a stable cache key for a (candidate, skill) pair.

    The skill is lower-cased before hashing so different capitalizations of
    the same skill share one cache entry; only 8 hex chars are kept.
    """
    digest = hashlib.md5(skill.lower().encode()).hexdigest()
    return ":".join(("skill_match", candidate_id, digest[:8]))
async def reformat_as_markdown(database, candidate_entity, content: str):
    """Reformat content as markdown using an AI agent.

    Async generator: yields a ChatMessageStatus progress update, then either a
    ChatMessageError on failure or the final ChatMessage whose content is the
    reformatted markdown.

    Args:
        database: Persistence layer (unused here; kept for call-site parity).
        candidate_entity: Entity exposing get_or_create_agent().
        content: Raw job-description text to reformat.
    """
    from models import ChatContextType, MOCK_UUID, ChatMessageError, ChatMessageStatus, ApiActivityType, ChatMessage
    import utils.llm_proxy as llm_manager

    chat_agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.JOB_REQUIREMENTS)
    if not chat_agent:
        error_message = ChatMessageError(session_id=MOCK_UUID, content="No agent found for job requirements chat type")
        yield error_message
        return

    status_message = ChatMessageStatus(
        session_id=MOCK_UUID, content="Reformatting job description as markdown...", activity=ApiActivityType.CONVERTING
    )
    yield status_message

    # Drain the one-shot stream, keeping only the final message it produces.
    message = None
    async for message in chat_agent.llm_one_shot(
        llm=llm_manager.get_llm(),
        model=defines.model,
        session_id=MOCK_UUID,
        prompt=content,
        system_prompt="""
You are a document editor. Take the provided job description and reformat as legible markdown.
Return only the markdown content, no other text. Make sure all content is included.
""",
    ):
        pass

    if not message or not isinstance(message, ChatMessage):
        logger.error("❌ Failed to reformat job description to markdown")
        error_message = ChatMessageError(session_id=MOCK_UUID, content="Failed to reformat job description")
        yield error_message
        return

    chat_message: ChatMessage = message
    try:
        # Strip surrounding prose/code fences from the LLM output.
        chat_message.content = chat_agent.extract_markdown_from_text(chat_message.content)
    except Exception as e:
        # Best-effort: keep the raw LLM output, but log instead of swallowing
        # the failure silently (the original `except: pass` hid these errors).
        logger.warning(f"⚠️ Markdown extraction failed; using raw LLM output: {e}")

    logger.info("✅ Successfully converted content to markdown")
    yield chat_message
    return
async def create_job_from_content(database, current_user, content: str):
    """Create a job from content using AI analysis.

    Two-phase async generator pipeline:
      1. Reformat the raw `content` to markdown via reformat_as_markdown().
      2. Feed that markdown to the JOB_REQUIREMENTS agent to extract
         structured job requirements.

    Yields interim status/streaming messages as they arrive, a
    ChatMessageError if either phase fails, and finally a single
    JobRequirementsMessage on success.
    """
    from models import (
        MOCK_UUID,
        ApiStatusType,
        ChatMessageError,
        ChatMessageStatus,
        ApiActivityType,
        ChatContextType,
        JobRequirementsMessage,
    )

    status_message = ChatMessageStatus(
        session_id=MOCK_UUID,
        content=f"Initiating connection with {current_user.first_name}'s AI agent...",
        activity=ApiActivityType.INFO,
    )
    yield status_message
    await asyncio.sleep(0)  # Let the status message propagate

    import entities.entity_manager as entities

    async with entities.get_candidate_entity(candidate=current_user) as candidate_entity:
        # Phase 1: normalize the raw content to markdown; keep only the final
        # message, forwarding interim ones to the caller.
        message = None
        async for message in reformat_as_markdown(database, candidate_entity, content):
            # Only yield one final DONE message
            if message.status != ApiStatusType.DONE:
                yield message
        if not message or not isinstance(message, ChatMessage):
            error_message = ChatMessageError(session_id=MOCK_UUID, content="Failed to reformat job description")
            yield error_message
            return
        markdown_message = message

        chat_agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.JOB_REQUIREMENTS)
        if not chat_agent:
            error_message = ChatMessageError(
                session_id=MOCK_UUID, content="No agent found for job requirements chat type"
            )
            yield error_message
            return

        status_message = ChatMessageStatus(
            session_id=MOCK_UUID,
            content="Analyzing document for company and requirement details...",
            activity=ApiActivityType.SEARCHING,
        )
        yield status_message

        # Phase 2: extract structured requirements from the markdown; again
        # stream interim messages and keep only the final one.
        message = None
        async for message in chat_agent.generate(
            llm=llm_manager.get_llm(), model=defines.model, session_id=MOCK_UUID, prompt=markdown_message.content
        ):
            if message.status != ApiStatusType.DONE:
                yield message
        if not message or not isinstance(message, JobRequirementsMessage):
            error_message = ChatMessageError(
                session_id=MOCK_UUID, content="Job extraction did not convert successfully"
            )
            yield error_message
            return

        job_requirements: JobRequirementsMessage = message
        logger.info(f"✅ Successfully generated job requirements for job {job_requirements.id}")
        yield job_requirements
        return
def get_requirements_list(job: Job) -> List[Dict[str, str]]:
    """Flatten a job's structured requirements into a list of dicts.

    Each entry has the shape {"requirement": <text>, "domain": <category>}.
    Missing or empty sections contribute nothing; order matches the original
    hand-rolled version (technical, experience req'd/preferred, soft skills,
    experience, education, certifications, preferred attributes).

    Args:
        job: Job whose `requirements` substructure is flattened.

    Returns:
        List of {"requirement", "domain"} dicts; empty when job.requirements
        is falsy.
    """
    requirements: List[Dict[str, str]] = []
    reqs = job.requirements
    if not reqs:
        return requirements

    def _add(items, domain: str) -> None:
        # Append one entry per item; None/empty collections are skipped.
        if items:
            requirements.extend({"requirement": req, "domain": domain} for req in items)

    if reqs.technical_skills:
        _add(reqs.technical_skills.required, "Technical Skills (required)")
        _add(reqs.technical_skills.preferred, "Technical Skills (preferred)")
    if reqs.experience_requirements:
        _add(reqs.experience_requirements.required, "Experience (required)")
        _add(reqs.experience_requirements.preferred, "Experience (preferred)")
    _add(reqs.soft_skills, "Soft Skills")
    _add(reqs.experience, "Experience")
    _add(reqs.education, "Education")
    _add(reqs.certifications, "Certifications")
    _add(reqs.preferred_attributes, "Preferred Attributes")
    return requirements