# backstory/src/backend/routes/candidates.py
#
# 2118 lines
# 87 KiB
# Python

"""
Candidate routes
"""
import json
import pathlib
import re
import shutil
import secrets
import uuid
import os
from datetime import datetime, timezone, UTC
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, File, Form, Depends, Body, Path, Query, BackgroundTasks, UploadFile
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pydantic import ValidationError
import backstory_traceback as backstory_traceback
from agents.generate_resume import GenerateResume
import agents.base as agents
from utils.rate_limiter import rate_limited
from utils.helpers import (
filter_and_paginate,
get_document_type_from_filename,
get_skill_cache_key,
get_requirements_list,
)
from database.manager import RedisDatabase
from email_service import email_service
from logger import logger
from models import (
MOCK_UUID,
ApiActivityType,
ApiMessageType,
ApiStatusType,
CandidateAI,
ChatContextType,
ChatMessageError,
ChatMessageRagSearch,
ChatMessageResume,
ChatMessageSkillAssessment,
ChatMessageStatus,
ChatMessageStreaming,
ChatMessageUser,
ChatSession,
Document,
DocumentContentResponse,
DocumentListResponse,
DocumentMessage,
DocumentOptions,
DocumentType,
DocumentUpdateRequest,
Job,
JobAnalysis,
JobRequirements,
CreateCandidateRequest,
Candidate,
RAGDocumentRequest,
RagContentMetadata,
RagContentResponse,
SkillAssessment,
SkillStrength,
UserType,
)
from utils.dependencies import (
get_current_admin,
get_database,
get_current_user,
get_current_user_or_guest,
prometheus_collector,
)
from utils.responses import create_paginated_response, create_success_response, create_error_response
import utils.llm_proxy as llm_manager
import entities.entity_manager as entities
import defines
from utils.auth_utils import AuthenticationManager
# Create router for candidate endpoints
router = APIRouter(prefix="/candidates", tags=["candidates"])
# ============================
# Candidate Endpoints
# ============================
@router.post("/ai")
async def create_candidate_ai(
    background_tasks: BackgroundTasks,  # NOTE(review): currently unused in this handler
    user_message: ChatMessageUser = Body(...),
    admin: Candidate = Depends(get_current_admin),
    database: RedisDatabase = Depends(get_database),
):
    """
    Create a new candidate from AI-generated persona and resume data.

    Admin-only. Streams output from the GENERATE_PERSONA agent using a
    two-phase state machine: the first DONE message is treated as the
    persona (state 0), the second as the resume (state 1). The persona
    JSON is validated into a CandidateAI model, the candidate and its
    auth-lookup records are persisted, and the generated resume is
    written into the candidate's rag-content directory.

    Returns a success payload containing the candidate data and the raw
    resume bytes, or a JSON error response on any failure.
    """
    try:
        async with entities.get_candidate_entity(candidate=admin) as admin_entity:
            generate_agent = agents.get_or_create_agent(
                agent_type=ChatContextType.GENERATE_PERSONA,
                prometheus_collector=prometheus_collector,
                user=admin_entity,
            )
            if not generate_agent:
                logger.warning("⚠️ Unable to create AI generation agent.")
                return JSONResponse(
                    status_code=400,
                    content=create_error_response("AGENT_NOT_FOUND", "Unable to create AI generation agent"),
                )
            persona_message = None
            resume_message = None
            state = 0  # 0 -- create persona, 1 -- create resume
            async for generated_message in generate_agent.generate(
                llm=llm_manager.get_llm(),
                model=defines.model,
                session_id=user_message.session_id,
                prompt=user_message.content,
            ):
                # Abort the whole request on the first error message from the agent.
                if isinstance(generated_message, ChatMessageError):
                    error_message: ChatMessageError = generated_message
                    logger.error(f"❌ AI generation error: {error_message.content}")
                    return JSONResponse(
                        status_code=500, content=create_error_response("AI_GENERATION_ERROR", error_message.content)
                    )
                if isinstance(generated_message, ChatMessageRagSearch):
                    raise ValueError("AI generation returned a RAG search message instead of a persona")
                # First DONE message -> persona; second DONE message -> resume.
                if generated_message.status == ApiStatusType.DONE and state == 0:
                    persona_message = generated_message
                    state = 1  # Switch to resume generation
                elif generated_message.status == ApiStatusType.DONE and state == 1:
                    resume_message = generated_message
            if not persona_message:
                logger.error("❌ AI generation failed: No message generated")
                return JSONResponse(
                    status_code=500,
                    content=create_error_response("AI_GENERATION_FAILED", "Failed to generate AI candidate data"),
                )
            # NOTE(review): these checks expect the agent to emit ChatMessageUser
            # instances for persona/resume — confirm against the agent implementation.
            if not isinstance(persona_message, ChatMessageUser):
                logger.error(f"❌ AI generation returned unexpected message type: {type(persona_message)}")
                return JSONResponse(
                    status_code=500,
                    content=create_error_response(
                        "AI_GENERATION_ERROR", "AI generation did not return a valid user message"
                    ),
                )
            if not isinstance(resume_message, ChatMessageUser):
                logger.error(f"❌ AI generation returned unexpected resume message type: {type(resume_message)}")
                return JSONResponse(
                    status_code=500,
                    content=create_error_response(
                        "AI_GENERATION_ERROR", "AI generation did not return a valid resume message"
                    ),
                )
            # Parse and validate the persona JSON into a CandidateAI record.
            try:
                current_time = datetime.now(timezone.utc)
                candidate_data = json.loads(persona_message.content)
                candidate_data.update(
                    {
                        "user_type": "candidate",
                        "created_at": current_time.isoformat(),
                        "updated_at": current_time.isoformat(),
                        "status": "active",  # Directly active for AI-generated candidates
                        "is_admin": False,  # Default to non-admin
                        "is_AI": True,  # Mark as AI-generated
                    }
                )
                candidate = CandidateAI.model_validate(candidate_data)
            except ValidationError as e:
                logger.error("❌ AI candidate data validation failed")
                for lines in backstory_traceback.format_exc().splitlines():
                    logger.error(lines)
                logger.error(json.dumps(persona_message.content, indent=2))
                for error in e.errors():
                    print(f"Field: {error['loc'][0]}, Error: {error['msg']}")
                return JSONResponse(
                    status_code=400,
                    content=create_error_response("AI_VALIDATION_FAILED", "AI-generated data validation failed"),
                )
            except Exception:
                # Log the error and return a validation error response
                for lines in backstory_traceback.format_exc().splitlines():
                    logger.error(lines)
                logger.error(json.dumps(persona_message.content, indent=2))
                return JSONResponse(
                    status_code=400,
                    content=create_error_response("AI_VALIDATION_FAILED", "AI-generated data validation failed"),
                )
            logger.info(f"🤖 AI-generated candidate {candidate.username} created with email {candidate.email}")
            candidate_data = candidate.model_dump(by_alias=False, exclude_unset=False)
            # Store in database
            await database.set_candidate(candidate.id, candidate_data)
            # Auth lookup records keyed by email, username, and id.
            user_auth_data = {
                "id": candidate.id,
                "type": "candidate",
                "email": candidate.email,
                "username": candidate.username,
            }
            await database.set_user(candidate.email, user_auth_data)
            await database.set_user(candidate.username, user_auth_data)
            await database.set_user_by_id(candidate.id, user_auth_data)
            document_content = None
            if resume_message:
                # Persist the generated resume as a Markdown RAG document.
                document_id = str(uuid.uuid4())
                document_type = DocumentType.MARKDOWN
                document_content = resume_message.content.encode("utf-8")
                document_filename = "resume.md"
                document_data = Document(
                    id=document_id,
                    filename=document_filename,
                    original_name=document_filename,
                    type=document_type,
                    size=len(document_content),
                    upload_date=datetime.now(UTC),
                    owner_id=candidate.id,
                )
                file_path = os.path.join(defines.user_dir, candidate.username, "rag-content", document_filename)
                # Ensure the directory exists
                rag_content_dir = pathlib.Path(defines.user_dir) / candidate.username / "rag-content"
                rag_content_dir.mkdir(parents=True, exist_ok=True)
                try:
                    with open(file_path, "wb") as f:
                        f.write(document_content)
                    logger.info(f"📁 File saved to disk: {file_path}")
                except Exception as e:
                    logger.error(f"❌ Failed to save file to disk: {e}")
                    # NOTE(review): message below has a typo ("Failed to resume file to disk")
                    return JSONResponse(
                        status_code=500, content=create_error_response("FILE_SAVE_ERROR", "Failed to resume file to disk")
                    )
                # Store document metadata in database
                await database.set_document(document_id, document_data.model_dump())
                await database.add_document_to_candidate(candidate.id, document_id)
                logger.info(f"📄 Document metadata saved for candidate {candidate.id}: {document_id}")
            logger.info(
                f"✅ AI-generated candidate created: {candidate_data['email']}, resume is {len(document_content) if document_content else 0} bytes"
            )
            return create_success_response(
                {
                    "message": "AI-generated candidate created successfully",
                    "candidate": candidate_data,
                    # NOTE(review): raw bytes — confirm the response encoder handles bytes
                    "resume": document_content,
                }
            )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ AI Candidate creation error: {e}")
        return JSONResponse(
            status_code=500,
            content=create_error_response("AI_CREATION_FAILED", "Failed to create AI-generated candidate"),
        )
@router.post("")
async def create_candidate_with_verification(
    request: CreateCandidateRequest, background_tasks: BackgroundTasks, database: RedisDatabase = Depends(get_database)
):
    """
    Create a new candidate account pending email verification.

    Rejects duplicate email/username, builds the candidate record in a
    "pending" state (the very first account on the system is granted admin
    rights), stores an email-verification token alongside the registration
    data, and queues the verification email as a background task.

    Returns:
        Success payload instructing the user to verify their email, a 409
        JSON error on duplicate user, or a 500 JSON error on failure.
    """
    try:
        # Initialize authentication manager
        auth_manager = AuthenticationManager(database)
        # Check if user already exists
        user_exists, conflict_field = await auth_manager.check_user_exists(request.email, request.username)
        if user_exists and conflict_field:
            logger.warning(
                f"⚠️ Attempted to create user with existing {conflict_field}: {getattr(request, conflict_field)}"
            )
            return JSONResponse(
                status_code=409,
                content=create_error_response("USER_EXISTS", f"A user with this {conflict_field} already exists"),
            )
        # Generate candidate data (but don't activate yet)
        candidate_id = str(uuid.uuid4())
        current_time = datetime.now(timezone.utc)
        # The first candidate ever registered becomes an admin.
        all_candidates = await database.get_all_candidates()
        is_admin = len(all_candidates) == 0
        candidate_data = {
            "id": candidate_id,
            "userType": "candidate",
            "email": request.email,
            "username": request.username,
            "firstName": request.first_name,
            "lastName": request.last_name,
            "fullName": f"{request.first_name} {request.last_name}",
            "phone": request.phone,
            "createdAt": current_time.isoformat(),
            "updatedAt": current_time.isoformat(),
            "status": "pending",  # Not active until email verified
            "isAdmin": is_admin,
        }
        # Generate verification token
        verification_token = secrets.token_urlsafe(32)
        # Store verification token with user data.
        # SECURITY NOTE: the plaintext password is stored alongside the token
        # until verification completes; consider hashing it before storage.
        await database.store_email_verification_token(
            request.email,
            verification_token,
            "candidate",
            {
                "candidate_data": candidate_data,
                "password": request.password,  # Store temporarily for verification
                "username": request.username,
            },
        )
        # Send verification email in background
        background_tasks.add_task(
            email_service.send_verification_email,
            request.email,
            verification_token,
            f"{request.first_name} {request.last_name}",
        )
        logger.info(f"✅ Candidate registration initiated for: {request.email}")
        return create_success_response(
            {
                # Typos fixed: "sytem" -> "system", "priveledges" -> "privileges"
                "message": f"Registration successful! Please check your email to verify your account. {'As the first user on this system, you have admin privileges.' if is_admin else ''}",
                "email": request.email,
                "verificationRequired": True,
            }
        )
    except Exception as e:
        logger.error(f"❌ Candidate creation error: {e}")
        return JSONResponse(
            status_code=500, content=create_error_response("CREATION_FAILED", "Failed to create candidate account")
        )
@router.post("/documents/upload")
async def upload_candidate_document(
    file: UploadFile = File(...),
    options_data: str = Form(..., alias="options"),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Upload a document for the current candidate.

    Validates the JSON `options` form field, enforces a 10MB size limit and
    non-empty content, then streams progress/errors back as server-sent
    events. Files land in `rag-content`, `files`, or `jobs` depending on
    options; non-text documents are converted to a Markdown side-car via
    MarkItDown before the document metadata is persisted.
    """
    try:
        # Parse the JSON string and create DocumentOptions object
        options_dict = json.loads(options_data)
        options: DocumentOptions = DocumentOptions.model_validate(options_dict)
    except (json.JSONDecodeError, ValidationError):
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="Invalid options format. Please provide valid JSON.",
                        ).model_dump(mode="json", by_alias=True)
                    )
                ]
            ),
            media_type="text/event-stream",
        )
    # Check file size (limit to 10MB)
    max_size = 10 * 1024 * 1024  # 10MB
    file_content = await file.read()
    if len(file_content) > max_size:
        logger.info(f"⚠️ File too large: {file.filename} ({len(file_content)} bytes)")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="File size exceeds 10MB limit",
                        ).model_dump(mode="json", by_alias=True)
                    )
                ]
            ),
            media_type="text/event-stream",
        )
    if len(file_content) == 0:
        logger.info(f"⚠️ File is empty: {file.filename}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="File is empty",
                        ).model_dump(mode="json", by_alias=True)
                    )
                ]
            ),
            media_type="text/event-stream",
        )

    async def upload_stream_generator(file_content):
        # Yields ChatMessage* objects describing progress, errors, and the
        # final DocumentMessage; consumed by to_json below.
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized upload attempt by user type: {current_user.user_type}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Only candidates can upload documents",
            )
            yield error_message
            return
        candidate: Candidate = current_user
        # Strip any path components to prevent directory traversal.
        file.filename = re.sub(r"^.*/", "", file.filename) if file.filename else ""  # Sanitize filename
        if not file.filename or file.filename.strip() == "":
            logger.warning("⚠️ File upload attempt with missing filename")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="File must have a valid filename",
            )
            yield error_message
            return
        logger.info(
            f"📁 Received file upload: filename='{file.filename}', content_type='{file.content_type}', size='{len(file_content)} bytes'"
        )
        # Destination subdirectory: jobs > rag-content > files (in that precedence).
        directory = "rag-content" if options.include_in_rag else "files"
        directory = "jobs" if options.is_job_document else directory
        # Ensure the file does not already exist either in 'files' or in 'rag-content'
        dir_path = os.path.join(defines.user_dir, candidate.username, directory)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        file_path = os.path.join(dir_path, file.filename)
        if os.path.exists(file_path):
            if not options.overwrite:
                logger.warning(f"⚠️ File already exists: {file_path}")
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # No session ID for document uploads
                    content=f"File with this name already exists in the '{directory}' directory",
                )
                yield error_message
                return
            else:
                logger.info(f"🔄 Overwriting existing file: {file_path}")
                status_message = ChatMessageStatus(
                    session_id=MOCK_UUID,  # No session ID for document uploads
                    content=f"Overwriting existing file: {file.filename}",
                    activity=ApiActivityType.INFO,
                )
                yield status_message
        # Validate file type
        allowed_types = [".txt", ".md", ".docx", ".pdf", ".png", ".jpg", ".jpeg", ".gif"]
        file_extension = pathlib.Path(file.filename).suffix.lower() if file.filename else ""
        if file_extension not in allowed_types:
            logger.warning(f"⚠️ Invalid file type: {file_extension} for file {file.filename}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content=f"File type {file_extension} not supported. Allowed types: {', '.join(allowed_types)}",
            )
            yield error_message
            return
        # Create document metadata
        document_id = str(uuid.uuid4())
        document_type = get_document_type_from_filename(file.filename or "unknown.txt")
        document_data = Document(
            id=document_id,
            filename=file.filename or f"document_{document_id}",
            original_name=file.filename or f"document_{document_id}",
            type=document_type,
            size=len(file_content),
            upload_date=datetime.now(UTC),
            options=options,
            owner_id=candidate.id,
        )
        # Save file to disk
        directory = os.path.join(defines.user_dir, candidate.username, directory)
        file_path = os.path.join(directory, file.filename)
        try:
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"📁 File saved to disk: {file_path}")
        except Exception as e:
            logger.error(f"❌ Failed to save file to disk: {e}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Failed to save file to disk",
            )
            yield error_message
            return
        converted = False
        if document_type != DocumentType.MARKDOWN and document_type != DocumentType.TXT:
            p = pathlib.Path(file_path)
            p_as_md = p.with_suffix(".md")
            # If file_path.md doesn't exist or file_path is newer than file_path.md,
            # fire off markitdown
            if (not p_as_md.exists()) or (p.stat().st_mtime > p_as_md.stat().st_mtime):
                status_message = ChatMessageStatus(
                    session_id=MOCK_UUID,  # No session ID for document uploads
                    content=f"Converting content from {document_type}...",
                    activity=ApiActivityType.CONVERTING,
                )
                yield status_message
                try:
                    from markitdown import MarkItDown  # type: ignore

                    md = MarkItDown(enable_plugins=False)  # Set to True to enable plugins
                    result = md.convert(file_path, output_format="markdown")
                    p_as_md.write_text(result.text_content)
                    # From here on, the converted Markdown is what gets returned.
                    file_content = result.text_content
                    converted = True
                    logger.info(f"✅ Converted {file.filename} to Markdown format: {p_as_md}")
                    file_path = p_as_md
                except Exception as e:
                    error_message = ChatMessageError(
                        session_id=MOCK_UUID,  # No session ID for document uploads
                        content=f"Failed to convert {file.filename} to Markdown.",
                    )
                    yield error_message
                    logger.error(f"❌ Error converting {file_path} to Markdown: {e}")
                    return
        # Store document metadata in database
        await database.set_document(document_id, document_data.model_dump())
        await database.add_document_to_candidate(candidate.id, document_id)
        logger.info(f"📄 Document uploaded: {file.filename} for candidate {candidate.username}")
        chat_message = DocumentMessage(
            session_id=MOCK_UUID,  # No session ID for document uploads
            type=ApiMessageType.JSON,
            status=ApiStatusType.DONE,
            document=document_data,
            converted=converted,
            content=file_content,
        )
        yield chat_message

    try:
        # Serialize each yielded message as a server-sent-event data frame.
        async def to_json(method):
            try:
                async for message in method:
                    json_data = message.model_dump(mode="json", by_alias=True)
                    json_str = json.dumps(json_data)
                    yield f"data: {json_str}\n\n".encode("utf-8")
            except Exception as e:
                logger.error(backstory_traceback.format_exc())
                logger.error(f"Error in to_json conversion: {e}")
                return

        return StreamingResponse(
            to_json(upload_stream_generator(file_content)),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Nginx
                "X-Content-Type-Options": "nosniff",
                "Access-Control-Allow-Origin": "*",  # Adjust for your CORS needs
                "Transfer-Encoding": "chunked",
            },
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Document upload error: {e}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="Failed to upload document",
                        ).model_dump(mode="json", by_alias=True)
                    )
                ]
            ),
            media_type="text/event-stream",
        )
@router.post("/profile/upload")
async def upload_candidate_profile(
    file: UploadFile = File(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """
    Upload a profile image for the current candidate.

    Accepts png/jpg/jpeg/gif up to 5MB, stores it as
    `<user_dir>/<username>/profile.<ext>`, and updates the candidate
    record's `profile_image` field.

    Returns:
        Success payload on upload, or a 400/403/500 JSON error response.
    """
    try:
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized upload attempt by user type: {current_user.user_type}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can upload their profile")
            )
        candidate: Candidate = current_user
        # Validate file type
        allowed_types = [".png", ".jpg", ".jpeg", ".gif"]
        file_extension = pathlib.Path(file.filename).suffix.lower() if file.filename else ""
        if file_extension not in allowed_types:
            logger.warning(f"⚠️ Invalid file type: {file_extension} for file {file.filename}")
            return JSONResponse(
                status_code=400,
                content=create_error_response(
                    "INVALID_FILE_TYPE",
                    f"File type {file_extension} not supported. Allowed types: {', '.join(allowed_types)}",
                ),
            )
        # Check file size (limit to 5MB)
        max_size = 5 * 1024 * 1024  # 5MB
        file_content = await file.read()
        if len(file_content) > max_size:
            logger.info(f"⚠️ File too large: {file.filename} ({len(file_content)} bytes)")
            # Error message now matches the actual 5MB limit (was "10MB")
            return JSONResponse(
                status_code=400, content=create_error_response("FILE_TOO_LARGE", "File size exceeds 5MB limit")
            )
        # Save file to disk as "profile.<extension>"
        _, extension = os.path.splitext(file.filename or "")
        file_path = os.path.join(defines.user_dir, candidate.username)
        os.makedirs(file_path, exist_ok=True)
        file_path = os.path.join(file_path, f"profile{extension}")
        try:
            with open(file_path, "wb") as f:
                f.write(file_content)
            logger.info(f"📁 File saved to disk: {file_path}")
        except Exception as e:
            logger.error(f"❌ Failed to save file to disk: {e}")
            return JSONResponse(
                status_code=500, content=create_error_response("FILE_SAVE_ERROR", "Failed to save file to disk")
            )
        # Record the new image filename on the candidate.
        updates = {"updated_at": datetime.now(UTC).isoformat(), "profile_image": f"profile{extension}"}
        candidate_dict = candidate.model_dump()
        candidate_dict.update(updates)
        updated_candidate = Candidate.model_validate(candidate_dict)
        await database.set_candidate(candidate.id, updated_candidate.model_dump())
        logger.info(f"📄 Profile image uploaded: {updated_candidate.profile_image} for candidate {candidate.id}")
        return create_success_response(True)
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Document upload error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("UPLOAD_ERROR", "Failed to upload document"))
@router.get("/profile/{username}")
async def get_candidate_profile_image(
    username: str = Path(..., description="Username of the candidate"),
    # current_user = Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Get profile image of a candidate by username"""
    try:
        needle = username.lower()
        all_candidates_data = await database.get_all_candidates()
        # Case-insensitive match on either email or username.
        match = next(
            (
                c
                for c in (Candidate.model_validate(data) for data in all_candidates_data.values())
                if needle in (c.email.lower(), c.username.lower())
            ),
            None,
        )
        if match is None:
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Candidate not found"))
        candidate = Candidate.model_validate(match)
        if not candidate.profile_image:
            logger.warning(f"⚠️ Candidate {candidate.username} has no profile image set")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Profile image not found"))
        image_path = pathlib.Path(os.path.join(defines.user_dir, candidate.username, candidate.profile_image))
        if not image_path.exists():
            logger.error(f"❌ Profile image file not found on disk: {image_path}")
            return JSONResponse(
                status_code=404, content=create_error_response("FILE_NOT_FOUND", "Profile image file not found on disk")
            )
        return FileResponse(
            image_path,
            media_type=f"image/{image_path.suffix[1:]}",  # file extension without the leading dot
            filename=candidate.profile_image,
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Get candidate profile image failed: {str(e)}")
        return JSONResponse(
            status_code=500, content=create_error_response("FETCH_ERROR", "Failed to retrieve profile image")
        )
@router.get("/qr-code/{candidate_id}/{job_id}")
async def get_candidate_qr_code(
    candidate_id: str = Path(..., description="ID of the candidate"),
    job_id: Optional[str] = Path(..., description="ID of the job"),
    # current_user = Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """
    Return a QR code PNG for a candidate, optionally scoped to a job.

    If the QR image does not already exist on disk it is generated,
    encoding either the job-analysis URL (when a job is given) or the
    candidate's public profile URL, and cached as
    `<user_dir>/<username>/<job_id>.png` or `qrcode.png`.
    """
    try:
        all_candidates_data = await database.get_all_candidates()
        candidates_list = [Candidate.model_validate(data) for data in all_candidates_data.values()]
        # Case-insensitive match on candidate id
        query_lower = candidate_id.lower()
        candidates_list = [c for c in candidates_list if query_lower == c.id.lower()]
        if not len(candidates_list):
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Candidate not found"))
        candidate = Candidate.model_validate(candidates_list[0])
        job = None
        if job_id:
            job_data = await database.get_job(job_id)
            if not job_data:
                logger.warning(f"⚠️ Job not found for ID: {job_id}")
                return JSONResponse(
                    status_code=404,
                    content=create_error_response("JOB_NOT_FOUND", f"Job with id '{job_id}' not found"),
                )
            job = Job.model_validate(job_data)
        # Job-scoped codes are cached per job; otherwise one shared profile code.
        file_name = f"{job.id}.png" if job else "qrcode.png"
        file_path = pathlib.Path(defines.user_dir) / candidate.username / file_name
        if not file_path.exists():
            import pyqrcode

            if job:
                qrobj = pyqrcode.create(f"{defines.frontend_url}/job-analysis/{candidate.id}/{job.id}")
            else:
                qrobj = pyqrcode.create(f"{defines.frontend_url}/u/{candidate.id}")
            # Make sure the candidate's directory exists before writing the PNG.
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, "wb") as f:
                qrobj.png(f, scale=2)
        return FileResponse(
            file_path,
            media_type=f"image/{file_path.suffix[1:]}",  # Get extension without dot
            filename="qrcode.png",
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Get candidate QR code failed: {str(e)}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", "Failed to retrieve QR code"))
@router.get("/documents")
async def get_candidate_documents(
    current_user=Depends(get_current_user), database: RedisDatabase = Depends(get_database)
):
    """Get all documents for the current candidate"""
    try:
        # Only candidates own documents; reject everyone else.
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized access attempt by user type: {current_user.user_type}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can access documents")
            )
        candidate: Candidate = current_user
        raw_docs = await database.get_candidate_documents(candidate.id)
        # Validate and order newest-first by upload date.
        documents = sorted(
            (Document.model_validate(raw) for raw in raw_docs),
            key=lambda doc: doc.upload_date,
            reverse=True,
        )
        payload = DocumentListResponse(documents=documents, total=len(documents))
        return create_success_response(payload.model_dump(by_alias=True))
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Get candidate documents error: {e}")
        return JSONResponse(
            status_code=500, content=create_error_response("FETCH_ERROR", "Failed to retrieve documents")
        )
@router.get("/documents/{document_id}/content")
async def get_document_content(
    document_id: str = Path(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Get document content by ID"""
    try:
        # Guard: only candidates may read documents.
        if current_user.user_type != "candidate":
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can access documents")
            )
        candidate: Candidate = current_user
        document_data = await database.get_document(document_id)
        if not document_data:
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Document not found"))
        document = Document.model_validate(document_data)
        # Ownership check — no cross-candidate reads.
        if document.owner_id != candidate.id:
            return JSONResponse(
                status_code=403,
                content=create_error_response("FORBIDDEN", "Cannot access another candidate's document"),
            )
        subdir = "rag-content" if document.options.include_in_rag else "files"
        doc_path = pathlib.Path(defines.user_dir) / candidate.username / subdir / document.original_name
        # Non-text documents are served from their converted Markdown side-car.
        if document.type not in [DocumentType.TXT, DocumentType.MARKDOWN]:
            doc_path = doc_path.with_suffix(".md")
        if not doc_path.exists():
            logger.error(f"❌ Document file not found on disk: {doc_path}")
            return JSONResponse(
                status_code=404, content=create_error_response("FILE_NOT_FOUND", "Document file not found on disk")
            )
        try:
            content = doc_path.read_text(encoding="utf-8")
            response = DocumentContentResponse(
                document_id=document_id,
                filename=document.filename,
                type=document.type,
                content=content,
                size=document.size,
            )
            return create_success_response(response.model_dump(by_alias=True))
        except Exception as e:
            logger.error(f"❌ Failed to read document file: {e}")
            return JSONResponse(
                status_code=500, content=create_error_response("READ_ERROR", "Failed to read document content")
            )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Get document content error: {e}")
        return JSONResponse(
            status_code=500, content=create_error_response("FETCH_ERROR", "Failed to retrieve document content")
        )
@router.patch("/documents/{document_id}")
async def update_document(
    document_id: str = Path(...),
    updates: DocumentUpdateRequest = Body(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Update document metadata (filename, RAG status).

    When the RAG flag changes, the underlying file (and its Markdown
    side-car for non-text types) is moved between the candidate's
    `rag-content` and `files` directories before the metadata update is
    written to the database.
    """
    try:
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can update documents")
            )
        candidate: Candidate = current_user
        # Get document metadata
        document_data = await database.get_document(document_id)
        if not document_data:
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Document not found"))
        document = Document.model_validate(document_data)
        # Verify document belongs to current candidate
        if document.owner_id != candidate.id:
            return JSONResponse(
                status_code=403,
                content=create_error_response("FORBIDDEN", "Cannot update another candidate's document"),
            )
        # NOTE(review): when updates.options is absent this falls back to a
        # default DocumentOptions, which may unintentionally register as a
        # RAG-status "change" — confirm DocumentUpdateRequest semantics.
        update_options = updates.options if updates.options else DocumentOptions()
        if document.options.include_in_rag != update_options.include_in_rag:
            # If RAG status is changing, we need to handle file movement
            rag_dir = os.path.join(defines.user_dir, candidate.username, "rag-content")
            file_dir = os.path.join(defines.user_dir, candidate.username, "files")
            os.makedirs(rag_dir, exist_ok=True)
            os.makedirs(file_dir, exist_ok=True)
            rag_path = os.path.join(rag_dir, document.original_name)
            file_path = os.path.join(file_dir, document.original_name)
            if update_options.include_in_rag:
                src = pathlib.Path(file_path)
                dst = pathlib.Path(rag_path)
                # Move to RAG directory
                src.rename(dst)
                logger.info("📁 Moved file to RAG directory")
                # Move the converted Markdown side-car along with the original.
                if document.type != DocumentType.MARKDOWN and document.type != DocumentType.TXT:
                    src = pathlib.Path(file_path)
                    src_as_md = src.with_suffix(".md")
                    if src_as_md.exists():
                        dst = pathlib.Path(rag_path).with_suffix(".md")
                        src_as_md.rename(dst)
            else:
                src = pathlib.Path(rag_path)
                dst = pathlib.Path(file_path)
                # Move to regular files directory
                src.rename(dst)
                logger.info("📁 Moved file to regular files directory")
                if document.type != DocumentType.MARKDOWN and document.type != DocumentType.TXT:
                    src_as_md = src.with_suffix(".md")
                    if src_as_md.exists():
                        dst = pathlib.Path(file_path).with_suffix(".md")
                        src_as_md.rename(dst)
        # Apply updates
        update_dict = {}
        if updates.filename is not None:
            update_dict["filename"] = updates.filename.strip()
        # NOTE(review): if include_in_rag is a plain bool this "is not None"
        # check is always true — confirm whether Optional was intended.
        if update_options.include_in_rag is not None:
            update_dict["include_in_rag"] = update_options.include_in_rag
        if not update_dict:
            return JSONResponse(
                status_code=400, content=create_error_response("NO_UPDATES", "No valid updates provided")
            )
        # Add timestamp
        # NOTE(review): camelCase "updatedAt" here vs snake_case keys above — verify storage schema.
        update_dict["updatedAt"] = datetime.now(UTC).isoformat()
        # Update in database
        updated_data = await database.update_document(document_id, update_dict)
        if not updated_data:
            return JSONResponse(
                status_code=500, content=create_error_response("UPDATE_FAILED", "Failed to update document")
            )
        updated_document = Document.model_validate(updated_data)
        logger.info(f"📄 Document updated: {document_id} for candidate {candidate.username}")
        return create_success_response(updated_document.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Update document error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("UPDATE_ERROR", "Failed to update document"))
@router.delete("/documents/{document_id}")
async def delete_document(
    document_id: str = Path(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Delete a document's file (and Markdown side-car, if any) from disk and remove its metadata.

    Returns 403 for non-candidates or documents owned by another candidate,
    404 when the document record does not exist. File-deletion failures are
    logged but do not block removal of the database records.
    """
    try:
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized delete attempt by user type: {current_user.user_type}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can delete documents")
            )
        candidate: Candidate = current_user
        # Get document metadata
        document_data = await database.get_document(document_id)
        if not document_data:
            logger.warning(f"⚠️ Document not found for deletion: {document_id}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Document not found"))
        document = Document.model_validate(document_data)
        # Verify document belongs to current candidate
        if document.owner_id != candidate.id:
            logger.warning(f"⚠️ Unauthorized delete attempt on document {document_id} by candidate {candidate.username}")
            return JSONResponse(
                status_code=403,
                content=create_error_response("FORBIDDEN", "Cannot delete another candidate's document"),
            )
        # The file lives under "rag-content" when indexed for RAG, otherwise "files".
        # pathlib.Path joins the segments directly; no os.path.join round-trip needed.
        file_path = pathlib.Path(
            defines.user_dir,
            candidate.username,
            "rag-content" if document.options.include_in_rag else "files",
            document.original_name,
        )
        try:
            if file_path.exists():
                file_path.unlink()
                logger.info(f"🗑️ File deleted from disk: {file_path}")
            else:
                logger.warning(f"⚠️ File not found on disk during deletion: {file_path}")
            # Non-text uploads have a converted ".md" side-car alongside them; delete it too.
            if document.type != DocumentType.MARKDOWN and document.type != DocumentType.TXT:
                side_car = file_path.with_suffix(".md")
                if side_car.exists():
                    side_car.unlink()
        except Exception as e:
            logger.error(f"❌ Failed to delete file from disk: {e}")
            # Continue with metadata deletion even if file deletion fails
        # Remove from database
        await database.remove_document_from_candidate(candidate.id, document_id)
        await database.delete_document(document_id)
        logger.info(f"🗑️ Document deleted: {document_id} for candidate {candidate.username}")
        return create_success_response({"message": "Document deleted successfully", "documentId": document_id})
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Delete document error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("DELETE_ERROR", "Failed to delete document"))
@router.get("/documents/search")
async def search_candidate_documents(
    query: str = Query(..., min_length=1),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Search candidate documents by filename"""
    try:
        # Only candidates may search their own document collection.
        if current_user.user_type != "candidate":
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can search documents")
            )
        candidate: Candidate = current_user
        # Fetch matches and return them sorted newest-upload-first.
        raw_results = await database.search_candidate_documents(candidate.id, query)
        matches = sorted(
            (Document.model_validate(item) for item in raw_results),
            key=lambda doc: doc.upload_date,
            reverse=True,
        )
        payload = DocumentListResponse(documents=matches, total=len(matches))
        return create_success_response(payload.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Search documents error: {e}")
        return JSONResponse(
            status_code=500, content=create_error_response("SEARCH_ERROR", "Failed to search documents")
        )
@router.post("/rag-content")
async def post_candidate_vector_content(
    rag_document: RAGDocumentRequest = Body(...), current_user=Depends(get_current_user)
):
    """Return the stored RAG content and metadata for one document in the caller's UMAP collection.

    Returns 403 for non-candidates, 404 when the collection is missing, the
    requested id is absent, or the document has no preparable content.
    """
    try:
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized access attempt by user type: {current_user.user_type}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can access this endpoint")
            )
        candidate: Candidate = current_user
        async with entities.get_candidate_entity(candidate=candidate) as candidate_entity:
            collection = candidate_entity.umap_collection
            if not collection:
                logger.warning(f"⚠️ No UMAP collection found for candidate {candidate.username}")
                return JSONResponse({"error": "No UMAP collection found"}, status_code=404)
            # Scan the parallel ids/metadatas lists for the requested document.
            # Named `doc_id` rather than `id` to avoid shadowing the builtin.
            for index, doc_id in enumerate(collection.ids):
                if doc_id != rag_document.id:
                    continue
                metadata = collection.metadatas[index].copy()
                rag_metadata = RagContentMetadata.model_validate(metadata)
                content = candidate_entity.file_watcher.prepare_metadata(metadata)
                if not content:
                    logger.warning(f"⚠️ No content found for document id {doc_id} for candidate {candidate.username}")
                    return JSONResponse(f"No content found for document id {rag_document.id}.", 404)
                rag_response = RagContentResponse(id=doc_id, content=content, metadata=rag_metadata)
                logger.info(f"✅ Fetched RAG content for document id {doc_id} for candidate {candidate.username}")
                return create_success_response(rag_response.model_dump(by_alias=True))
            logger.warning(
                f"⚠️ Document id {rag_document.id} not found in UMAP collection for candidate {candidate.username}"
            )
            return JSONResponse(f"Document id {rag_document.id} not found.", 404)
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Post candidate content error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", str(e)))
@router.post("/rag-vectors")
async def post_candidate_vectors(dimensions: int = Body(...), current_user=Depends(get_current_user)):
    """Return the candidate's UMAP-projected RAG embeddings (2D or 3D)."""
    try:
        if current_user.user_type != "candidate":
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can access this endpoint")
            )
        candidate: Candidate = current_user
        # Shape returned both when the collection is missing and when it is empty.
        empty_payload = {"ids": [], "metadatas": [], "documents": [], "embeddings": [], "size": 0}
        async with entities.get_candidate_entity(candidate=candidate) as candidate_entity:
            collection = candidate_entity.umap_collection
            if not collection:
                return create_success_response(empty_payload)
            # Any value other than 2 selects the 3D projection.
            watcher = candidate_entity.file_watcher
            umap_embedding = watcher.umap_embedding_2d if dimensions == 2 else watcher.umap_embedding_3d
            if len(umap_embedding) == 0:
                return create_success_response(empty_payload)
            payload = {
                "ids": collection.ids,
                "metadatas": collection.metadatas,
                "documents": collection.documents,
                "embeddings": umap_embedding.tolist(),
                "size": watcher.collection.count(),
            }
            return create_success_response(payload)
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Post candidate vectors error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", str(e)))
@router.delete("/{candidate_id}")
async def delete_candidate(
    candidate_id: str = Path(...),
    admin_user=Depends(get_current_admin),
    database: RedisDatabase = Depends(get_database),
):
    """Delete a candidate"""
    try:
        # Admin-only; the dependency should enforce this, but verify defensively.
        if not admin_user.is_admin:
            logger.warning(f"⚠️ Unauthorized delete attempt by user {admin_user.id}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only admins can delete candidates")
            )
        candidate_data = await database.get_candidate(candidate_id)
        if not candidate_data:
            logger.warning(f"⚠️ Candidate not found for deletion: {candidate_id}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Candidate not found"))
        # Tear down in-memory entity state before removing persisted records.
        await entities.entity_manager.remove_entity(candidate_id)
        await database.delete_candidate(candidate_id)
        await database.delete_all_candidate_documents(candidate_id)
        # Best-effort removal of the candidate's on-disk directory.
        user_files = os.path.join(defines.user_dir, candidate_data["username"])
        if os.path.exists(user_files):
            try:
                shutil.rmtree(user_files)
                logger.info(f"🗑️ Deleted candidate files directory: {user_files}")
            except Exception as e:
                logger.error(f"❌ Failed to delete candidate files directory: {e}")
        logger.info(f"🗑️ Candidate deleted: {candidate_id} by admin {admin_user.id}")
        return create_success_response({"message": "Candidate deleted successfully", "candidateId": candidate_id})
    except Exception as e:
        logger.error(f"❌ Delete candidate error: {e}")
        return JSONResponse(
            status_code=500, content=create_error_response("DELETE_ERROR", "Failed to delete candidate")
        )
@router.patch("/{candidate_id}")
async def update_candidate(
    candidate_id: str = Path(...),
    updates: Dict[str, Any] = Body(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Update a candidate"""
    try:
        candidate_data = await database.get_candidate(candidate_id)
        if not candidate_data:
            logger.warning(f"⚠️ Candidate not found for update: {candidate_id}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Candidate not found"))
        # AI-generated candidates validate against CandidateAI; humans against Candidate.
        is_AI = candidate_data.get("is_AI", False)
        model_cls = CandidateAI if is_AI else Candidate
        candidate = model_cls.model_validate(candidate_data)
        # Check authorization (user can only update their own profile)
        if current_user.is_admin is False and candidate.id != current_user.id:
            logger.warning(f"⚠️ Unauthorized update attempt by user {current_user.id} on candidate {candidate_id}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Cannot update another user's profile")
            )
        # Merge updates onto the existing record, re-validate, then persist.
        updates["updatedAt"] = datetime.now(UTC).isoformat()
        logger.info(f"🔄 Updating candidate {candidate_id} with data: {updates}")
        merged = candidate.model_dump()
        merged.update(updates)
        updated_candidate = model_cls.model_validate(merged)
        await database.set_candidate(candidate_id, updated_candidate.model_dump())
        return create_success_response(updated_candidate.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Update candidate error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("UPDATE_FAILED", str(e)))
@router.get("")
async def get_candidates(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    sortBy: Optional[str] = Query(None, alias="sortBy"),
    sortOrder: str = Query("desc", pattern="^(asc|desc)$", alias="sortOrder"),
    filters: Optional[str] = Query(None),
    current_user=Depends(get_current_user_or_guest),
    database: RedisDatabase = Depends(get_database),
):
    """Get paginated list of candidates"""
    try:
        # Filters arrive as a JSON-encoded string query parameter.
        filter_dict = json.loads(filters) if filters else None
        all_candidates_data = await database.get_all_candidates()
        candidates_list = []
        for data in all_candidates_data.values():
            model_cls = CandidateAI if data.get("is_AI") else Candidate
            candidates_list.append(model_cls.model_validate(data))
        # Guests see only public profiles; authenticated users also see their own.
        visible = [
            c
            for c in candidates_list
            if c.is_public or (current_user.user_type != UserType.GUEST and c.id == current_user.id)
        ]
        page_items, total = filter_and_paginate(visible, page, limit, sortBy, sortOrder, filter_dict)
        paginated_response = create_paginated_response(
            [c.model_dump(by_alias=True) for c in page_items], page, limit, total
        )
        return create_success_response(paginated_response)
    except Exception as e:
        logger.error(f"❌ Get candidates error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("FETCH_FAILED", str(e)))
@router.get("/search")
async def search_candidates(
    query: str = Query(...),
    filters: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    database: RedisDatabase = Depends(get_database),
):
    """Search candidates"""
    try:
        # Optional JSON-encoded filter string.
        filter_dict = json.loads(filters) if filters else {}
        all_candidates_data = await database.get_all_candidates()
        candidates_list = [Candidate.model_validate(data) for data in all_candidates_data.values()]
        if query:
            needle = query.lower()

            def matches(c: Candidate) -> bool:
                # Case-insensitive match on name, email, username, or any skill name.
                if needle in c.first_name.lower() or needle in c.last_name.lower():
                    return True
                if needle in c.email.lower() or needle in c.username.lower():
                    return True
                return any(needle in skill.name.lower() for skill in c.skills or [])

            candidates_list = [c for c in candidates_list if matches(c)]
        page_items, total = filter_and_paginate(candidates_list, page, limit, filters=filter_dict)
        paginated_response = create_paginated_response(
            [c.model_dump(by_alias=True) for c in page_items], page, limit, total
        )
        return create_success_response(paginated_response)
    except Exception as e:
        logger.error(f"❌ Search candidates error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("SEARCH_FAILED", str(e)))
@router.post("/rag-search")
async def post_candidate_rag_search(query: str = Body(...), current_user=Depends(get_current_user)):
    """Run a one-shot RAG search over the current candidate's indexed content.

    Returns the first content entry of the final message produced by the
    RAG_SEARCH agent. Restricted to candidate users.
    """
    try:
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized RAG search attempt by user {current_user.id}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Only candidates can access this endpoint")
            )
        candidate: Candidate = current_user
        chat_type = ChatContextType.RAG_SEARCH
        # Get RAG search data
        async with entities.get_candidate_entity(candidate=candidate) as candidate_entity:
            # Entity automatically released when done
            chat_agent = candidate_entity.get_or_create_agent(agent_type=chat_type)
            if not chat_agent:
                return JSONResponse(
                    status_code=400,
                    content=create_error_response("AGENT_NOT_FOUND", "No agent found for this chat type"),
                )
            # MOCK_UUID: this one-shot search has no persistent chat session.
            user_message = ChatMessageUser(
                sender_id=candidate.id, session_id=MOCK_UUID, content=query, timestamp=datetime.now(UTC)
            )
            rag_message: Any = None
            # Drain the agent's stream; only the last generated message is used.
            async for generated_message in chat_agent.generate(
                llm=llm_manager.get_llm(),
                model=defines.model,
                session_id=user_message.session_id,
                prompt=user_message.content,
            ):
                rag_message = generated_message
            if not rag_message:
                return JSONResponse(
                    status_code=500,
                    content=create_error_response("NO_RESPONSE", "No response generated for the RAG search"),
                )
            # NOTE(review): assumes the final message is a ChatMessageRagSearch with
            # at least one content entry — confirm the agent guarantees this.
            final_message: ChatMessageRagSearch = rag_message
            return create_success_response(final_message.content[0].model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Get candidate chat summary error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("SUMMARY_ERROR", str(e)))
# reference can be candidateId, username, or email
@router.get("/{reference}")
async def get_candidate(reference: str = Path(...), database: RedisDatabase = Depends(get_database)):
    """Get a candidate by username"""
    try:
        # Case-insensitive match against id, username, or email.
        needle = reference.lower()
        all_candidates_data = await database.get_all_candidates()
        if not all_candidates_data:
            logger.warning("⚠️ No candidates found in database")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "No candidates found"))
        candidate_data = next(
            (
                record
                for record in all_candidates_data.values()
                if needle
                in (
                    record.get("id", "").lower(),
                    record.get("username", "").lower(),
                    record.get("email", "").lower(),
                )
            ),
            None,
        )
        if not candidate_data:
            logger.warning(f"⚠️ Candidate not found for reference: {reference}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Candidate not found"))
        model_cls = CandidateAI if candidate_data.get("is_AI") else Candidate
        candidate = model_cls.model_validate(candidate_data)
        return create_success_response(candidate.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Get candidate error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", str(e)))
@router.get("/{username}/chat-summary")
async def get_candidate_chat_summary(
    username: str = Path(...), current_user=Depends(get_current_user), database: RedisDatabase = Depends(get_database)
):
    """Get chat activity summary for a candidate"""
    try:
        # Resolve the candidate record by username first.
        candidate_data = await database.find_candidate_by_username(username)
        if not candidate_data:
            return JSONResponse(
                status_code=404,
                content=create_error_response("CANDIDATE_NOT_FOUND", f"Candidate with username '{username}' not found"),
            )
        summary = await database.get_candidate_chat_summary(candidate_data["id"])
        # Attach an identity snapshot so clients don't need a second lookup.
        summary["candidate"] = {key: candidate_data.get(key) for key in ("username", "fullName", "email")}
        return create_success_response(summary)
    except Exception as e:
        logger.error(f"❌ Get candidate chat summary error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("SUMMARY_ERROR", str(e)))
@router.post("/job-analysis")
async def post_job_analysis(
    request: JobAnalysis = Body(...),
    current_user=Depends(get_current_user_or_guest),
    database: RedisDatabase = Depends(get_database),
):
    """Populate a JobAnalysis request with the candidate's cached skill assessments for a job.

    Only cached, non-stale assessments with a known evidence strength are
    included; requirements with no usable cache entry are skipped (this
    endpoint never regenerates assessments).
    """
    try:
        candidate_id = request.candidate_id
        candidate_data = await database.get_candidate(candidate_id)
        if not candidate_data:
            logger.warning(f"⚠️ Candidate not found for ID: {candidate_id}")
            return JSONResponse(
                status_code=404,
                content=create_error_response("CANDIDATE_NOT_FOUND", f"Candidate with id '{candidate_id}' not found"),
            )
        candidate = Candidate.model_validate(candidate_data)
        job_id = request.job_id
        job_data = await database.get_job(job_id)
        if not job_data:
            logger.warning(f"⚠️ Job not found for ID: {job_id}")
            return JSONResponse(
                status_code=404,
                content=create_error_response("JOB_NOT_FOUND", f"Job with id '{job_id}' not found"),
            )
        job = Job.model_validate(job_data)
        requirements = get_requirements_list(job)
        logger.info(
            f"🔍 Checking skill match for candidate {candidate.username} against job {job.id}'s {len(requirements)} requirements."
        )
        matched_skills: List[SkillAssessment] = []
        for req in requirements:
            skill = req.get("requirement", None)
            if not skill:
                logger.warning(f"⚠️ No 'requirement' found in entry: {req}")
                continue
            cache_key = get_skill_cache_key(candidate.id, skill)
            assessment: SkillAssessment | None = await database.get_cached_skill_match(cache_key)
            if assessment:
                # Invalidate the cached assessment if the candidate's RAG data
                # changed after the assessment was generated.
                user_rag_update_time = await database.get_user_rag_update_time(candidate.id)
                if user_rag_update_time and user_rag_update_time >= assessment.updated_at:
                    logger.info(f"🔄 Out-of-date cached entry for {candidate.username} skill {assessment.skill}")
                    assessment = None
                else:
                    logger.info(
                        f"✅ Using cached skill match for {candidate.username} skill {assessment.skill}: {cache_key}"
                    )
            if not assessment:
                # No usable cache entry — log once and skip this requirement.
                logger.info(f"💾 No cached skill match data: {cache_key}, {candidate.id}, {skill}")
                continue
            if assessment.evidence_strength != SkillStrength.UNKNOWN:
                logger.info(
                    f"✅ Assessment found for {candidate.username} skill {assessment.skill}: {assessment.evidence_strength}"
                )
                matched_skills.append(assessment)
            else:
                logger.info(
                    f"❌ Assessment for {candidate.username} skill {assessment.skill} is unknown, skipping."
                )
        request.skills = matched_skills
        return create_success_response(request.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Get candidate job analysis error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("JOB_ANALYSIS_ERROR", str(e)))
@router.post("/{candidate_id}/skill-match")
async def get_candidate_skill_match(
    candidate_id: str = Path(...),
    skill: str = Body(...),
    current_user=Depends(get_current_user_or_guest),
    database: RedisDatabase = Depends(get_database),
) -> StreamingResponse:
    """Get skill match for a candidate against a skill with caching.

    Returns a server-sent-event stream of chat messages; the final event is a
    ChatMessageSkillAssessment on success or a ChatMessageError on failure.
    A valid cached assessment short-circuits the stream without an LLM call.
    """
    async def message_stream_generator():
        # Inner generator yields message objects; to_json() below serializes
        # each one as an SSE "data:" frame.
        candidate_data = await database.get_candidate(candidate_id)
        if not candidate_data:
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                content=f"Candidate with ID '{candidate_id}' not found",
            )
            yield error_message
            return
        candidate = Candidate.model_validate(candidate_data)
        cache_key = get_skill_cache_key(candidate.id, skill)
        # Get cached assessment if it exists
        assessment: SkillAssessment | None = await database.get_cached_skill_match(cache_key)
        # Guard against cache collisions: discard a cached entry for a different skill.
        if assessment and assessment.skill.lower() != skill.lower():
            logger.warning(
                f"❌ Cached skill match for {candidate.username} does not match requested skill: {assessment.skill} != {skill} ({cache_key}). Regenerating..."
            )
            assessment = None
        # Determine if we need to regenerate the assessment
        if assessment:
            # Get the latest RAG data update time for the current user
            user_rag_update_time = await database.get_user_rag_update_time(candidate.id)
            updated = assessment.updated_at
            # Check if cached result is still valid
            # Regenerate if user's RAG data was updated after cache date
            if user_rag_update_time and user_rag_update_time >= updated:
                logger.info(f"🔄 Out-of-date cached entry for {candidate.username} skill {assessment.skill}")
                assessment = None
            else:
                logger.info(
                    f"✅ Using cached skill match for {candidate.username} skill {assessment.skill}: {cache_key}"
                )
        else:
            logger.info(f"💾 No cached skill match data: {cache_key}, {candidate.id}, {skill}")
        if assessment:
            # Cache hit: emit a single final message and stop — no LLM involved.
            skill_message = ChatMessageSkillAssessment(
                session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                content=f"Cached skill match found for {candidate.username}",
                skill_assessment=assessment,
            )
            yield skill_message
            return
        logger.info(f"🔍 Generating skill match for candidate {candidate.username} for skill: {skill}")
        async with entities.get_candidate_entity(candidate=candidate) as candidate_entity:
            agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.SKILL_MATCH)
            if not agent:
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                    content="No skill match agent found for this candidate",
                )
                yield error_message
                return
            # Generate new skill match
            final_message = None
            async for generated_message in agent.generate(
                llm=llm_manager.get_llm(),
                model=defines.model,
                session_id=MOCK_UUID,
                prompt=skill,
            ):
                if generated_message.status == ApiStatusType.ERROR:
                    if isinstance(generated_message, ChatMessageError):
                        content = generated_message.content
                    else:
                        content = "An error occurred during AI generation"
                    error_message = ChatMessageError(
                        session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                        content=f"AI generation error: {content}",
                    )
                    logger.error(f"{error_message.content}")
                    yield error_message
                    return
                # If the message is not done, convert it to a ChatMessageBase to remove
                # metadata and other unnecessary fields for streaming
                if generated_message.status != ApiStatusType.DONE:
                    if not isinstance(generated_message, ChatMessageStreaming) and not isinstance(
                        generated_message, ChatMessageStatus
                    ):
                        raise TypeError(
                            f"Expected ChatMessageStreaming or ChatMessageStatus, got {type(generated_message)}"
                        )
                    yield generated_message  # Convert to ChatMessageBase for streaming
                # Store reference to the complete AI message
                if generated_message.status == ApiStatusType.DONE:
                    final_message = generated_message
                    break
            if final_message is None:
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                    content="No match found for the given skill",
                )
                yield error_message
                return
            if not isinstance(final_message, ChatMessageSkillAssessment):
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                    content="Skill match response is not valid",
                )
                yield error_message
                return
            skill_match: ChatMessageSkillAssessment = final_message
            assessment = skill_match.skill_assessment
            if not assessment:
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                    content="Skill assessment could not be generated",
                )
                yield error_message
                return
            # Persist the fresh assessment so the next request is a cache hit.
            await database.cache_skill_match(cache_key, assessment)
            logger.info(f"💾 Cached new skill match for candidate {candidate.id} as {cache_key}")
            logger.info(f"✅ Skill match: {assessment.evidence_strength} {skill}")
            yield skill_match
            return
    try:
        async def to_json(method):
            # Serialize each yielded message as a JSON-encoded SSE frame.
            try:
                async for message in method:
                    json_data = message.model_dump(mode="json", by_alias=True)
                    json_str = json.dumps(json_data)
                    yield f"data: {json_str}\n\n".encode("utf-8")
            except Exception as e:
                logger.error(backstory_traceback.format_exc())
                logger.error(f"Error in to_json conversion: {e}")
                return
        return StreamingResponse(
            to_json(message_stream_generator()),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Nginx
                "X-Content-Type-Options": "nosniff",
                "Access-Control-Allow-Origin": "*",  # Adjust for your CORS needs
                "Transfer-Encoding": "chunked",
            },
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        # NOTE(review): "Document upload error" looks copy-pasted from the upload
        # handler — runtime string left unchanged here; consider fixing.
        logger.error(f"❌ Document upload error: {e}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # no persistent chat session for skill-match requests
                            content="Failed to generate skill assessment",
                        ).model_dump(mode="json", by_alias=True)
                    )
                ]
            ),
            media_type="text/event-stream",
        )
@router.post("/job-score")
async def get_candidate_job_score(
    job_requirements: JobRequirements = Body(...),
    skills: List[SkillAssessment] = Body(...),
    current_user=Depends(get_current_user_or_guest),
    database: RedisDatabase = Depends(get_database),
):
    """Score assessed skills against a job's required and preferred requirements."""
    # Ordered requirement buckets: (requirement list, counts toward "required"?).
    # Order matters — an assessed skill is credited to the FIRST bucket listing it.
    tech_required = job_requirements.technical_skills.required
    tech_preferred = job_requirements.technical_skills.preferred
    exp_required = job_requirements.experience_requirements.required
    exp_preferred = job_requirements.experience_requirements.preferred
    edu_required = job_requirements.education or []
    soft_skills = job_requirements.soft_skills or []
    certifications = job_requirements.certifications or []
    preferred_attributes = job_requirements.preferred_attributes or []
    buckets = [
        (tech_required, True),
        (tech_preferred, False),
        (exp_required, True),
        (exp_preferred, False),
        (edu_required, True),
        (soft_skills, False),
        (certifications, False),
        (preferred_attributes, False),
    ]
    required_skills_total = sum(len(items) for items, is_required in buckets if is_required)
    preferred_skills_total = sum(len(items) for items, is_required in buckets if not is_required)
    required_skills_matched = 0
    preferred_skills_matched = 0
    for assessment in skills:
        # Consider STRONG and MODERATE evidence as matches.
        # NOTE(review): this compares evidence_strength against the literal strings
        # "STRONG"/"MODERATE" — confirm this agrees with SkillStrength's values.
        if not (assessment.evidence_found and assessment.evidence_strength in ["STRONG", "MODERATE"]):
            continue
        # Credit the first bucket that lists this skill.
        for items, is_required in buckets:
            if assessment.skill in items:
                if is_required:
                    required_skills_matched += 1
                else:
                    preferred_skills_matched += 1
                break
    # If no skills were found, return empty statistics
    if required_skills_total == 0 and preferred_skills_total == 0:
        zero = {"total": 0, "matched": 0, "percentage": 0.0}
        return create_success_response(
            {
                "required_skills": dict(zero),
                "preferred_skills": dict(zero),
                "overall_match": dict(zero),
            }
        )
    # Calculate percentages
    required_match_percent = (required_skills_matched / required_skills_total * 100) if required_skills_total > 0 else 0
    preferred_match_percent = (
        (preferred_skills_matched / preferred_skills_total * 100) if preferred_skills_total > 0 else 0
    )
    overall_total = required_skills_total + preferred_skills_total
    overall_matched = required_skills_matched + preferred_skills_matched
    overall_match_percent = (overall_matched / overall_total * 100) if overall_total > 0 else 0
    return create_success_response(
        {
            "required_skills": {
                "total": required_skills_total,
                "matched": required_skills_matched,
                "percentage": round(required_match_percent, 1),
            },
            "preferred_skills": {
                "total": preferred_skills_total,
                "matched": preferred_skills_matched,
                "percentage": round(preferred_match_percent, 1),
            },
            "overall_match": {
                "total": overall_total,
                "matched": overall_matched,
                "percentage": round(overall_match_percent, 1),
            },
        }
    )
@router.post("/{candidate_id}/{job_id}/generate-resume")
async def generate_resume(
candidate_id: str = Path(...),
job_id: str = Path(...),
current_user=Depends(get_current_user_or_guest),
database: RedisDatabase = Depends(get_database),
) -> StreamingResponse:
skills: List[SkillAssessment] = []
"""Get skill match for a candidate against a requirement with caching"""
async def message_stream_generator():
logger.info(f"🔍 Looking up candidate and job details for {candidate_id}/{job_id}")
candidate_data = await database.get_candidate(candidate_id)
if not candidate_data:
logger.error(f"❌ Candidate with ID '{candidate_id}' not found")
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content=f"Candidate with ID '{candidate_id}' not found",
)
yield error_message
return
candidate = Candidate.model_validate(candidate_data)
job_data = await database.get_job(job_id)
if not job_data:
logger.error(f"❌ Job with ID '{job_id}' not found")
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content=f"Job with ID '{job_id}' not found",
)
yield error_message
return
job = Job.model_validate(job_data)
uninitalized = False
requirements = get_requirements_list(job)
logger.info(
f"🔍 Checking skill match for candidate {candidate.username} against job {job.id}'s {len(requirements)} requirements."
)
for req in requirements:
skill = req.get("requirement", None)
if not skill:
logger.warning(f"⚠️ No 'requirement' found in entry: {req}")
continue
cache_key = get_skill_cache_key(candidate.id, skill)
assessment: SkillAssessment | None = await database.get_cached_skill_match(cache_key)
if not assessment:
logger.info(f"💾 No cached skill match data: {cache_key}, {candidate.id}, {skill}")
uninitalized = True
break
if assessment and assessment.skill.lower() != skill.lower():
logger.warning(
f"❌ Cached skill match for {candidate.username} does not match requested skill: {assessment.skill} != {skill} ({cache_key})."
)
uninitalized = True
break
logger.info(f"✅ Assessment found for {candidate.username} skill {assessment.skill}: {cache_key}")
skills.append(assessment)
if uninitalized:
logger.error("❌ Uninitialized skill match data, cannot generate resume")
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="Uninitialized skill match data, cannot generate resume",
)
yield error_message
return
logger.info(
f"🔍 Generating resume for candidate {candidate.username}, job {job.id}, with {len(skills)} skills."
)
async with entities.get_candidate_entity(candidate=candidate) as candidate_entity:
agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.GENERATE_RESUME)
if not agent:
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="No skill match agent found for this candidate",
)
yield error_message
return
final_message = None
if not isinstance(agent, GenerateResume):
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="Agent is not a GenerateResume instance",
)
yield error_message
return
async for generated_message in agent.generate_resume(
llm=llm_manager.get_llm(),
model=defines.model,
session_id=MOCK_UUID,
skills=skills,
):
if generated_message.status == ApiStatusType.ERROR:
if isinstance(generated_message, ChatMessageError):
content = generated_message.content
else:
content = "An error occurred during AI generation"
logger.error(f"❌ AI generation error: {content}")
yield f"data: {json.dumps({'status': 'error'})}\n\n"
return
# If the message is not done, convert it to a ChatMessageBase to remove
# metadata and other unnecessary fields for streaming
if generated_message.status != ApiStatusType.DONE:
if not isinstance(generated_message, ChatMessageStreaming) and not isinstance(
generated_message, ChatMessageStatus
):
raise TypeError(
f"Expected ChatMessageStreaming or ChatMessageStatus, got {type(generated_message)}"
)
yield generated_message # Convert to ChatMessageBase for streaming
# Store reference to the complete AI message
if generated_message.status == ApiStatusType.DONE:
final_message = generated_message
break
if final_message is None:
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="No skill match found for the given requirement",
)
yield error_message
return
if not isinstance(final_message, ChatMessageResume):
error_message = ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="Skill match response is not valid",
)
yield error_message
return
resume: ChatMessageResume = final_message
resume.resume.job_id = job.id
yield resume
return
try:
async def to_json(method):
    """Serialize each chat message from *method* into a UTF-8 SSE `data:` frame.

    Iterates the async message stream, dumping every message with its JSON
    aliases applied. Any exception terminates the stream after logging; frames
    already yielded stand as-is (best-effort streaming semantics).
    """
    try:
        async for msg in method:
            payload = json.dumps(msg.model_dump(mode="json", by_alias=True))
            yield f"data: {payload}\n\n".encode("utf-8")
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"Error in to_json conversion: {e}")
        return
return StreamingResponse(
to_json(message_stream_generator()),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Nginx
"X-Content-Type-Options": "nosniff",
"Access-Control-Allow-Origin": "*", # Adjust for your CORS needs
"Transfer-Encoding": "chunked",
},
)
except Exception as e:
logger.error(backstory_traceback.format_exc())
logger.error(f"❌ Document upload error: {e}")
return StreamingResponse(
iter(
[
json.dumps(
ChatMessageError(
session_id=MOCK_UUID, # No session ID for document uploads
content="Failed to generate skill assessment",
).model_dump(mode="json", by_alias=True)
)
]
),
media_type="text/event-stream",
)
# NOTE(review): `@rate_limited` sits above `@router.get`, so the route is
# registered before the limiter wraps the function — confirm `rate_limited`
# is designed for this order (e.g. attribute-marking consumed by middleware).
@rate_limited(guest_per_minute=5, user_per_minute=30, admin_per_minute=100)
@router.get("/{username}/chat-sessions")
async def get_candidate_chat_sessions(
    username: str = Path(...),
    current_user=Depends(get_current_user_or_guest),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    database: RedisDatabase = Depends(get_database),
):
    """Get all chat sessions related to a specific candidate

    Returns a 404 when no candidate matches *username* (case-insensitive),
    otherwise a paginated list of the caller's own sessions whose context
    references the candidate, newest activity first.
    """
    try:
        logger.info(f"🔍 Fetching chat sessions for candidate with username: {username}")

        # Resolve the candidate via a case-insensitive username match.
        all_candidates = await database.get_all_candidates()
        validated = [Candidate.model_validate(raw) for raw in all_candidates.values()]
        matches = [c for c in validated if c.username.lower() == username.lower()]
        if not matches:
            return JSONResponse(
                status_code=404,
                content=create_error_response("CANDIDATE_NOT_FOUND", f"Candidate with username '{username}' not found"),
            )
        candidate = matches[0]

        # Collect sessions owned by the caller that reference this candidate.
        all_sessions = await database.get_all_chat_sessions()
        related: List[ChatSession] = []
        for index, session_data in enumerate(all_sessions.values()):
            try:
                session = ChatSession.model_validate(session_data)
                if session.user_id != current_user.id:
                    # Callers may only see sessions they created themselves.
                    logger.info(
                        f"🔗 Skipping session {session.id} - not owned by user {current_user.id} (created by {session.user_id})"
                    )
                    continue
                ctx = session.context
                tied_to_candidate = (
                    ctx is not None
                    and ctx.related_entity_type == "candidate"
                    and ctx.related_entity_id == candidate.id
                )
                if tied_to_candidate:
                    related.append(session)
            except Exception as e:
                # Malformed session records are logged and skipped, never fatal.
                logger.error(backstory_traceback.format_exc())
                logger.error(f"❌ Failed to validate session ({index}): {e}")
                logger.error(f"❌ Session data: {session_data}")
                continue

        # Most recently active sessions come first.
        related.sort(key=lambda s: s.last_activity, reverse=True)

        # Manual offset/limit pagination over the in-memory list.
        total = len(related)
        offset = (page - 1) * limit
        page_items = related[offset : offset + limit]

        paginated = create_paginated_response(
            [s.model_dump(by_alias=True) for s in page_items], page, limit, total
        )
        return create_success_response(
            {
                "candidate": {
                    "id": candidate.id,
                    "username": candidate.username,
                    "fullName": candidate.full_name,
                    "email": candidate.email,
                },
                "sessions": paginated,
            }
        )
    except Exception as e:
        logger.error(f"❌ Get candidate chat sessions error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", str(e)))