
"""
Job Routes
"""
import asyncio
import io
import json
import pathlib
import re
import uuid
from datetime import datetime, UTC
from typing import Any, Dict, Optional
from fastapi import APIRouter, File, Depends, Body, Path, Query, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from markitdown import MarkItDown, StreamInfo
import backstory_traceback as backstory_traceback
import defines
from agents.base import CandidateEntity
from utils.helpers import filter_and_paginate, get_document_type_from_filename
from database.manager import RedisDatabase
from logger import logger
from models import (
MOCK_UUID,
ApiActivityType,
ApiStatusType,
ChatContextType,
ChatMessage,
ChatMessageError,
ChatMessageStatus,
DocumentType,
Job,
JobRequirementsMessage,
Candidate,
Employer,
)
from utils.dependencies import get_current_admin, get_database, get_current_user
from utils.responses import create_paginated_response, create_success_response, create_error_response
import utils.llm_proxy as llm_manager
import entities.entity_manager as entities
# Create router for job endpoints
router = APIRouter(prefix="/jobs", tags=["jobs"])
async def reformat_as_markdown(database: RedisDatabase, candidate_entity: CandidateEntity, content: str):
    """
    Stream the conversion of a raw job description into markdown.

    Yields ChatMessageStatus updates while the LLM runs, then either the
    final ChatMessage whose content is the markdown text, or a
    ChatMessageError if no agent is available or the LLM produced nothing.

    Args:
        database: Redis persistence layer (unused here; kept so the signature
            matches the sibling pipeline helpers).
        candidate_entity: Entity whose JOB_REQUIREMENTS agent performs the work.
        content: Raw job-description text to reformat.
    """
    chat_agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.JOB_REQUIREMENTS)
    if not chat_agent:
        error_message = ChatMessageError(
            session_id=MOCK_UUID,  # No session ID for document uploads
            content="No agent found for job requirements chat type",
        )
        yield error_message
        return
    status_message = ChatMessageStatus(
        session_id=MOCK_UUID,  # No session ID for document uploads
        content="Reformatting job description as markdown...",
        activity=ApiActivityType.CONVERTING,
    )
    yield status_message
    # Drain the one-shot generator; only the last message matters here.
    message = None
    async for message in chat_agent.llm_one_shot(
        llm=llm_manager.get_llm(),
        model=defines.model,
        session_id=MOCK_UUID,
        prompt=content,
        system_prompt="""
You are a document editor. Take the provided job description and reformat as legible markdown.
Return only the markdown content, no other text. Make sure all content is included. If the
content is already in markdown format, return it as is.
""",
    ):
        pass
    if not message or not isinstance(message, ChatMessage):
        logger.error("❌ Failed to reformat job description to markdown")
        error_message = ChatMessageError(
            session_id=MOCK_UUID,  # No session ID for document uploads
            content="Failed to reformat job description",
        )
        yield error_message
        return
    chat_message: ChatMessage = message
    try:
        chat_message.content = chat_agent.extract_markdown_from_text(chat_message.content)
    except Exception as e:
        # Best-effort: fall back to the raw LLM output, but record why
        # extraction failed instead of silently swallowing the error.
        logger.debug(f"Markdown extraction failed; using raw LLM output: {e}")
    logger.info("✅ Successfully converted content to markdown")
    yield chat_message
    return
async def create_job_from_content(database: RedisDatabase, current_user: Candidate, content: str):
    """
    Full job-extraction pipeline: raw text -> markdown -> JobRequirementsMessage.

    Streams intermediate ChatMessageStatus/ChatMessageError messages; on
    success the final yielded item is a JobRequirementsMessage. Intermediate
    DONE messages from the sub-generators are suppressed so callers see at
    most one terminal message.

    Args:
        database: Redis persistence layer, passed through to helpers.
        current_user: Candidate whose entity/agents perform the extraction.
        content: Raw job-description text.
    """
    status_message = ChatMessageStatus(
        session_id=MOCK_UUID,  # No session ID for document uploads
        content=f"Initiating connection with {current_user.first_name}'s AI agent...",
        activity=ApiActivityType.INFO,
    )
    yield status_message
    await asyncio.sleep(0)  # Let the status message propagate
    async with entities.get_candidate_entity(candidate=current_user) as candidate_entity:
        # Stage 1: normalize the description to markdown.
        message = None
        async for message in reformat_as_markdown(database, candidate_entity, content):
            # Only yield one final DONE message
            if message.status != ApiStatusType.DONE:
                yield message
        # `message` holds the last item the generator produced (or None).
        if not message or not isinstance(message, ChatMessage):
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Failed to reformat job description",
            )
            yield error_message
            return
        markdown_message = message
        # Stage 2: extract structured job requirements from the markdown.
        chat_agent = candidate_entity.get_or_create_agent(agent_type=ChatContextType.JOB_REQUIREMENTS)
        if not chat_agent:
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="No agent found for job requirements chat type",
            )
            yield error_message
            return
        status_message = ChatMessageStatus(
            session_id=MOCK_UUID,  # No session ID for document uploads
            content="Analyzing document for company and requirement details...",
            activity=ApiActivityType.SEARCHING,
        )
        yield status_message
        message = None
        async for message in chat_agent.generate(
            llm=llm_manager.get_llm(), model=defines.model, session_id=MOCK_UUID, prompt=markdown_message.content
        ):
            if message.status != ApiStatusType.DONE:
                yield message
        # The terminal message must be a JobRequirementsMessage for success.
        if not message or not isinstance(message, JobRequirementsMessage):
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Job extraction did not convert successfully",
            )
            yield error_message
            return
        job_requirements: JobRequirementsMessage = message
        logger.info(f"✅ Successfully generated job requirements for job {job_requirements.id}")
        yield job_requirements
        return
@router.post("")
async def create_job(
    job_data: Dict[str, Any] = Body(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Validate the posted payload, assign server-side fields, and persist a new Job."""
    try:
        new_job = Job.model_validate(job_data)
        # Server-assigned fields: fresh id plus ownership from the auth context.
        new_job.id = str(uuid.uuid4())
        new_job.owner_id = current_user.id
        new_job.owner = current_user
        await database.set_job(new_job.id, new_job.model_dump())
        return create_success_response(new_job.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Job creation error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("CREATION_FAILED", str(e)))
# NOTE(review): this handler registers the same method+path (POST /jobs) as
# create_job above; FastAPI matches the first registration, so this route is
# effectively unreachable as written. Confirm the intended path with the API
# owner (e.g. a distinct sub-path) before relying on it.
@router.post("")
async def create_candidate_job(
    job_data: Dict[str, Any] = Body(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Create a new job owned by the current user.

    Validates the posted payload as a Job, assigns a fresh id and ownership
    from the authenticated user, persists it, and returns the created job.
    """
    # The original body contained a bare `isinstance(current_user, Employer)`
    # expression statement with no effect — presumably an unfinished employer
    # authorization guard. Removed as dead code; add a real check (403 on
    # non-employer) once the intent is confirmed.
    try:
        job = Job.model_validate(job_data)
        # Server-assigned fields
        job.id = str(uuid.uuid4())
        job.owner_id = current_user.id
        job.owner = current_user
        await database.set_job(job.id, job.model_dump())
        return create_success_response(job.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Job creation error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("CREATION_FAILED", str(e)))
@router.patch("/{job_id}")
async def update_job(
    job_id: str = Path(...),
    updates: Dict[str, Any] = Body(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Update a job.

    Only the job's owner or an admin may update. The `updates` mapping is
    merged over the stored job and the result re-validated before persisting.
    """
    try:
        job_data = await database.get_job(job_id)
        if not job_data:
            # Fixed: previously logged the (None) job_data instead of the id.
            logger.warning(f"⚠️ Job not found for update: {job_id}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Job not found"))
        job = Job.model_validate(job_data)
        # Authorization: non-admins may only update jobs they own.
        if current_user.is_admin is False and job.owner_id != current_user.id:
            logger.warning(f"⚠️ Unauthorized update attempt by user {current_user.id} on job {job_id}")
            return JSONResponse(
                status_code=403, content=create_error_response("FORBIDDEN", "Cannot update another user's job")
            )
        # Apply updates; re-validation rejects malformed fields.
        # NOTE(review): assumes Job accepts the "updatedAt" alias during
        # model_validate (populate-by-alias) — confirm against the model config.
        updates["updatedAt"] = datetime.now(UTC).isoformat()
        logger.info(f"🔄 Updating job {job_id} with data: {updates}")
        job_dict = job.model_dump()
        job_dict.update(updates)
        updated_job = Job.model_validate(job_dict)
        await database.set_job(job_id, updated_job.model_dump())
        return create_success_response(updated_job.model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Update job error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("UPDATE_FAILED", str(e)))
@router.post("/from-content")
async def create_job_from_description(
    content: str = Body(...), current_user=Depends(get_current_user), database: RedisDatabase = Depends(get_database)
):
    """Create a Job from raw description text, streamed as server-sent events.

    The response is an SSE stream of JSON-encoded messages: status updates,
    optional errors, and finally a JobRequirementsMessage from the extraction
    pipeline. Only candidates may call this endpoint.
    """

    async def content_stream_generator(content):
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized upload attempt by user type: {current_user.user_type}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Only candidates can upload documents",
            )
            yield error_message
            return
        logger.info(f"📁 Received file content: size='{len(content)} bytes'")
        # Collapse runs of STREAMING messages so the client is not flooded
        # with per-token updates.
        last_yield_was_streaming = False
        async for message in create_job_from_content(database=database, current_user=current_user, content=content):
            if message.status != ApiStatusType.STREAMING:
                last_yield_was_streaming = False
            else:
                if last_yield_was_streaming:
                    continue
                last_yield_was_streaming = True
            logger.info(f"📄 Yielding job creation message status: {message.status}")
            yield message
        return

    try:

        async def to_json(method):
            # Frame each message as an SSE "data:" event.
            try:
                async for message in method:
                    json_data = message.model_dump(mode="json", by_alias=True)
                    json_str = json.dumps(json_data)
                    yield f"data: {json_str}\n\n".encode("utf-8")
            except Exception as e:
                logger.error(backstory_traceback.format_exc())
                logger.error(f"Error in to_json conversion: {e}")
                return

        return StreamingResponse(
            to_json(content_stream_generator(content)),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Nginx
                "X-Content-Type-Options": "nosniff",
                "Access-Control-Allow-Origin": "*",  # Adjust for your CORS needs
                "Transfer-Encoding": "chunked",
            },
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Document upload error: {e}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="Failed to upload document",
                        # Fixed: mode="json" so non-JSON-native fields (e.g.
                        # datetimes) serialize — matches the /upload endpoint.
                        ).model_dump(mode="json", by_alias=True)
                    ).encode("utf-8")
                ]
            ),
            media_type="text/event-stream",
        )
@router.post("/upload")
async def create_job_from_file(
    file: UploadFile = File(...),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Upload a job-description document and create a Job from it.

    Accepts .txt/.md/.docx/.pdf/image files up to 10MB; non-text formats are
    converted to markdown via MarkItDown before running the extraction
    pipeline. The response is an SSE stream of JSON-encoded messages ending
    in a JobRequirementsMessage on success. Only candidates may call this.
    """
    # Check file size (limit to 10MB)
    max_size = 10 * 1024 * 1024  # 10MB
    file_content = await file.read()
    if len(file_content) > max_size:
        logger.info(f"⚠️ File too large: {file.filename} ({len(file_content)} bytes)")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="File size exceeds 10MB limit",
                        ).model_dump(by_alias=True)
                    ).encode("utf-8")
                ]
            ),
            media_type="text/event-stream",
        )
    if len(file_content) == 0:
        logger.info(f"⚠️ File is empty: {file.filename}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="File is empty",
                        ).model_dump(by_alias=True)
                    ).encode("utf-8")
                ]
            ),
            media_type="text/event-stream",
        )

    # (Removed a stray string-literal statement that sat here in the original;
    # it was dead code, not a docstring.)
    async def upload_stream_generator(file_content):
        # Verify user is a candidate
        if current_user.user_type != "candidate":
            logger.warning(f"⚠️ Unauthorized upload attempt by user type: {current_user.user_type}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="Only candidates can upload documents",
            )
            yield error_message
            return
        file.filename = re.sub(r"^.*/", "", file.filename) if file.filename else ""  # Sanitize filename
        if not file.filename or file.filename.strip() == "":
            logger.warning("⚠️ File upload attempt with missing filename")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content="File must have a valid filename",
            )
            yield error_message
            return
        logger.info(
            f"📁 Received file upload: filename='{file.filename}', content_type='{file.content_type}', size='{len(file_content)} bytes'"
        )
        # Validate file type
        allowed_types = [".txt", ".md", ".docx", ".pdf", ".png", ".jpg", ".jpeg", ".gif"]
        file_extension = pathlib.Path(file.filename).suffix.lower() if file.filename else ""
        if file_extension not in allowed_types:
            logger.warning(f"⚠️ Invalid file type: {file_extension} for file {file.filename}")
            error_message = ChatMessageError(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content=f"File type {file_extension} not supported. Allowed types: {', '.join(allowed_types)}",
            )
            yield error_message
            return
        document_type = get_document_type_from_filename(file.filename or "unknown.txt")
        # Anything that is not already plain text/markdown gets converted.
        if document_type != DocumentType.MARKDOWN and document_type != DocumentType.TXT:
            status_message = ChatMessageStatus(
                session_id=MOCK_UUID,  # No session ID for document uploads
                content=f"Converting content from {document_type}...",
                activity=ApiActivityType.CONVERTING,
            )
            yield status_message
            try:
                md = MarkItDown(enable_plugins=False)  # Set to True to enable plugins
                stream = io.BytesIO(file_content)
                stream_info = StreamInfo(
                    extension=file_extension,  # e.g., ".pdf"
                    url=file.filename,  # optional, helps with logging and guessing
                )
                result = md.convert_stream(stream, stream_info=stream_info, output_format="markdown")
                file_content = result.text_content
                logger.info(f"✅ Converted {file.filename} to Markdown format")
            except Exception as e:
                error_message = ChatMessageError(
                    session_id=MOCK_UUID,  # No session ID for document uploads
                    content=f"Failed to convert {file.filename} to Markdown.",
                )
                yield error_message
                logger.error(f"❌ Error converting {file.filename} to Markdown: {e}")
                return
        # Hand the (possibly converted) text to the shared extraction pipeline.
        async for message in create_job_from_content(
            database=database, current_user=current_user, content=file_content
        ):
            yield message
        return

    try:

        async def to_json(method):
            # Frame each message as an SSE "data:" event.
            try:
                async for message in method:
                    json_data = message.model_dump(mode="json", by_alias=True)
                    json_str = json.dumps(json_data)
                    yield f"data: {json_str}\n\n".encode("utf-8")
            except Exception as e:
                logger.error(backstory_traceback.format_exc())
                logger.error(f"Error in to_json conversion: {e}")
                return

        return StreamingResponse(
            to_json(upload_stream_generator(file_content)),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Connection": "keep-alive",
                "X-Accel-Buffering": "no",  # Nginx
                "X-Content-Type-Options": "nosniff",
                "Access-Control-Allow-Origin": "*",  # Adjust for your CORS needs
                "Transfer-Encoding": "chunked",
            },
        )
    except Exception as e:
        logger.error(backstory_traceback.format_exc())
        logger.error(f"❌ Document upload error: {e}")
        return StreamingResponse(
            iter(
                [
                    json.dumps(
                        ChatMessageError(
                            session_id=MOCK_UUID,  # No session ID for document uploads
                            content="Failed to upload document",
                        ).model_dump(mode="json", by_alias=True)
                    ).encode("utf-8")
                ]
            ),
            media_type="text/event-stream",
        )
@router.get("")
async def get_jobs(
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    sortBy: Optional[str] = Query(None, alias="sortBy"),
    sortOrder: str = Query("desc", pattern="^(asc|desc)$", alias="sortOrder"),
    filters: Optional[str] = Query(None),
    database: RedisDatabase = Depends(get_database),
):
    """Return a paginated, optionally filtered and sorted, list of all jobs."""
    logger.info("📄 Fetching jobs...")
    try:
        # `filters` arrives as a JSON-encoded string on the query line.
        filter_dict = json.loads(filters) if filters else None
        raw_jobs = await database.get_all_jobs()
        jobs = [Job.model_validate(data) for data in raw_jobs.values()]
        page_items, total = filter_and_paginate(jobs, page, limit, sortBy, sortOrder, filter_dict)
        payload = create_paginated_response(
            [job.model_dump(by_alias=True) for job in page_items], page, limit, total
        )
        return create_success_response(payload)
    except Exception as e:
        logger.error(f"❌ Get jobs error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("FETCH_FAILED", str(e)))
@router.get("/search")
async def search_jobs(
    query: str = Query(...),
    filters: Optional[str] = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    current_user=Depends(get_current_user),
    database: RedisDatabase = Depends(get_database),
):
    """Case-insensitive search over active jobs' title, description, and skills."""
    logger.info("🔍 Searching jobs...")
    try:
        filter_dict = json.loads(filters) if filters else {}
        all_jobs_data = await database.get_all_jobs()
        # Inactive jobs (is_active == False) are excluded up front.
        candidates = [
            Job.model_validate(data) for data in all_jobs_data.values() if data.get("is_active", True)
        ]
        if query:
            needle = query.lower()

            def matches(job) -> bool:
                # A job matches if the needle appears in its title,
                # description, or any listed skill.
                if job.title and needle in job.title.lower():
                    return True
                if job.description and needle in job.description.lower():
                    return True
                return any(needle in skill.lower() for skill in getattr(job, "skills", []) or [])

            candidates = [job for job in candidates if matches(job)]
        page_items, total = filter_and_paginate(candidates, page, limit, filters=filter_dict)
        payload = create_paginated_response(
            [job.model_dump(by_alias=True) for job in page_items], page, limit, total
        )
        return create_success_response(payload)
    except Exception as e:
        logger.error(f"❌ Search jobs error: {e}")
        return JSONResponse(status_code=400, content=create_error_response("SEARCH_FAILED", str(e)))
@router.get("/{job_id}")
async def get_job(job_id: str = Path(...), database: RedisDatabase = Depends(get_database)):
    """Fetch one job by id, bumping its view counter as a side effect."""
    try:
        stored = await database.get_job(job_id)
        if not stored:
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Job not found"))
        # Every successful read counts as a view; persist before responding.
        stored["views"] = stored.get("views", 0) + 1
        await database.set_job(job_id, stored)
        return create_success_response(Job.model_validate(stored).model_dump(by_alias=True))
    except Exception as e:
        logger.error(f"❌ Get job error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("FETCH_ERROR", str(e)))
@router.delete("/{job_id}")
async def delete_job(
    job_id: str = Path(...), admin_user=Depends(get_current_admin), database: RedisDatabase = Depends(get_database)
):
    """Delete a Job. Admin-only; returns 404 if the job does not exist."""
    try:
        # Check if admin user (belt-and-braces on top of get_current_admin).
        if not admin_user.is_admin:
            logger.warning(f"⚠️ Unauthorized delete attempt by user {admin_user.id}")
            return JSONResponse(status_code=403, content=create_error_response("FORBIDDEN", "Only admins can delete"))
        # Verify the job exists before deleting
        job_data = await database.get_job(job_id)
        if not job_data:
            # Fixed: log previously said "Candidate not found" for a missing job.
            logger.warning(f"⚠️ Job not found for deletion: {job_id}")
            return JSONResponse(status_code=404, content=create_error_response("NOT_FOUND", "Job not found"))
        # Delete job from database
        await database.delete_job(job_id)
        logger.info(f"🗑️ Job deleted: {job_id} by admin {admin_user.id}")
        return create_success_response({"message": "Job deleted successfully", "jobId": job_id})
    except Exception as e:
        logger.error(f"❌ Delete job error: {e}")
        return JSONResponse(status_code=500, content=create_error_response("DELETE_ERROR", "Failed to delete job"))