from datetime import UTC, datetime
import logging
import uuid
from typing import Any, Dict, List, Optional

from models import Resume

from .protocols import DatabaseProtocol
from ..constants import KEY_PREFIXES

logger = logging.getLogger(__name__)


class ResumeMixin(DatabaseProtocol):
    """Mixin for resume-related database operations"""

    async def set_resume(self, user_id: str, resume_data: Dict) -> bool:
        """Save a resume for a user"""
        try:
            resume = Resume.model_validate(resume_data)

            # Store the resume data
            key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume.id}"
            await self.redis.set(key, self._serialize(resume_data))

            # Add resume_id to user's resume list
            user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}"
            await self.redis.rpush(user_resumes_key, resume.id)  # type: ignore

            logger.info(f"📄 Saved resume {resume.id} for user {user_id}")
            return True
        except Exception as e:
            logger.error(f"❌ Error saving resume for user {user_id}: {e}")
            return False

    async def get_resume(self, resume_id: str, user_id: Optional[str] = None) -> Optional[Resume]:
        """Get a specific resume by resume_id, optionally filtered by user_id"""
        try:
            if user_id:
                # If user_id is provided, use the original key structure
                key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}"
                data = await self.redis.get(key)
                if data:
                    resume_data = self._deserialize(data)
                    logger.info(f"📄 Retrieved resume {resume_id} for user {user_id}")
                    return Resume.model_validate(resume_data)

                logger.info(f"📄 Resume {resume_id} not found for user {user_id}")
                return None
            else:
                # If no user_id provided, search for the resume across all users
                pattern = f"{KEY_PREFIXES['resumes']}*:{resume_id}"

                # Use SCAN to find matching keys
                matching_keys = []
                async for key in self.redis.scan_iter(match=pattern):
                    matching_keys.append(key)

                if not matching_keys:
                    logger.info(f"📄 Resume {resume_id} not found")
                    return None

                if len(matching_keys) > 1:
                    logger.warning(f"📄 Multiple resumes found with ID {resume_id}, returning first match")

                # Get data from the first matching key
                target_key = matching_keys[0]
                data = await self.redis.get(target_key)
                if data:
                    resume_data = self._deserialize(data)
                    # Extract user_id from key for logging
                    key_user_id = target_key.replace(KEY_PREFIXES["resumes"], "").split(":")[0]
                    logger.info(f"📄 Retrieved resume {resume_id} for user {key_user_id}")
                    return Resume.model_validate(resume_data)

                logger.info(f"📄 Resume {resume_id} data not found")
                return None
        except Exception as e:
            logger.error(f"❌ Error retrieving resume {resume_id}: {e}")
            return None

    async def get_all_resumes_for_user(self, user_id: str) -> List[Dict]:
        """Get all resumes for a specific user"""
        try:
            # Get all resume IDs for this user
            user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}"
            resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)  # type: ignore

            if not resume_ids:
                logger.info(f"📄 No resumes found for user {user_id}")
                return []

            # Get all resume data
            resumes = []
            pipe = self.redis.pipeline()
            for resume_id in resume_ids:
                pipe.get(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}")
            values = await pipe.execute()

            for resume_id, value in zip(resume_ids, values):
                if value:
                    resume_data = self._deserialize(value)
                    if resume_data:
                        resumes.append(resume_data)
                else:
                    # Clean up orphaned resume ID
                    await self.redis.lrem(user_resumes_key, 0, resume_id)  # type: ignore
                    logger.warning(f"Removed orphaned resume ID {resume_id} for user {user_id}")

            # Sort by created_at timestamp (most recent first)
            resumes.sort(key=lambda x: x.get("created_at", ""), reverse=True)

            logger.info(f"📄 Retrieved {len(resumes)} resumes for user {user_id}")
            return resumes
        except Exception as e:
            logger.error(f"❌ Error retrieving resumes for user {user_id}: {e}")
            return []

    async def delete_resume(self, user_id: str, resume_id: str) -> bool:
        """Delete a specific resume for a user"""
        try:
            # Delete the resume data
            key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}"
            result = await self.redis.delete(key)

            # Remove from user's resume list
            user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}"
            await self.redis.lrem(user_resumes_key, 0, resume_id)  # type: ignore

            # Clean up revision history
            await self._delete_resume_history(user_id, resume_id)

            if result > 0:
                logger.info(f"🗑️ Deleted resume {resume_id} for user {user_id}")
                return True
            else:
                logger.warning(f"⚠️ Resume {resume_id} not found for user {user_id}")
                return False
        except Exception as e:
            logger.error(f"❌ Error deleting resume {resume_id} for user {user_id}: {e}")
            return False

    async def delete_all_resumes_for_user(self, user_id: str) -> int:
        """Delete all resumes for a specific user and return count of deleted resumes"""
        try:
            # Get all resume IDs for this user
            user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}"
            resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)  # type: ignore

            if not resume_ids:
                logger.info(f"📄 No resumes found for user {user_id}")
                return 0

            deleted_count = 0

            # Use pipeline for efficient batch operations
            pipe = self.redis.pipeline()

            # Delete each resume and its history
            for resume_id in resume_ids:
                pipe.delete(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}")
                deleted_count += 1

                # Delete revision history for this resume
                await self._delete_resume_history(user_id, resume_id)

            # Delete the user's resume list
            pipe.delete(user_resumes_key)

            # Execute all operations
            await pipe.execute()

            logger.info(f"🗑️ Successfully deleted {deleted_count} resumes for user {user_id}")
            return deleted_count
        except Exception as e:
            logger.error(f"❌ Error deleting all resumes for user {user_id}: {e}")
            raise

    async def get_all_resumes(self) -> Dict[str, List[Dict]]:
        """Get all resumes grouped by user (admin function)"""
        try:
            pattern = f"{KEY_PREFIXES['resumes']}*"
            keys = await self.redis.keys(pattern)

            if not keys:
                return {}

            # Group by user_id
            user_resumes = {}
            pipe = self.redis.pipeline()
            for key in keys:
                pipe.get(key)
            values = await pipe.execute()

            for key, value in zip(keys, values):
                if value:
                    # Extract user_id from key format: resume:{user_id}:{resume_id}
                    key_parts = key.replace(KEY_PREFIXES["resumes"], "").split(":", 1)
                    if len(key_parts) >= 1:
                        user_id = key_parts[0]
                        resume_data = self._deserialize(value)
                        if resume_data:
                            if user_id not in user_resumes:
                                user_resumes[user_id] = []
                            user_resumes[user_id].append(resume_data)

            # Sort each user's resumes by created_at
            for user_id in user_resumes:
                user_resumes[user_id].sort(key=lambda x: x.get("created_at", ""), reverse=True)

            return user_resumes
        except Exception as e:
            logger.error(f"❌ Error retrieving all resumes: {e}")
            return {}

    async def search_resumes_for_user(self, user_id: str, query: str) -> List[Resume]:
        """Search resumes for a user by content, job title, or candidate name"""
        try:
            all_resumes = await self.get_all_resumes_for_user(user_id)

            query_lower = query.lower()
            matching_resumes = []

            for resume in all_resumes:
                # Search in resume content, job_id, candidate_id, etc.
searchable_text = " ".join( [ resume.get("resume", ""), resume.get("job_id", ""), resume.get("candidate_id", ""), str(resume.get("created_at", "")), ] ).lower() if query_lower in searchable_text: matching_resumes.append(resume) logger.info(f"📄 Found {len(matching_resumes)} matching resumes for user {user_id}") return [Resume.model_validate(resume) for resume in matching_resumes] except Exception as e: logger.error(f"❌ Error searching resumes for user {user_id}: {e}") return [] async def get_resumes_by_candidate(self, user_id: str, candidate_id: str) -> List[Resume]: """Get all resumes for a specific candidate created by a user""" try: all_resumes = await self.get_all_resumes_for_user(user_id) candidate_resumes = [Resume.model_validate(resume) for resume in all_resumes if resume.get("candidate_id") == candidate_id] logger.info(f"📄 Found {len(candidate_resumes)} resumes for candidate {candidate_id} by user {user_id}") return candidate_resumes except Exception as e: logger.error(f"❌ Error retrieving resumes for candidate {candidate_id} by user {user_id}: {e}") return [] async def get_resumes_by_job(self, user_id: str, job_id: str) -> List[Resume]: """Get all resumes for a specific job created by a user""" try: all_resumes = await self.get_all_resumes_for_user(user_id) job_resumes = [Resume.model_validate(resume) for resume in all_resumes if resume.get("job_id") == job_id] logger.info(f"📄 Found {len(job_resumes)} resumes for job {job_id} by user {user_id}") return job_resumes except Exception as e: logger.error(f"❌ Error retrieving resumes for job {job_id} by user {user_id}: {e}") return [] async def get_resume_statistics(self, user_id: str) -> Dict[str, Any]: """Get resume statistics for a user""" try: all_resumes = await self.get_all_resumes_for_user(user_id) stats = { "total_resumes": len(all_resumes), "resumes_by_candidate": {}, "resumes_by_job": {}, "creation_timeline": {}, "recent_resumes": [], } for resume in all_resumes: # Count by candidate candidate_id = resume.get("candidate_id", "unknown") stats["resumes_by_candidate"][candidate_id] = stats["resumes_by_candidate"].get(candidate_id, 0) + 1 # Count by job job_id = resume.get("job_id", "unknown") stats["resumes_by_job"][job_id] = stats["resumes_by_job"].get(job_id, 0) + 1 # Timeline by date created_at = resume.get("created_at") if created_at: try: date_key = created_at[:10] # Extract date part stats["creation_timeline"][date_key] = stats["creation_timeline"].get(date_key, 0) + 1 except (IndexError, TypeError): pass # Get recent resumes (last 5) stats["recent_resumes"] = all_resumes[:5] return stats except Exception as e: logger.error(f"❌ Error getting resume statistics for user {user_id}: {e}") return { "total_resumes": 0, "resumes_by_candidate": {}, "resumes_by_job": {}, "creation_timeline": {}, "recent_resumes": [], } async def update_resume(self, user_id: str, resume_id: str, updates: Dict) -> Optional[Resume]: """Update specific fields of a resume and save the previous version to history""" try: # Get current resume data current_resume = await self.get_resume(resume_id, user_id) if not current_resume: logger.warning(f"📄 Resume {resume_id} not found for user {user_id}") return None # Save current version to history before updating await self._save_resume_revision(user_id, resume_id, current_resume.model_dump()) # Update the resume resume_dict = current_resume.model_dump() resume_dict.update(updates) resume_dict["updated_at"] = datetime.now(UTC).isoformat() # Save updated resume key = 
f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" await self.redis.set(key, self._serialize(resume_dict)) logger.info(f"📄 Updated resume {resume_id} for user {user_id}") return Resume.model_validate(resume_dict) except Exception as e: logger.error(f"❌ Error updating resume {resume_id} for user {user_id}: {e}") return None async def get_resume_revisions(self, user_id: str, resume_id: str) -> List[Dict[str, str]]: """Get list of all revisions for a resume with their revision IDs and metadata""" try: revisions_key = f"{KEY_PREFIXES['resume_revisions']}{user_id}:{resume_id}" revision_ids = await self.redis.lrange(revisions_key, 0, -1) # type: ignore if not revision_ids: logger.info(f"📄 No revisions found for resume {resume_id} by user {user_id}") return [] # Get metadata for each revision revisions = [] pipe = self.redis.pipeline() for revision_id in revision_ids: history_key = f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" pipe.get(history_key) values = await pipe.execute() for revision_id, value in zip(revision_ids, values): if value: revision_data = self._deserialize(value) if revision_data: revisions.append( { "revision_id": revision_id, "revision_timestamp": revision_data.get("revision_timestamp", ""), "updated_at": revision_data.get("updated_at", ""), "created_at": revision_data.get("created_at", ""), "candidate_id": revision_data.get("candidate_id", ""), "job_id": revision_data.get("job_id", ""), } ) # Sort by revision_timestamp (most recent first) revisions.sort(key=lambda x: x["revision_timestamp"], reverse=True) logger.info(f"📄 Found {len(revisions)} revisions for resume {resume_id} by user {user_id}") return revisions except Exception as e: logger.error(f"❌ Error retrieving revisions for resume {resume_id} by user {user_id}: {e}") return [] async def get_resume_revision(self, user_id: str, resume_id: str, revision_id: str) -> Optional[Resume]: """Get a specific revision of a resume by revision ID""" try: history_key = f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" data = await self.redis.get(history_key) if data: revision_data = self._deserialize(data) logger.info(f"📄 Retrieved revision {revision_id} for resume {resume_id} by user {user_id}") return Resume.model_validate(revision_data) logger.warning(f"📄 Revision {revision_id} not found for resume {resume_id} by user {user_id}") return None except Exception as e: logger.error(f"❌ Error retrieving revision {revision_id} for resume {resume_id} by user {user_id}: {e}") return None async def get_resume_history(self, user_id: str, resume_id: str) -> List[Resume]: """Get all historical versions of a resume (full data)""" try: revisions_key = f"{KEY_PREFIXES['resume_revisions']}{user_id}:{resume_id}" revision_ids = await self.redis.lrange(revisions_key, 0, -1) # type: ignore if not revision_ids: logger.info(f"📄 No history found for resume {resume_id} by user {user_id}") return [] # Get all revision data history = [] pipe = self.redis.pipeline() for revision_id in revision_ids: history_key = f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" pipe.get(history_key) values = await pipe.execute() for revision_id, value in zip(revision_ids, values): if value: revision_data = self._deserialize(value) if revision_data: history.append(Resume.model_validate(revision_data)) # Sort by revision_timestamp or updated_at (most recent first) history.sort(key=lambda x: x.updated_at or x.created_at, reverse=True) logger.info(f"📄 Retrieved {len(history)} historical versions for resume 
{resume_id} by user {user_id}") return history except Exception as e: logger.error(f"❌ Error retrieving history for resume {resume_id} by user {user_id}: {e}") return [] async def restore_resume_revision(self, user_id: str, resume_id: str, revision_id: str) -> Optional[Resume]: """Restore a resume to a specific revision""" try: # Get the revision data revision = await self.get_resume_revision(user_id, resume_id, revision_id) if not revision: logger.warning(f"📄 Revision {revision_id} not found for resume {resume_id} by user {user_id}") return None # Save current version to history before restoring current_resume = await self.get_resume(resume_id, user_id) if current_resume: await self._save_resume_revision(user_id, resume_id, current_resume.model_dump()) # Update the resume with revision data revision_dict = revision.model_dump() revision_dict["updated_at"] = datetime.now(UTC).isoformat() revision_dict["restored_from"] = revision_id # Save restored resume key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" await self.redis.set(key, self._serialize(revision_dict)) logger.info(f"📄 Restored resume {resume_id} to revision {revision_id} for user {user_id}") return Resume.model_validate(revision_dict) except Exception as e: logger.error(f"❌ Error restoring resume {resume_id} to revision {revision_id} for user {user_id}: {e}") return None async def delete_resume_revision(self, user_id: str, resume_id: str, revision_id: str) -> bool: """Delete a specific revision from history""" try: # Delete the revision data history_key = f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" result = await self.redis.delete(history_key) # Remove revision ID from revisions list revisions_key = f"{KEY_PREFIXES['resume_revisions']}{user_id}:{resume_id}" await self.redis.lrem(revisions_key, 0, revision_id) # type: ignore if result > 0: logger.info(f"🗑️ Deleted revision {revision_id} for resume {resume_id} by user {user_id}") return True else: logger.warning(f"⚠️ Revision {revision_id} not found for resume {resume_id} by user {user_id}") return False except Exception as e: logger.error(f"❌ Error deleting revision {revision_id} for resume {resume_id} by user {user_id}: {e}") return False # Private helper methods for revision management async def _save_resume_revision(self, user_id: str, resume_id: str, resume_data: Dict) -> str: """Save a resume revision to history""" revision_id = str(uuid.uuid4()) revision_timestamp = datetime.now(UTC).isoformat() # Add revision metadata to the data revision_data = resume_data.copy() revision_data["revision_timestamp"] = revision_timestamp revision_data["revision_id"] = revision_id # Store the revision history_key = f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" await self.redis.set(history_key, self._serialize(revision_data)) # Add revision ID to revisions list revisions_key = f"{KEY_PREFIXES['resume_revisions']}{user_id}:{resume_id}" await self.redis.rpush(revisions_key, revision_id) # type: ignore logger.info(f"📄 Saved revision {revision_id} for resume {resume_id} by user {user_id}") return revision_id async def _delete_resume_history(self, user_id: str, resume_id: str) -> None: """Delete all revision history for a resume""" try: # Get all revision IDs revisions_key = f"{KEY_PREFIXES['resume_revisions']}{user_id}:{resume_id}" revision_ids = await self.redis.lrange(revisions_key, 0, -1) # type: ignore # Delete all revision data pipe = self.redis.pipeline() for revision_id in revision_ids: history_key = 
f"{KEY_PREFIXES['resume_history']}{user_id}:{resume_id}:{revision_id}" pipe.delete(history_key) # Delete the revisions list pipe.delete(revisions_key) await pipe.execute() logger.info(f"🗑️ Deleted all revision history for resume {resume_id} by user {user_id}") except Exception as e: logger.error(f"❌ Error deleting revision history for resume {resume_id} by user {user_id}: {e}")