diff --git a/src/backend/agents/base.py b/src/backend/agents/base.py
index 0138bf8..7e3c8e3 100644
--- a/src/backend/agents/base.py
+++ b/src/backend/agents/base.py
@@ -44,7 +44,7 @@
 import defines
 from logger import logger
 from models import (Tunables, CandidateQuestion, ChatMessageUser, ChatMessage, RagEntry, ChatMessageMetaData, ApiStatusType, Candidate, ChatContextType)
 import utils.llm_proxy as llm_manager
-from database import RedisDatabase
+from database.manager import RedisDatabase
 from models import ChromaDBGetResponse
 from utils.metrics import Metrics
diff --git a/src/backend/background_tasks.py b/src/backend/background_tasks.py
index eee7648..5f7f2aa 100644
--- a/src/backend/background_tasks.py
+++ b/src/backend/background_tasks.py
@@ -7,7 +7,7 @@
 import asyncio
 from datetime import datetime, timedelta, UTC
 from typing import Optional, List, Dict, Any, Callable
 from logger import logger
-from database import DatabaseManager
+from database.manager import DatabaseManager
 class BackgroundTaskManager:
     """Manages background tasks for the application using asyncio instead of threading"""
@@ -85,7 +85,7 @@ class BackgroundTaskManager:
         database = self.database_manager.get_database()
 
         # Get Redis client safely (using the event loop safe method)
-        from database import redis_manager
+        from database.manager import redis_manager
         redis = await redis_manager.get_client()
 
         # Clean up rate limit keys older than specified days
diff --git a/src/backend/database/__init__.py b/src/backend/database/__init__.py
new file mode 100644
index 0000000..b320fb4
--- /dev/null
+++ b/src/backend/database/__init__.py
@@ -0,0 +1,4 @@
+from .core import RedisDatabase
+from .manager import DatabaseManager, redis_manager
+
+__all__ = ['RedisDatabase', 'DatabaseManager', 'redis_manager']
\ No newline at end of file
diff --git a/src/backend/database/constants.py b/src/backend/database/constants.py
new file mode 100644
index 0000000..e9dfd3d
--- /dev/null
+++ b/src/backend/database/constants.py
@@ -0,0 +1,15 @@
+KEY_PREFIXES = {
+    'viewers': 'viewer:',
+    'candidates': 'candidate:',
+    'employers': 'employer:',
+    'jobs': 'job:',
+    'job_applications': 'job_application:',
+    'chat_sessions': 'chat_session:',
+    'chat_messages': 'chat_messages:',
+    'ai_parameters': 'ai_parameters:',
+    'users': 'user:',
+    'candidate_documents': 'candidate_documents:',
+    'job_requirements': 'job_requirements:',
+    'resumes': 'resume:',
+    'user_resumes': 'user_resumes:',
+}
diff --git a/src/backend/database/core.py b/src/backend/database/core.py
new file mode 100644
index 0000000..1fd7303
--- /dev/null
+++ b/src/backend/database/core.py
@@ -0,0 +1,31 @@
+from redis.asyncio import Redis
+from .mixins.base import BaseMixin
+from .mixins.resume import ResumeMixin
+from .mixins.document import DocumentMixin
+from .mixins.user import UserMixin
+from .mixins.chat import ChatMixin
+from .mixins.auth import AuthMixin
+from .mixins.analytics import AnalyticsMixin
+from .mixins.job import JobMixin
+from .mixins.skill import SkillMixin
+from .mixins.ai import AIMixin
+
+class RedisDatabase(
+    AIMixin,
+    BaseMixin,
+    ResumeMixin,
+    DocumentMixin,
+    UserMixin,
+    ChatMixin,
+    AuthMixin,
+    AnalyticsMixin,
+    JobMixin,
+    SkillMixin,
+):
+    """
+    Main Redis database class combining all mixins
+    """
+
+    def __init__(self, redis: Redis):
+        self.redis = redis
+        super().__init__()
\ No newline at end of file
diff --git a/src/backend/database.py b/src/backend/database/manager.py
similarity index 78%
rename from src/backend/database.py
rename to
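The mixin composition that core.py introduces relies on Python's method resolution order: each mixin assumes only self.redis plus the _serialize/_deserialize helpers from BaseMixin, and the concrete class supplies the client in __init__ before delegating to super().__init__(). The following is a minimal, self-contained sketch of that same pattern; FakeRedis, ViewerMixin, and MiniDatabase are illustrative names invented here and are not part of this patch.

import asyncio
import json
from typing import Any, Dict, Optional


class FakeRedis:
    """In-memory stand-in for redis.asyncio.Redis (illustration only)."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    async def set(self, key: str, value: str) -> None:
        self._data[key] = value

    async def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


class BaseMixin:
    """Serialization helpers shared by every mixin (mirrors database/mixins/base.py)."""

    redis: FakeRedis  # supplied by the concrete class

    def _serialize(self, data: Any) -> str:
        return json.dumps(data, default=str)

    def _deserialize(self, data: Optional[str]) -> Any:
        return json.loads(data) if data else None


class ViewerMixin:
    """Hypothetical feature mixin: it only uses self.redis and the base helpers."""

    async def set_viewer(self, viewer_id: str, viewer_data: Dict) -> None:
        await self.redis.set(f"viewer:{viewer_id}", self._serialize(viewer_data))

    async def get_viewer(self, viewer_id: str) -> Any:
        return self._deserialize(await self.redis.get(f"viewer:{viewer_id}"))


class MiniDatabase(ViewerMixin, BaseMixin):
    """Concrete class wiring the mixins together, like RedisDatabase in core.py."""

    def __init__(self, redis: FakeRedis) -> None:
        self.redis = redis
        super().__init__()


async def main() -> None:
    db = MiniDatabase(FakeRedis())
    await db.set_viewer("v1", {"name": "Ada"})
    print(await db.get_viewer("v1"))  # {'name': 'Ada'}


asyncio.run(main())

Keeping every mixin free of its own __init__ is what lets core.py stack ten of them without worrying about constructor order.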
src/backend/database/manager.py index ce9c9eb..8a9042f 100644 --- a/src/backend/database.py +++ b/src/backend/database/manager.py @@ -11,7 +11,8 @@ from models import ( Candidate, Employer, BaseUser, EvidenceDetail, Guest, Authentication, AuthResponse, SkillAssessment, ) import backstory_traceback as traceback - +from .constants import KEY_PREFIXES +from .core import RedisDatabase logger = logging.getLogger(__name__) class _RedisManager: @@ -174,26 +175,11 @@ class _RedisManager: logger.error(f"Failed to get Redis info: {e}") return None -class RedisDatabase: +class RedisDatabase2: def __init__(self, redis: Redis): self.redis = redis # Redis key prefixes for different data types - self.KEY_PREFIXES = { - 'viewers': 'viewer:', - 'candidates': 'candidate:', - 'employers': 'employer:', - 'jobs': 'job:', - 'job_applications': 'job_application:', - 'chat_sessions': 'chat_session:', - 'chat_messages': 'chat_messages:', - 'ai_parameters': 'ai_parameters:', - 'users': 'user:', - 'candidate_documents': 'candidate_documents:', - 'job_requirements': 'job_requirements:', - 'resumes': 'resume:', - 'user_resumes': 'user_resumes:', - } def _serialize(self, data: Any) -> str: """Serialize data to JSON string for Redis storage""" @@ -223,11 +209,11 @@ class RedisDatabase: resume_id = resume_data['id'] # Store the resume data - key = f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" await self.redis.set(key, self._serialize(resume_data)) # Add resume_id to user's resume list - user_resumes_key = f"{self.KEY_PREFIXES['user_resumes']}{user_id}" + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" await self.redis.rpush(user_resumes_key, resume_id) # type: ignore logger.info(f"๐Ÿ“„ Saved resume {resume_id} for user {user_id}") @@ -239,7 +225,7 @@ class RedisDatabase: async def get_resume(self, user_id: str, resume_id: str) -> Optional[Dict]: """Get a specific resume for a user""" try: - key = f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" data = await self.redis.get(key) if data: resume_data = self._deserialize(data) @@ -255,7 +241,7 @@ class RedisDatabase: """Get all resumes for a specific user""" try: # Get all resume IDs for this user - user_resumes_key = f"{self.KEY_PREFIXES['user_resumes']}{user_id}" + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)# type: ignore if not resume_ids: @@ -266,7 +252,7 @@ class RedisDatabase: resumes = [] pipe = self.redis.pipeline() for resume_id in resume_ids: - pipe.get(f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}") + pipe.get(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}") values = await pipe.execute() for resume_id, value in zip(resume_ids, values): @@ -292,11 +278,11 @@ class RedisDatabase: """Delete a specific resume for a user""" try: # Delete the resume data - key = f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" result = await self.redis.delete(key) # Remove from user's resume list - user_resumes_key = f"{self.KEY_PREFIXES['user_resumes']}{user_id}" + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" await self.redis.lrem(user_resumes_key, 0, resume_id)# type: ignore if result > 0: @@ -313,7 +299,7 @@ class RedisDatabase: """Delete all resumes for a specific user and return count of deleted resumes""" try: # Get all resume IDs for this user - user_resumes_key = 
f"{self.KEY_PREFIXES['user_resumes']}{user_id}" + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)# type: ignore if not resume_ids: @@ -327,7 +313,7 @@ class RedisDatabase: # Delete each resume for resume_id in resume_ids: - pipe.delete(f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}") + pipe.delete(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}") deleted_count += 1 # Delete the user's resume list @@ -346,7 +332,7 @@ class RedisDatabase: async def get_all_resumes(self) -> Dict[str, List[Dict]]: """Get all resumes grouped by user (admin function)""" try: - pattern = f"{self.KEY_PREFIXES['resumes']}*" + pattern = f"{KEY_PREFIXES['resumes']}*" keys = await self.redis.keys(pattern) if not keys: @@ -362,7 +348,7 @@ class RedisDatabase: for key, value in zip(keys, values): if value: # Extract user_id from key format: resume:{user_id}:{resume_id} - key_parts = key.replace(self.KEY_PREFIXES['resumes'], '').split(':', 1) + key_parts = key.replace(KEY_PREFIXES['resumes'], '').split(':', 1) if len(key_parts) >= 1: user_id = key_parts[0] resume_data = self._deserialize(value) @@ -482,7 +468,7 @@ class RedisDatabase: resume_data.update(updates) resume_data["updated_at"] = datetime.now(UTC).isoformat() - key = f"{self.KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" await self.redis.set(key, self._serialize(resume_data)) logger.info(f"๐Ÿ“„ Updated resume {resume_id} for user {user_id}") @@ -513,7 +499,7 @@ class RedisDatabase: """Delete all documents for a specific candidate and return count of deleted documents""" try: # Get all document IDs for this candidate - key = f"{self.KEY_PREFIXES['candidate_documents']}{candidate_id}" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" document_ids = await self.redis.lrange(key, 0, -1)# type: ignore if not document_ids: @@ -528,7 +514,7 @@ class RedisDatabase: # Delete each document's metadata for doc_id in document_ids: pipe.delete(f"document:{doc_id}") - pipe.delete(f"{self.KEY_PREFIXES['job_requirements']}{doc_id}") + pipe.delete(f"{KEY_PREFIXES['job_requirements']}{doc_id}") deleted_count += 1 # Delete the candidate's document list @@ -656,7 +642,7 @@ class RedisDatabase: async def get_candidate_documents(self, candidate_id: str) -> List[Dict]: """Get all documents for a specific candidate""" - key = f"{self.KEY_PREFIXES['candidate_documents']}{candidate_id}" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" document_ids = await self.redis.lrange(key, 0, -1) # type: ignore if not document_ids: @@ -683,12 +669,12 @@ class RedisDatabase: async def add_document_to_candidate(self, candidate_id: str, document_id: str): """Add a document ID to a candidate's document list""" - key = f"{self.KEY_PREFIXES['candidate_documents']}{candidate_id}" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" await self.redis.rpush(key, document_id)# type: ignore async def remove_document_from_candidate(self, candidate_id: str, document_id: str): """Remove a document ID from a candidate's document list""" - key = f"{self.KEY_PREFIXES['candidate_documents']}{candidate_id}" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" await self.redis.lrem(key, 0, document_id)# type: ignore async def update_document(self, document_id: str, updates: Dict): @@ -720,7 +706,7 @@ class RedisDatabase: async def get_document_count_for_candidate(self, candidate_id: str) -> int: """Get total number of documents for a 
candidate""" - key = f"{self.KEY_PREFIXES['candidate_documents']}{candidate_id}" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" return await self.redis.llen(key)# type: ignore async def search_candidate_documents(self, candidate_id: str, query: str) -> List[Dict]: @@ -737,7 +723,7 @@ class RedisDatabase: async def get_job_requirements(self, document_id: str) -> Optional[Dict]: """Get cached job requirements analysis for a document""" try: - key = f"{self.KEY_PREFIXES['job_requirements']}{document_id}" + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" data = await self.redis.get(key) if data: requirements_data = self._deserialize(data) @@ -752,7 +738,7 @@ class RedisDatabase: async def save_job_requirements(self, document_id: str, requirements: Dict) -> bool: """Save job requirements analysis results for a document""" try: - key = f"{self.KEY_PREFIXES['job_requirements']}{document_id}" + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" # Add metadata to the requirements requirements_with_meta = { @@ -775,7 +761,7 @@ class RedisDatabase: async def delete_job_requirements(self, document_id: str) -> bool: """Delete cached job requirements for a document""" try: - key = f"{self.KEY_PREFIXES['job_requirements']}{document_id}" + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" result = await self.redis.delete(key) if result > 0: logger.info(f"๐Ÿ“‹ Deleted job requirements for document {document_id}") @@ -788,7 +774,7 @@ class RedisDatabase: async def get_all_job_requirements(self) -> Dict[str, Any]: """Get all cached job requirements""" try: - pattern = f"{self.KEY_PREFIXES['job_requirements']}*" + pattern = f"{KEY_PREFIXES['job_requirements']}*" keys = await self.redis.keys(pattern) if not keys: @@ -801,7 +787,7 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - document_id = key.replace(self.KEY_PREFIXES['job_requirements'], '') + document_id = key.replace(KEY_PREFIXES['job_requirements'], '') if value: result[document_id] = self._deserialize(value) @@ -848,7 +834,7 @@ class RedisDatabase: pipe = self.redis.pipeline() for doc_id in document_ids: - key = f"{self.KEY_PREFIXES['job_requirements']}{doc_id}" + key = f"{KEY_PREFIXES['job_requirements']}{doc_id}" pipe.delete(key) deleted_count += 1 @@ -864,18 +850,18 @@ class RedisDatabase: # Viewer operations async def get_viewer(self, viewer_id: str) -> Optional[Dict]: """Get viewer by ID""" - key = f"{self.KEY_PREFIXES['viewers']}{viewer_id}" + key = f"{KEY_PREFIXES['viewers']}{viewer_id}" data = await self.redis.get(key) return self._deserialize(data) if data else None async def set_viewer(self, viewer_id: str, viewer_data: Dict): """Set viewer data""" - key = f"{self.KEY_PREFIXES['viewers']}{viewer_id}" + key = f"{KEY_PREFIXES['viewers']}{viewer_id}" await self.redis.set(key, self._serialize(viewer_data)) async def get_all_viewers(self) -> Dict[str, Any]: """Get all viewers""" - pattern = f"{self.KEY_PREFIXES['viewers']}*" + pattern = f"{KEY_PREFIXES['viewers']}*" keys = await self.redis.keys(pattern) if not keys: @@ -889,405 +875,31 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - viewer_id = key.replace(self.KEY_PREFIXES['viewers'], '') + viewer_id = key.replace(KEY_PREFIXES['viewers'], '') result[viewer_id] = self._deserialize(value) return result async def delete_viewer(self, viewer_id: str): """Delete viewer""" - key = f"{self.KEY_PREFIXES['viewers']}{viewer_id}" + key = f"{KEY_PREFIXES['viewers']}{viewer_id}" await self.redis.delete(key) - # 
Candidates operations - async def get_candidate(self, candidate_id: str) -> Optional[Dict]: - """Get candidate by ID""" - key = f"{self.KEY_PREFIXES['candidates']}{candidate_id}" - data = await self.redis.get(key) - return self._deserialize(data) if data else None - - async def set_candidate(self, candidate_id: str, candidate_data: Dict): - """Set candidate data""" - key = f"{self.KEY_PREFIXES['candidates']}{candidate_id}" - await self.redis.set(key, self._serialize(candidate_data)) - - async def get_all_candidates(self) -> Dict[str, Any]: - """Get all candidates""" - pattern = f"{self.KEY_PREFIXES['candidates']}*" - keys = await self.redis.keys(pattern) - - if not keys: - return {} - - # Use pipeline for efficiency - pipe = self.redis.pipeline() - for key in keys: - pipe.get(key) - values = await pipe.execute() - - result = {} - for key, value in zip(keys, values): - candidate_id = key.replace(self.KEY_PREFIXES['candidates'], '') - result[candidate_id] = self._deserialize(value) - - return result - - async def delete_candidate(self, candidate_id: str) -> Dict[str, int]: - """ - Delete candidate and all related records in a cascading manner - Returns a dictionary with counts of deleted items for each category - """ - try: - deletion_stats = { - "documents": 0, - "chat_sessions": 0, - "chat_messages": 0, - "job_applications": 0, - "user_records": 0, - "auth_records": 0, - "security_logs": 0, - "ai_parameters": 0, - "candidate_record": 0, - "resumes": 0 - } - - logger.info(f"๐Ÿ—‘๏ธ Starting cascading delete for candidate {candidate_id}") - - # 1. Get candidate data first to retrieve associated information - candidate_data = await self.get_candidate(candidate_id) - if not candidate_data: - logger.warning(f"โš ๏ธ Candidate {candidate_id} not found") - return deletion_stats - - candidate_email = candidate_data.get("email", "").lower() - candidate_username = candidate_data.get("username", "").lower() - - # 2. Delete all candidate documents and their metadata - try: - documents_deleted = await self.delete_all_candidate_documents(candidate_id) - deletion_stats["documents"] = documents_deleted - logger.info(f"๐Ÿ—‘๏ธ Deleted {documents_deleted} documents for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting candidate documents: {e}") - - # 3. Delete all chat sessions related to this candidate - try: - candidate_sessions = await self.get_chat_sessions_by_candidate(candidate_id) - messages_deleted = 0 - - for session in candidate_sessions: - session_id = session.get("id") - if session_id: - # Count messages before deletion - message_count = await self.get_chat_message_count(session_id) - messages_deleted += message_count - - # Delete chat session and its messages - await self.delete_chat_session_completely(session_id) - - deletion_stats["chat_sessions"] = len(candidate_sessions) - deletion_stats["chat_messages"] = messages_deleted - logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_sessions)} chat sessions and {messages_deleted} messages for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting chat sessions: {e}") - - # 4. 
Delete job applications from this candidate - try: - all_applications = await self.get_all_job_applications() - candidate_applications = [] - - for app_id, app_data in all_applications.items(): - if app_data.get("candidateId") == candidate_id: - candidate_applications.append(app_id) - - # Delete each application - for app_id in candidate_applications: - await self.delete_job_application(app_id) - - deletion_stats["job_applications"] = len(candidate_applications) - logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_applications)} job applications for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting job applications: {e}") - - # 5. Delete user records (by email and username if they exist) - try: - user_records_deleted = 0 - - # Delete by email - if candidate_email and await self.user_exists_by_email(candidate_email): - await self.delete_user(candidate_email) - user_records_deleted += 1 - logger.info(f"๐Ÿ—‘๏ธ Deleted user record by email: {candidate_email}") - - # Delete by username (if different from email) - if (candidate_username and - candidate_username != candidate_email and - await self.user_exists_by_username(candidate_username)): - await self.delete_user(candidate_username) - user_records_deleted += 1 - logger.info(f"๐Ÿ—‘๏ธ Deleted user record by username: {candidate_username}") - - # Delete user by ID if exists - user_by_id = await self.get_user_by_id(candidate_id) - if user_by_id: - key = f"user_by_id:{candidate_id}" - await self.redis.delete(key) - user_records_deleted += 1 - logger.info(f"๐Ÿ—‘๏ธ Deleted user record by ID: {candidate_id}") - - deletion_stats["user_records"] = user_records_deleted - logger.info(f"๐Ÿ—‘๏ธ Deleted {user_records_deleted} user records for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting user records: {e}") - - # 6. Delete authentication records - try: - auth_deleted = await self.delete_authentication(candidate_id) - if auth_deleted: - deletion_stats["auth_records"] = 1 - logger.info(f"๐Ÿ—‘๏ธ Deleted authentication record for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting authentication records: {e}") - - # 7. Revoke all refresh tokens for this user - try: - await self.revoke_all_user_tokens(candidate_id) - logger.info(f"๐Ÿ—‘๏ธ Revoked all refresh tokens for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error revoking refresh tokens: {e}") - - # 8. Delete security logs for this user - try: - security_logs_deleted = 0 - # Security logs are stored by date, so we need to scan for them - pattern = f"security_log:{candidate_id}:*" - cursor = 0 - - while True: - cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) - - if keys: - await self.redis.delete(*keys) - security_logs_deleted += len(keys) - - if cursor == 0: - break - - deletion_stats["security_logs"] = security_logs_deleted - if security_logs_deleted > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {security_logs_deleted} security log entries for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting security logs: {e}") - - # 9. 
Delete AI parameters that might be specific to this candidate - try: - all_ai_params = await self.get_all_ai_parameters() - candidate_ai_params = [] - - for param_id, param_data in all_ai_params.items(): - if (param_data.get("candidateId") == candidate_id or - param_data.get("userId") == candidate_id): - candidate_ai_params.append(param_id) - - # Delete each AI parameter set - for param_id in candidate_ai_params: - await self.delete_ai_parameters(param_id) - - deletion_stats["ai_parameters"] = len(candidate_ai_params) - if len(candidate_ai_params) > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_ai_params)} AI parameter sets for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting AI parameters: {e}") - - # 10. Delete email verification tokens if any exist - try: - if candidate_email: - # Clean up any pending verification tokens - pattern = "email_verification:*" - cursor = 0 - tokens_deleted = 0 - - while True: - cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) - - for key in keys: - token_data = await self.redis.get(key) - if token_data: - verification_info = json.loads(token_data) - if verification_info.get("email", "").lower() == candidate_email: - await self.redis.delete(key) - tokens_deleted += 1 - - if cursor == 0: - break - - if tokens_deleted > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} email verification tokens for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting email verification tokens: {e}") - - # 11. Delete password reset tokens if any exist - try: - if candidate_email: - pattern = "password_reset:*" - cursor = 0 - tokens_deleted = 0 - - while True: - cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) - - for key in keys: - token_data = await self.redis.get(key) - if token_data: - reset_info = json.loads(token_data) - if reset_info.get("email", "").lower() == candidate_email: - await self.redis.delete(key) - tokens_deleted += 1 - - if cursor == 0: - break - - if tokens_deleted > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} password reset tokens for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting password reset tokens: {e}") - - # 12. Delete MFA codes if any exist - try: - if candidate_email: - pattern = f"mfa_code:{candidate_email}:*" - cursor = 0 - mfa_codes_deleted = 0 - - while True: - cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) - - if keys: - await self.redis.delete(*keys) - mfa_codes_deleted += len(keys) - - if cursor == 0: - break - - if mfa_codes_deleted > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {mfa_codes_deleted} MFA codes for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting MFA codes: {e}") - - # 13. Finally, delete the candidate record itself - try: - key = f"{self.KEY_PREFIXES['candidates']}{candidate_id}" - result = await self.redis.delete(key) - deletion_stats["candidate_record"] = result - logger.info(f"๐Ÿ—‘๏ธ Deleted candidate record for {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting candidate record: {e}") - - # 14. 
Delete resumes associated with this candidate across all users - try: - all_resumes = await self.get_all_resumes() - candidate_resumes_deleted = 0 - - for user_id, user_resumes in all_resumes.items(): - resumes_to_delete = [] - for resume in user_resumes: - if resume.get("candidate_id") == candidate_id: - resumes_to_delete.append(resume.get("resume_id")) - - # Delete each resume for this candidate - for resume_id in resumes_to_delete: - if resume_id: - await self.delete_resume(user_id, resume_id) - candidate_resumes_deleted += 1 - - deletion_stats["resumes"] = candidate_resumes_deleted - if candidate_resumes_deleted > 0: - logger.info(f"๐Ÿ—‘๏ธ Deleted {candidate_resumes_deleted} resumes for candidate {candidate_id}") - except Exception as e: - logger.error(f"โŒ Error deleting resumes for candidate {candidate_id}: {e}") - - # 15. Log the deletion as a security event (if we have admin/system user context) - try: - total_items_deleted = sum(deletion_stats.values()) - logger.info(f"โœ… Completed cascading delete for candidate {candidate_id}. " - f"Total items deleted: {total_items_deleted}") - logger.info(f"๐Ÿ“Š Deletion breakdown: {deletion_stats}") - except Exception as e: - logger.error(f"โŒ Error logging deletion summary: {e}") - - return deletion_stats - - except Exception as e: - logger.error(f"โŒ Critical error during candidate deletion {candidate_id}: {e}") - raise - - async def delete_candidate_batch(self, candidate_ids: List[str]) -> Dict[str, Dict[str, int]]: - """ - Delete multiple candidates in batch with detailed reporting - Returns deletion stats for each candidate - """ - try: - batch_results = {} - total_stats = { - "documents": 0, - "chat_sessions": 0, - "chat_messages": 0, - "job_applications": 0, - "user_records": 0, - "auth_records": 0, - "security_logs": 0, - "ai_parameters": 0, - "candidate_record": 0, - "resumes": 0, - } - - logger.info(f"๐Ÿ—‘๏ธ Starting batch deletion for {len(candidate_ids)} candidates") - - for candidate_id in candidate_ids: - try: - deletion_stats = await self.delete_candidate(candidate_id) - batch_results[candidate_id] = deletion_stats - - # Add to totals - for key, value in deletion_stats.items(): - total_stats[key] += value - - except Exception as e: - logger.error(f"โŒ Failed to delete candidate {candidate_id}: {e}") - batch_results[candidate_id] = {"error": str(e)} - - logger.info(f"โœ… Completed batch deletion. 
Total items deleted: {sum(total_stats.values())}") - logger.info(f"๐Ÿ“Š Batch totals: {total_stats}") - - return { - "individual_results": batch_results, - "totals": total_stats, - "summary": { - "total_candidates_processed": len(candidate_ids), - "successful_deletions": len([r for r in batch_results.values() if "error" not in r]), - "failed_deletions": len([r for r in batch_results.values() if "error" in r]), - "total_items_deleted": sum(total_stats.values()) - } - } - - except Exception as e: - logger.error(f"โŒ Critical error during batch candidate deletion: {e}") - raise # Employers operations async def get_employer(self, employer_id: str) -> Optional[Dict]: """Get employer by ID""" - key = f"{self.KEY_PREFIXES['employers']}{employer_id}" + key = f"{KEY_PREFIXES['employers']}{employer_id}" data = await self.redis.get(key) return self._deserialize(data) if data else None async def set_employer(self, employer_id: str, employer_data: Dict): """Set employer data""" - key = f"{self.KEY_PREFIXES['employers']}{employer_id}" + key = f"{KEY_PREFIXES['employers']}{employer_id}" await self.redis.set(key, self._serialize(employer_data)) async def get_all_employers(self) -> Dict[str, Any]: """Get all employers""" - pattern = f"{self.KEY_PREFIXES['employers']}*" + pattern = f"{KEY_PREFIXES['employers']}*" keys = await self.redis.keys(pattern) if not keys: @@ -1300,31 +912,31 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - employer_id = key.replace(self.KEY_PREFIXES['employers'], '') + employer_id = key.replace(KEY_PREFIXES['employers'], '') result[employer_id] = self._deserialize(value) return result async def delete_employer(self, employer_id: str): """Delete employer""" - key = f"{self.KEY_PREFIXES['employers']}{employer_id}" + key = f"{KEY_PREFIXES['employers']}{employer_id}" await self.redis.delete(key) # Jobs operations async def get_job(self, job_id: str) -> Optional[Dict]: """Get job by ID""" - key = f"{self.KEY_PREFIXES['jobs']}{job_id}" + key = f"{KEY_PREFIXES['jobs']}{job_id}" data = await self.redis.get(key) return self._deserialize(data) if data else None async def set_job(self, job_id: str, job_data: Dict): """Set job data""" - key = f"{self.KEY_PREFIXES['jobs']}{job_id}" + key = f"{KEY_PREFIXES['jobs']}{job_id}" await self.redis.set(key, self._serialize(job_data)) async def get_all_jobs(self) -> Dict[str, Any]: """Get all jobs""" - pattern = f"{self.KEY_PREFIXES['jobs']}*" + pattern = f"{KEY_PREFIXES['jobs']}*" keys = await self.redis.keys(pattern) if not keys: @@ -1337,14 +949,14 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - job_id = key.replace(self.KEY_PREFIXES['jobs'], '') + job_id = key.replace(KEY_PREFIXES['jobs'], '') result[job_id] = self._deserialize(value) return result async def delete_job(self, job_id: str): """Delete job""" - key = f"{self.KEY_PREFIXES['jobs']}{job_id}" + key = f"{KEY_PREFIXES['jobs']}{job_id}" await self.redis.delete(key) # MFA and Email Verification operations @@ -1633,18 +1245,18 @@ class RedisDatabase: # Job Applications operations async def get_job_application(self, application_id: str) -> Optional[Dict]: """Get job application by ID""" - key = f"{self.KEY_PREFIXES['job_applications']}{application_id}" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" data = await self.redis.get(key) return self._deserialize(data) if data else None async def set_job_application(self, application_id: str, application_data: Dict): """Set job application data""" - key = 
f"{self.KEY_PREFIXES['job_applications']}{application_id}" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" await self.redis.set(key, self._serialize(application_data)) async def get_all_job_applications(self) -> Dict[str, Any]: """Get all job applications""" - pattern = f"{self.KEY_PREFIXES['job_applications']}*" + pattern = f"{KEY_PREFIXES['job_applications']}*" keys = await self.redis.keys(pattern) if not keys: @@ -1657,31 +1269,31 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - app_id = key.replace(self.KEY_PREFIXES['job_applications'], '') + app_id = key.replace(KEY_PREFIXES['job_applications'], '') result[app_id] = self._deserialize(value) return result async def delete_job_application(self, application_id: str): """Delete job application""" - key = f"{self.KEY_PREFIXES['job_applications']}{application_id}" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" await self.redis.delete(key) # Chat Sessions operations async def get_chat_session(self, session_id: str) -> Optional[Dict]: """Get chat session by ID""" - key = f"{self.KEY_PREFIXES['chat_sessions']}{session_id}" + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" data = await self.redis.get(key) return self._deserialize(data) if data else None async def set_chat_session(self, session_id: str, session_data: Dict): """Set chat session data""" - key = f"{self.KEY_PREFIXES['chat_sessions']}{session_id}" + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" await self.redis.set(key, self._serialize(session_data)) async def get_all_chat_sessions(self) -> Dict[str, Any]: """Get all chat sessions""" - pattern = f"{self.KEY_PREFIXES['chat_sessions']}*" + pattern = f"{KEY_PREFIXES['chat_sessions']}*" keys = await self.redis.keys(pattern) if not keys: @@ -1694,7 +1306,7 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - session_id = key.replace(self.KEY_PREFIXES['chat_sessions'], '') + session_id = key.replace(KEY_PREFIXES['chat_sessions'], '') result[session_id] = self._deserialize(value) return result @@ -1712,7 +1324,7 @@ class RedisDatabase: '''Delete a specific chat message from Redis''' try: # Remove from the session's message list - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" await self.redis.lrem(key, 0, message_id)# type: ignore # Delete the message data itself result = await self.redis.delete(f"chat_message:{message_id}") @@ -1724,18 +1336,18 @@ class RedisDatabase: # Chat Messages operations (stored as lists) async def get_chat_messages(self, session_id: str) -> List[Dict]: """Get chat messages for a session""" - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" messages = await self.redis.lrange(key, 0, -1)# type: ignore return [self._deserialize(msg) for msg in messages if msg] async def add_chat_message(self, session_id: str, message_data: Dict): """Add a chat message to a session""" - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" await self.redis.rpush(key, self._serialize(message_data))# type: ignore async def set_chat_messages(self, session_id: str, messages: List[Dict]): """Set all chat messages for a session (replaces existing)""" - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" # Clear existing messages await self.redis.delete(key) @@ -1747,7 +1359,7 @@ class RedisDatabase: async 
def get_all_chat_messages(self) -> Dict[str, List[Dict]]: """Get all chat messages grouped by session""" - pattern = f"{self.KEY_PREFIXES['chat_messages']}*" + pattern = f"{KEY_PREFIXES['chat_messages']}*" keys = await self.redis.keys(pattern) if not keys: @@ -1755,7 +1367,7 @@ class RedisDatabase: result = {} for key in keys: - session_id = key.replace(self.KEY_PREFIXES['chat_messages'], '') + session_id = key.replace(KEY_PREFIXES['chat_messages'], '') messages = await self.redis.lrange(key, 0, -1)# type: ignore result[session_id] = [self._deserialize(msg) for msg in messages if msg] @@ -1763,7 +1375,7 @@ class RedisDatabase: async def delete_chat_messages(self, session_id: str): """Delete all chat messages for a session""" - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" await self.redis.delete(key) # Enhanced Chat Session Methods @@ -1810,7 +1422,7 @@ class RedisDatabase: async def get_chat_message_count(self, session_id: str) -> int: """Get the total number of messages in a chat session""" - key = f"{self.KEY_PREFIXES['chat_messages']}{session_id}" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" return await self.redis.llen(key)# type: ignore async def search_chat_messages(self, session_id: str, query: str) -> List[Dict]: @@ -1859,23 +1471,6 @@ class RedisDatabase: return archived_count - # User Operations - async def get_user_by_username(self, username: str) -> Optional[Dict]: - """Get user by username specifically""" - username_key = f"{self.KEY_PREFIXES['users']}{username.lower()}" - data = await self.redis.get(username_key) - return self._deserialize(data) if data else None - - async def find_candidate_by_username(self, username: str) -> Optional[Dict]: - """Find candidate by username""" - all_candidates = await self.get_all_candidates() - username_lower = username.lower() - - for candidate_data in all_candidates.values(): - if candidate_data.get("username", "").lower() == username_lower: - return candidate_data - - return None # Analytics and Reporting async def get_chat_statistics(self) -> Dict[str, Any]: @@ -1947,19 +1542,6 @@ class RedisDatabase: "recent_sessions": sessions[:5] # Last 5 sessions } - # Batch Operations - async def get_multiple_candidates_by_usernames(self, usernames: List[str]) -> Dict[str, Dict]: - """Get multiple candidates by their usernames efficiently""" - all_candidates = await self.get_all_candidates() - username_set = {username.lower() for username in usernames} - - result = {} - for candidate_data in all_candidates.values(): - candidate_username = candidate_data.get("username", "").lower() - if candidate_username in username_set: - result[candidate_username] = candidate_data - - return result async def bulk_update_chat_sessions(self, session_updates: Dict[str, Dict]): """Bulk update multiple chat sessions""" @@ -1970,7 +1552,7 @@ class RedisDatabase: if session_data: session_data.update(updates) session_data["updatedAt"] = datetime.now(UTC).isoformat() - key = f"{self.KEY_PREFIXES['chat_sessions']}{session_id}" + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" pipe.set(key, self._serialize(session_data)) await pipe.execute() @@ -1979,18 +1561,18 @@ class RedisDatabase: # AI Parameters operations async def get_ai_parameters(self, param_id: str) -> Optional[Dict]: """Get AI parameters by ID""" - key = f"{self.KEY_PREFIXES['ai_parameters']}{param_id}" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" data = await self.redis.get(key) return self._deserialize(data) if data else 
None async def set_ai_parameters(self, param_id: str, param_data: Dict): """Set AI parameters data""" - key = f"{self.KEY_PREFIXES['ai_parameters']}{param_id}" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" await self.redis.set(key, self._serialize(param_data)) async def get_all_ai_parameters(self) -> Dict[str, Any]: """Get all AI parameters""" - pattern = f"{self.KEY_PREFIXES['ai_parameters']}*" + pattern = f"{KEY_PREFIXES['ai_parameters']}*" keys = await self.redis.keys(pattern) if not keys: @@ -2003,20 +1585,20 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - param_id = key.replace(self.KEY_PREFIXES['ai_parameters'], '') + param_id = key.replace(KEY_PREFIXES['ai_parameters'], '') result[param_id] = self._deserialize(value) return result async def delete_ai_parameters(self, param_id: str): """Delete AI parameters""" - key = f"{self.KEY_PREFIXES['ai_parameters']}{param_id}" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" await self.redis.delete(key) async def get_all_users(self) -> Dict[str, Any]: """Get all users""" - pattern = f"{self.KEY_PREFIXES['users']}*" + pattern = f"{KEY_PREFIXES['users']}*" keys = await self.redis.keys(pattern) if not keys: @@ -2029,7 +1611,7 @@ class RedisDatabase: result = {} for key, value in zip(keys, values): - email = key.replace(self.KEY_PREFIXES['users'], '') + email = key.replace(KEY_PREFIXES['users'], '') logger.info(f"๐Ÿ” Found user key: {key}, type: {type(value)}") if type(value) == str: result[email] = value @@ -2040,13 +1622,13 @@ class RedisDatabase: async def delete_user(self, email: str): """Delete user""" - key = f"{self.KEY_PREFIXES['users']}{email}" + key = f"{KEY_PREFIXES['users']}{email}" await self.redis.delete(key) async def get_job_requirements_stats(self) -> Dict[str, Any]: """Get statistics about cached job requirements""" try: - pattern = f"{self.KEY_PREFIXES['job_requirements']}*" + pattern = f"{KEY_PREFIXES['job_requirements']}*" keys = await self.redis.keys(pattern) stats = { @@ -2066,7 +1648,7 @@ class RedisDatabase: if value: requirements_data = self._deserialize(value) if requirements_data: - document_id = key.replace(self.KEY_PREFIXES['job_requirements'], '') + document_id = key.replace(KEY_PREFIXES['job_requirements'], '') stats["documents_with_requirements"].append(document_id) # Track cache dates @@ -2097,7 +1679,7 @@ class RedisDatabase: document_exists = await self.get_document(document_id) if not document_exists: # Document no longer exists, delete its job requirements - key = f"{self.KEY_PREFIXES['job_requirements']}{document_id}" + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" pipe.delete(key) orphaned_count += 1 logger.info(f"๐Ÿ“‹ Queued orphaned job requirements for deletion: {document_id}") @@ -2112,23 +1694,6 @@ class RedisDatabase: return 0 # Utility methods - async def clear_all_data(self): - """Clear all data from Redis (use with caution!)""" - for prefix in self.KEY_PREFIXES.values(): - pattern = f"{prefix}*" - keys = await self.redis.keys(pattern) - if keys: - await self.redis.delete(*keys) - - async def get_stats(self) -> Dict[str, int]: - """Get statistics about stored data""" - stats = {} - for data_type, prefix in self.KEY_PREFIXES.items(): - pattern = f"{prefix}*" - keys = await self.redis.keys(pattern) - stats[data_type] = len(keys) - return stats - # Authentication Record Methods async def set_authentication(self, user_id: str, auth_data: Dict[str, Any]) -> bool: """Store authentication record for a user""" diff --git a/src/backend/database/mixins/ai.py 
b/src/backend/database/mixins/ai.py new file mode 100644 index 0000000..0589e40 --- /dev/null +++ b/src/backend/database/mixins/ai.py @@ -0,0 +1,50 @@ +from datetime import UTC, datetime, timedelta, timezone +import logging +import json +from typing import List, Optional, Any, Dict, TYPE_CHECKING, Self + + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class AIMixin(DatabaseProtocol): + """Mixin for AI operations""" + async def get_ai_parameters(self, param_id: str) -> Optional[Dict]: + """Get AI parameters by ID""" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_ai_parameters(self, param_id: str, param_data: Dict): + """Set AI parameters data""" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" + await self.redis.set(key, self._serialize(param_data)) + + async def get_all_ai_parameters(self) -> Dict[str, Any]: + """Get all AI parameters""" + pattern = f"{KEY_PREFIXES['ai_parameters']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + param_id = key.replace(KEY_PREFIXES['ai_parameters'], '') + result[param_id] = self._deserialize(value) + + return result + + async def delete_ai_parameters(self, param_id: str): + """Delete AI parameters""" + key = f"{KEY_PREFIXES['ai_parameters']}{param_id}" + await self.redis.delete(key) + diff --git a/src/backend/database/mixins/analytics.py b/src/backend/database/mixins/analytics.py new file mode 100644 index 0000000..9b23a8a --- /dev/null +++ b/src/backend/database/mixins/analytics.py @@ -0,0 +1,13 @@ +import logging +from typing import Any, Dict, TYPE_CHECKING + +if TYPE_CHECKING: + from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class AnalyticsMixin: + """Mixin for analytics-related database operations""" + diff --git a/src/backend/database/mixins/auth.py b/src/backend/database/mixins/auth.py new file mode 100644 index 0000000..cf6fd48 --- /dev/null +++ b/src/backend/database/mixins/auth.py @@ -0,0 +1,513 @@ +from datetime import datetime, timedelta, timezone +import json +import logging +from typing import Any, Dict, TYPE_CHECKING, List, Optional + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class AuthMixin(DatabaseProtocol): + """Mixin for auth-related database operations""" + + async def find_verification_token_by_email(self, email: str) -> Optional[Dict[str, Any]]: + """Find pending verification token by email address""" + try: + pattern = "email_verification:*" + cursor = 0 + email_lower = email.lower() + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + verification_info = json.loads(token_data) + if (verification_info.get("email", "").lower() == email_lower and + not verification_info.get("verified", False)): + # Extract token from key + token = key.replace("email_verification:", "") + verification_info["token"] = token + return verification_info + + if cursor == 0: + break + + return None + + except Exception as e: + logger.error(f"โŒ Error finding verification token by email {email}: {e}") + return None + + async 
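The mixin modules in this patch all subclass DatabaseProtocol from database/mixins/protocols.py, a file that is not included in the diff. One plausible minimal shape for it, assuming it is a typing.Protocol that merely declares what every mixin expects from the final class, is sketched below; this is a reconstruction, not the actual file.

# database/mixins/protocols.py -- hypothetical reconstruction, not part of this patch
from typing import Any, Protocol

from redis.asyncio import Redis


class DatabaseProtocol(Protocol):
    """Structural interface each mixin can assume the composed class provides."""

    redis: Redis

    def _serialize(self, data: Any) -> str:
        ...

    def _deserialize(self, data: str) -> Any:
        ...

Because the mixins inherit from it directly rather than only annotating against it, the real module may instead define a plain base class; either way its job is to give type checkers visibility of self.redis and the BaseMixin helpers inside each mixin.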
def get_pending_verifications_count(self) -> int: + """Get count of pending email verifications (admin function)""" + try: + pattern = "email_verification:*" + cursor = 0 + count = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + verification_info = json.loads(token_data) + if not verification_info.get("verified", False): + count += 1 + + if cursor == 0: + break + + return count + + except Exception as e: + logger.error(f"โŒ Error counting pending verifications: {e}") + return 0 + + async def cleanup_expired_verification_tokens(self) -> int: + """Clean up expired verification tokens and return count of cleaned tokens""" + try: + pattern = "email_verification:*" + cursor = 0 + cleaned_count = 0 + current_time = datetime.now(timezone.utc) + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + verification_info = json.loads(token_data) + expires_at = datetime.fromisoformat(verification_info.get("expires_at", "")) + + if current_time > expires_at: + await self.redis.delete(key) + cleaned_count += 1 + logger.info(f"๐Ÿงน Cleaned expired verification token for {verification_info.get('email')}") + + if cursor == 0: + break + + if cleaned_count > 0: + logger.info(f"๐Ÿงน Cleaned up {cleaned_count} expired verification tokens") + + return cleaned_count + + except Exception as e: + logger.error(f"โŒ Error cleaning up expired verification tokens: {e}") + return 0 + + async def get_verification_attempts_count(self, email: str) -> int: + """Get the number of verification emails sent for an email in the last 24 hours""" + try: + key = f"verification_attempts:{email.lower()}" + data = await self.redis.get(key) + + if not data: + return 0 + + attempts_data = json.loads(data) + current_time = datetime.now(timezone.utc) + window_start = current_time - timedelta(hours=24) + + # Filter out old attempts + recent_attempts = [ + attempt for attempt in attempts_data + if datetime.fromisoformat(attempt) > window_start + ] + + return len(recent_attempts) + + except Exception as e: + logger.error(f"โŒ Error getting verification attempts count for {email}: {e}") + return 0 + + async def record_verification_attempt(self, email: str) -> bool: + """Record a verification email attempt""" + try: + key = f"verification_attempts:{email.lower()}" + current_time = datetime.now(timezone.utc) + + # Get existing attempts + data = await self.redis.get(key) + attempts_data = json.loads(data) if data else [] + + # Add current attempt + attempts_data.append(current_time.isoformat()) + + # Keep only last 24 hours of attempts + window_start = current_time - timedelta(hours=24) + recent_attempts = [ + attempt for attempt in attempts_data + if datetime.fromisoformat(attempt) > window_start + ] + + # Store with 24 hour expiration + await self.redis.setex( + key, + 24 * 60 * 60, # 24 hours + json.dumps(recent_attempts) + ) + + return True + + except Exception as e: + logger.error(f"โŒ Error recording verification attempt for {email}: {e}") + return False + + async def store_email_verification_token(self, email: str, token: str, user_type: str, user_data: dict) -> bool: + """Store email verification token with user data""" + try: + key = f"email_verification:{token}" + verification_data = { + "email": email.lower(), + "user_type": user_type, + "user_data": user_data, + "expires_at": 
(datetime.now(timezone.utc) + timedelta(hours=24)).isoformat(), + "created_at": datetime.now(timezone.utc).isoformat(), + "verified": False + } + + # Store with 24 hour expiration + await self.redis.setex( + key, + 24 * 60 * 60, # 24 hours in seconds + json.dumps(verification_data, default=str) + ) + + logger.info(f"๐Ÿ“ง Stored email verification token for {email}") + return True + except Exception as e: + logger.error(f"โŒ Error storing email verification token: {e}") + return False + + async def get_email_verification_token(self, token: str) -> Optional[Dict[str, Any]]: + """Retrieve email verification token data""" + try: + key = f"email_verification:{token}" + data = await self.redis.get(key) + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error retrieving email verification token: {e}") + return None + + async def mark_email_verified(self, token: str) -> bool: + """Mark email verification token as used""" + try: + key = f"email_verification:{token}" + token_data = await self.get_email_verification_token(token) + if token_data: + token_data["verified"] = True + token_data["verified_at"] = datetime.now(timezone.utc).isoformat() + await self.redis.setex( + key, + 24 * 60 * 60, # Keep for remaining TTL + json.dumps(token_data, default=str) + ) + return True + return False + except Exception as e: + logger.error(f"โŒ Error marking email verified: {e}") + return False + + async def store_mfa_code(self, email: str, code: str, device_id: str) -> bool: + """Store MFA code for verification""" + try: + logger.info("๐Ÿ” Storing MFA code for email: %s", email ) + key = f"mfa_code:{email.lower()}:{device_id}" + mfa_data = { + "code": code, + "email": email.lower(), + "device_id": device_id, + "expires_at": (datetime.now(timezone.utc) + timedelta(minutes=10)).isoformat(), + "created_at": datetime.now(timezone.utc).isoformat(), + "attempts": 0, + "verified": False + } + + # Store with 10 minute expiration + await self.redis.setex( + key, + 10 * 60, # 10 minutes in seconds + json.dumps(mfa_data, default=str) + ) + + logger.info(f"๐Ÿ” Stored MFA code for {email}") + return True + except Exception as e: + logger.error(f"โŒ Error storing MFA code: {e}") + return False + + async def get_mfa_code(self, email: str, device_id: str) -> Optional[Dict[str, Any]]: + """Retrieve MFA code data""" + try: + key = f"mfa_code:{email.lower()}:{device_id}" + data = await self.redis.get(key) + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error retrieving MFA code: {e}") + return None + + async def increment_mfa_attempts(self, email: str, device_id: str) -> int: + """Increment MFA verification attempts""" + try: + key = f"mfa_code:{email.lower()}:{device_id}" + mfa_data = await self.get_mfa_code(email, device_id) + if mfa_data: + mfa_data["attempts"] += 1 + await self.redis.setex( + key, + 10 * 60, # Keep original TTL + json.dumps(mfa_data, default=str) + ) + return mfa_data["attempts"] + return 0 + except Exception as e: + logger.error(f"โŒ Error incrementing MFA attempts: {e}") + return 0 + + async def mark_mfa_verified(self, email: str, device_id: str) -> bool: + """Mark MFA code as verified""" + try: + key = f"mfa_code:{email.lower()}:{device_id}" + mfa_data = await self.get_mfa_code(email, device_id) + if mfa_data: + mfa_data["verified"] = True + mfa_data["verified_at"] = datetime.now(timezone.utc).isoformat() + await self.redis.setex( + key, + 10 * 60, # Keep for remaining TTL + json.dumps(mfa_data, default=str) 
+ ) + return True + return False + except Exception as e: + logger.error(f"โŒ Error marking MFA verified: {e}") + return False + + async def set_authentication(self, user_id: str, auth_data: Dict[str, Any]) -> bool: + """Store authentication record for a user""" + try: + key = f"auth:{user_id}" + await self.redis.set(key, json.dumps(auth_data, default=str)) + logger.info(f"๐Ÿ” Stored authentication record for user {user_id}") + return True + except Exception as e: + logger.error(f"โŒ Error storing authentication record for {user_id}: {e}") + return False + + async def get_authentication(self, user_id: str) -> Optional[Dict[str, Any]]: + """Retrieve authentication record for a user""" + try: + key = f"auth:{user_id}" + data = await self.redis.get(key) + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error retrieving authentication record for {user_id}: {e}") + return None + + async def delete_authentication(self, user_id: str) -> bool: + """Delete authentication record for a user""" + try: + key = f"auth:{user_id}" + result = await self.redis.delete(key) + logger.info(f"๐Ÿ” Deleted authentication record for user {user_id}") + return result > 0 + except Exception as e: + logger.error(f"โŒ Error deleting authentication record for {user_id}: {e}") + return False + + async def store_refresh_token(self, user_id: str, token: str, expires_at: datetime, device_info: Dict[str, str]) -> bool: + """Store refresh token for a user""" + try: + key = f"refresh_token:{token}" + token_data = { + "user_id": user_id, + "expires_at": expires_at.isoformat(), + "device": device_info.get("device", "unknown"), + "ip_address": device_info.get("ip_address", "unknown"), + "is_revoked": False, + "created_at": datetime.now(timezone.utc).isoformat() + } + + # Store with expiration + ttl_seconds = int((expires_at - datetime.now(timezone.utc)).total_seconds()) + if ttl_seconds > 0: + await self.redis.setex(key, ttl_seconds, json.dumps(token_data, default=str)) + logger.info(f"๐Ÿ” Stored refresh token for user {user_id}") + return True + else: + logger.warning(f"โš ๏ธ Attempted to store expired refresh token for user {user_id}") + return False + except Exception as e: + logger.error(f"โŒ Error storing refresh token for {user_id}: {e}") + return False + + async def get_refresh_token(self, token: str) -> Optional[Dict[str, Any]]: + """Retrieve refresh token data""" + try: + key = f"refresh_token:{token}" + data = await self.redis.get(key) + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error retrieving refresh token: {e}") + return None + + async def revoke_refresh_token(self, token: str) -> bool: + """Revoke a refresh token""" + try: + key = f"refresh_token:{token}" + token_data = await self.get_refresh_token(token) + if token_data: + token_data["is_revoked"] = True + token_data["revoked_at"] = datetime.now(timezone.utc).isoformat() + await self.redis.set(key, json.dumps(token_data, default=str)) + logger.info(f"๐Ÿ” Revoked refresh token") + return True + return False + except Exception as e: + logger.error(f"โŒ Error revoking refresh token: {e}") + return False + + async def revoke_all_user_tokens(self, user_id: str) -> bool: + """Revoke all refresh tokens for a user""" + try: + # This requires scanning all refresh tokens - consider using a user token index for efficiency + pattern = "refresh_token:*" + cursor = 0 + revoked_count = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, 
count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + token_info = json.loads(token_data) + if token_info.get("user_id") == user_id and not token_info.get("is_revoked"): + token_info["is_revoked"] = True + token_info["revoked_at"] = datetime.now(timezone.utc).isoformat() + await self.redis.set(key, json.dumps(token_info, default=str)) + revoked_count += 1 + + if cursor == 0: + break + + logger.info(f"๐Ÿ” Revoked {revoked_count} refresh tokens for user {user_id}") + return True + except Exception as e: + logger.error(f"โŒ Error revoking all tokens for user {user_id}: {e}") + return False + + # Password Reset Token Methods + async def store_password_reset_token(self, email: str, token: str, expires_at: datetime) -> bool: + """Store password reset token""" + try: + key = f"password_reset:{token}" + token_data = { + "email": email.lower(), + "expires_at": expires_at.isoformat(), + "used": False, + "created_at": datetime.now(timezone.utc).isoformat() + } + + # Store with expiration + ttl_seconds = int((expires_at - datetime.now(timezone.utc)).total_seconds()) + if ttl_seconds > 0: + await self.redis.setex(key, ttl_seconds, json.dumps(token_data, default=str)) + logger.info(f"๐Ÿ” Stored password reset token for {email}") + return True + else: + logger.warning(f"โš ๏ธ Attempted to store expired password reset token for {email}") + return False + except Exception as e: + logger.error(f"โŒ Error storing password reset token for {email}: {e}") + return False + + async def get_password_reset_token(self, token: str) -> Optional[Dict[str, Any]]: + """Retrieve password reset token data""" + try: + key = f"password_reset:{token}" + data = await self.redis.get(key) + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error retrieving password reset token: {e}") + return None + + async def mark_password_reset_token_used(self, token: str) -> bool: + """Mark password reset token as used""" + try: + key = f"password_reset:{token}" + token_data = await self.get_password_reset_token(token) + if token_data: + token_data["used"] = True + token_data["used_at"] = datetime.now(timezone.utc).isoformat() + await self.redis.set(key, json.dumps(token_data, default=str)) + logger.info(f"๐Ÿ” Marked password reset token as used") + return True + return False + except Exception as e: + logger.error(f"โŒ Error marking password reset token as used: {e}") + return False + + # User Activity and Security Logging + async def log_security_event(self, user_id: str, event_type: str, details: Dict[str, Any]) -> bool: + """Log security events for audit purposes""" + try: + key = f"security_log:{user_id}:{datetime.now(timezone.utc).strftime('%Y-%m-%d')}" + event_data = { + "timestamp": datetime.now(timezone.utc).isoformat(), + "user_id": user_id, + "event_type": event_type, + "details": details + } + + # Add to list (latest events first) + await self.redis.lpush(key, json.dumps(event_data, default=str))# type: ignore + + # Keep only last 100 events per day + await self.redis.ltrim(key, 0, 99)# type: ignore + + # Set expiration for 30 days + await self.redis.expire(key, 30 * 24 * 60 * 60) + + logger.info(f"๐Ÿ”’ Logged security event {event_type} for user {user_id}") + return True + except Exception as e: + logger.error(f"โŒ Error logging security event for {user_id}: {e}") + return False + + async def get_user_security_log(self, user_id: str, days: int = 7) -> List[Dict[str, Any]]: + """Retrieve security log for a user""" + try: + events = [] + for 
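The comment in revoke_all_user_tokens above notes that scanning every refresh_token:* key is costly and suggests a per-user token index. One way that suggestion could be implemented is sketched below; it is only a sketch, the user_refresh_tokens:* key and both helper names are invented here, and a client created with decode_responses=True is assumed so smembers returns strings.

import json
from datetime import datetime, timezone

from redis.asyncio import Redis


async def store_refresh_token_indexed(redis: Redis, user_id: str, token: str,
                                       token_data: dict, ttl_seconds: int) -> None:
    """Store the token as before, but also record it in a per-user set."""
    await redis.setex(f"refresh_token:{token}", ttl_seconds, json.dumps(token_data, default=str))
    await redis.sadd(f"user_refresh_tokens:{user_id}", token)
    # Keep the index alive at least as long as the newest token it references.
    await redis.expire(f"user_refresh_tokens:{user_id}", ttl_seconds)


async def revoke_all_user_tokens_indexed(redis: Redis, user_id: str) -> int:
    """Revoke only the tokens listed in the user's index instead of scanning everything."""
    revoked = 0
    for token in await redis.smembers(f"user_refresh_tokens:{user_id}"):
        key = f"refresh_token:{token}"
        data = await redis.get(key)
        if data:
            info = json.loads(data)
            if not info.get("is_revoked"):
                info["is_revoked"] = True
                info["revoked_at"] = datetime.now(timezone.utc).isoformat()
                await redis.set(key, json.dumps(info, default=str))
                revoked += 1
    await redis.delete(f"user_refresh_tokens:{user_id}")
    return revoked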
i in range(days): + date = (datetime.now(timezone.utc) - timedelta(days=i)).strftime('%Y-%m-%d') + key = f"security_log:{user_id}:{date}" + + daily_events = await self.redis.lrange(key, 0, -1)# type: ignore + for event_json in daily_events: + events.append(json.loads(event_json)) + + # Sort by timestamp (most recent first) + events.sort(key=lambda x: x["timestamp"], reverse=True) + return events + except Exception as e: + logger.error(f"โŒ Error retrieving security log for {user_id}: {e}") + return [] + diff --git a/src/backend/database/mixins/base.py b/src/backend/database/mixins/base.py new file mode 100644 index 0000000..41bd32b --- /dev/null +++ b/src/backend/database/mixins/base.py @@ -0,0 +1,51 @@ +from datetime import UTC, datetime +import json +import logging +from typing import Any, Dict, TYPE_CHECKING, Optional + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES +logger = logging.getLogger(__name__) + +if TYPE_CHECKING: + from ..core import RedisDatabase + +class BaseMixin(DatabaseProtocol): + """Base mixin with core Redis operations and utilities""" + + def _serialize(self, data: Any) -> str: + """Serialize data to JSON string for Redis storage""" + if data is None: + return "" + return json.dumps(data, default=str) + + def _deserialize(self, data: str) -> Any: + """Deserialize JSON string from Redis""" + if not data: + return None + try: + return json.loads(data) + except json.JSONDecodeError: + logger.error(f"Failed to deserialize data: {data}") + return None + + async def get_stats(self) -> Dict[str, int]: + """Get statistics about stored data""" + stats = {} + for data_type, prefix in KEY_PREFIXES.items(): + pattern = f"{prefix}*" + keys = await self.redis.keys(pattern) + stats[data_type] = len(keys) + return stats + + async def clear_all_data(self): + """Clear all data from Redis (use with caution!)""" + for prefix in KEY_PREFIXES.values(): + pattern = f"{prefix}*" + keys = await self.redis.keys(pattern) + if keys: + await self.redis.delete(*keys) + + + \ No newline at end of file diff --git a/src/backend/database/mixins/chat.py b/src/backend/database/mixins/chat.py new file mode 100644 index 0000000..0ee95c2 --- /dev/null +++ b/src/backend/database/mixins/chat.py @@ -0,0 +1,289 @@ +from datetime import datetime, UTC, timedelta +import logging +from typing import Any, Dict, TYPE_CHECKING, List, Optional + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class ChatMixin(DatabaseProtocol): + """Mixin for chat-related database operations""" + + # Chat Sessions operations + async def get_candidate_chat_summary(self, candidate_id: str) -> Dict[str, Any]: + """Get a summary of chat activity for a specific candidate""" + sessions = await self.get_chat_sessions_by_candidate(candidate_id) + + if not sessions: + return { + "candidate_id": candidate_id, + "total_sessions": 0, + "total_messages": 0, + "first_chat": None, + "last_chat": None + } + + total_messages = 0 + for session in sessions: + session_id = session.get("id") + if session_id: + message_count = await self.get_chat_message_count(session_id) + total_messages += message_count + + # Sort sessions by creation date + sessions_by_date = sorted(sessions, key=lambda x: x.get("createdAt", "")) + + return { + "candidate_id": candidate_id, + "total_sessions": len(sessions), + "total_messages": total_messages, + "first_chat": sessions_by_date[0].get("createdAt") if sessions_by_date else None, + "last_chat": 
sessions_by_date[-1].get("lastActivity") if sessions_by_date else None, + "recent_sessions": sessions[:5] # Last 5 sessions + } + + # Chat Sessions operations + async def get_chat_session(self, session_id: str) -> Optional[Dict]: + """Get chat session by ID""" + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_chat_session(self, session_id: str, session_data: Dict): + """Set chat session data""" + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" + await self.redis.set(key, self._serialize(session_data)) + + async def get_all_chat_sessions(self) -> Dict[str, Any]: + """Get all chat sessions""" + pattern = f"{KEY_PREFIXES['chat_sessions']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + session_id = key.replace(KEY_PREFIXES['chat_sessions'], '') + result[session_id] = self._deserialize(value) + + return result + + async def delete_chat_session(self, session_id: str) -> bool: + """Delete a chat session from Redis""" + try: + result = await self.redis.delete(f"{KEY_PREFIXES['chat_sessions']}{session_id}") + return result > 0 + except Exception as e: + logger.error(f"Error deleting chat session {session_id}: {e}") + raise + + async def delete_chat_message(self, session_id: str, message_id: str) -> bool: + """Delete a specific chat message from Redis""" + try: + # Messages are stored as serialized JSON in the session's list, so rewrite the list without the matching 'id' + messages = await self.get_chat_messages(session_id) + remaining = [msg for msg in messages if msg.get("id") != message_id] + # Rewrite the list and report whether a message was actually removed + await self.set_chat_messages(session_id, remaining) + return len(remaining) < len(messages) + except Exception as e: + logger.error(f"Error deleting chat message {message_id}: {e}") + raise + + # Chat Messages operations (stored as lists) + async def get_chat_messages(self, session_id: str) -> List[Dict]: + """Get chat messages for a session""" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" + messages = await self.redis.lrange(key, 0, -1)# type: ignore + return [self._deserialize(msg) for msg in messages if msg] # type: ignore + + async def add_chat_message(self, session_id: str, message_data: Dict): + """Add a chat message to a session""" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" + await self.redis.rpush(key, self._serialize(message_data))# type: ignore + + async def set_chat_messages(self, session_id: str, messages: List[Dict]): + """Set all chat messages for a session (replaces existing)""" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" + + # Clear existing messages + await self.redis.delete(key) + + # Add new messages + if messages: + serialized_messages = [self._serialize(msg) for msg in messages] + await self.redis.rpush(key, *serialized_messages)# type: ignore + + async def get_all_chat_messages(self) -> Dict[str, List[Dict]]: + """Get all chat messages grouped by session""" + pattern = f"{KEY_PREFIXES['chat_messages']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + result = {} + for key in keys: + session_id = key.replace(KEY_PREFIXES['chat_messages'], '') + messages = await self.redis.lrange(key, 0, -1)# type: ignore + result[session_id] = [self._deserialize(msg) for msg in messages if msg] + + return result + + async def delete_chat_messages(self, session_id: str): + """Delete all chat messages for a session""" + key = 
f"{KEY_PREFIXES['chat_messages']}{session_id}" + await self.redis.delete(key) + + # Enhanced Chat Session Methods + async def get_chat_sessions_by_user(self, user_id: str) -> List[Dict]: + """Get all chat sessions for a specific user""" + all_sessions = await self.get_all_chat_sessions() + user_sessions = [] + + for session_data in all_sessions.values(): + if session_data.get("userId") == user_id or session_data.get("guestId") == user_id: + user_sessions.append(session_data) + + # Sort by last activity (most recent first) + user_sessions.sort(key=lambda x: x.get("lastActivity", ""), reverse=True) + return user_sessions + + async def get_chat_sessions_by_candidate(self, candidate_id: str) -> List[Dict]: + """Get all chat sessions related to a specific candidate""" + all_sessions = await self.get_all_chat_sessions() + candidate_sessions = [] + + for session_data in all_sessions.values(): + context = session_data.get("context", {}) + if (context.get("relatedEntityType") == "candidate" and + context.get("relatedEntityId") == candidate_id): + candidate_sessions.append(session_data) + + # Sort by last activity (most recent first) + candidate_sessions.sort(key=lambda x: x.get("lastActivity", ""), reverse=True) + return candidate_sessions + + async def update_chat_session_activity(self, session_id: str): + """Update the last activity timestamp for a chat session""" + session_data = await self.get_chat_session(session_id) + if session_data: + session_data["lastActivity"] = datetime.now(UTC).isoformat() + await self.set_chat_session(session_id, session_data) + + async def get_recent_chat_messages(self, session_id: str, limit: int = 10) -> List[Dict]: + """Get the most recent chat messages for a session""" + messages = await self.get_chat_messages(session_id) + # Return the last 'limit' messages + return messages[-limit:] if len(messages) > limit else messages + + async def get_chat_message_count(self, session_id: str) -> int: + """Get the total number of messages in a chat session""" + key = f"{KEY_PREFIXES['chat_messages']}{session_id}" + return await self.redis.llen(key)# type: ignore + + async def search_chat_messages(self, session_id: str, query: str) -> List[Dict]: + """Search for messages containing specific text in a session""" + messages = await self.get_chat_messages(session_id) + query_lower = query.lower() + + matching_messages = [] + for msg in messages: + content = msg.get("content", "").lower() + if query_lower in content: + matching_messages.append(msg) + + return matching_messages + + # Chat Session Management + async def archive_chat_session(self, session_id: str): + """Archive a chat session""" + session_data = await self.get_chat_session(session_id) + if session_data: + session_data["isArchived"] = True + session_data["updatedAt"] = datetime.now(UTC).isoformat() + await self.set_chat_session(session_id, session_data) + + async def delete_chat_session_completely(self, session_id: str): + """Delete a chat session and all its messages""" + # Delete the session + await self.delete_chat_session(session_id) + # Delete all messages + await self.delete_chat_messages(session_id) + + async def cleanup_old_chat_sessions(self, days_old: int = 90): + """Archive or delete chat sessions older than specified days""" + cutoff_date = datetime.now(UTC) - timedelta(days=days_old) + cutoff_iso = cutoff_date.isoformat() + + all_sessions = await self.get_all_chat_sessions() + archived_count = 0 + + for session_id, session_data in all_sessions.items(): + last_activity = session_data.get("lastActivity", 
session_data.get("createdAt", "")) + + if last_activity < cutoff_iso and not session_data.get("isArchived", False): + await self.archive_chat_session(session_id) + archived_count += 1 + + return archived_count + + + # Analytics and Reporting + async def get_chat_statistics(self) -> Dict[str, Any]: + """Get comprehensive chat statistics""" + all_sessions = await self.get_all_chat_sessions() + all_messages = await self.get_all_chat_messages() + + stats = { + "total_sessions": len(all_sessions), + "total_messages": sum(len(messages) for messages in all_messages.values()), + "active_sessions": 0, + "archived_sessions": 0, + "sessions_by_type": {}, + "sessions_with_candidates": 0, + "average_messages_per_session": 0 + } + + # Analyze sessions + for session_data in all_sessions.values(): + if session_data.get("isArchived", False): + stats["archived_sessions"] += 1 + else: + stats["active_sessions"] += 1 + + # Count by type + context_type = session_data.get("context", {}).get("type", "unknown") + stats["sessions_by_type"][context_type] = stats["sessions_by_type"].get(context_type, 0) + 1 + + # Count sessions with candidate association + if session_data.get("context", {}).get("relatedEntityType") == "candidate": + stats["sessions_with_candidates"] += 1 + + # Calculate averages + if stats["total_sessions"] > 0: + stats["average_messages_per_session"] = stats["total_messages"] / stats["total_sessions"] + + return stats + + async def bulk_update_chat_sessions(self, session_updates: Dict[str, Dict]): + """Bulk update multiple chat sessions""" + pipe = self.redis.pipeline() + + for session_id, updates in session_updates.items(): + session_data = await self.get_chat_session(session_id) + if session_data: + session_data.update(updates) + session_data["updatedAt"] = datetime.now(UTC).isoformat() + key = f"{KEY_PREFIXES['chat_sessions']}{session_id}" + pipe.set(key, self._serialize(session_data)) + + await pipe.execute() diff --git a/src/backend/database/mixins/document.py b/src/backend/database/mixins/document.py new file mode 100644 index 0000000..04f90c8 --- /dev/null +++ b/src/backend/database/mixins/document.py @@ -0,0 +1,143 @@ +from datetime import UTC, datetime +import logging +from typing import Any, Dict, TYPE_CHECKING, List, Optional + +from .protocols import DatabaseProtocol +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class DocumentMixin(DatabaseProtocol): + """Mixin for document-related database operations""" + + async def get_document(self, document_id: str) -> Optional[Dict]: + """Get document metadata by ID""" + key = f"document:{document_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_document(self, document_id: str, document_data: Dict): + """Set document metadata""" + key = f"document:{document_id}" + await self.redis.set(key, self._serialize(document_data)) + + async def delete_document(self, document_id: str): + """Delete document metadata""" + key = f"document:{document_id}" + await self.redis.delete(key) + + async def delete_all_candidate_documents(self, candidate_id: str) -> int: + """Delete all documents for a specific candidate and return count of deleted documents""" + try: + # Get all document IDs for this candidate + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" + document_ids = await self.redis.lrange(key, 0, -1)# type: ignore + + if not document_ids: + logger.info(f"No documents found for candidate {candidate_id}") + return 0 + + deleted_count = 0 + + # Use 
pipeline for efficient batch operations + pipe = self.redis.pipeline() + + # Delete each document's metadata + for doc_id in document_ids: + pipe.delete(f"document:{doc_id}") + pipe.delete(f"{KEY_PREFIXES['job_requirements']}{doc_id}") + deleted_count += 1 + + # Delete the candidate's document list + pipe.delete(key) + + # Execute all operations + await pipe.execute() + + logger.info(f"Successfully deleted {deleted_count} documents for candidate {candidate_id}") + return deleted_count + + except Exception as e: + logger.error(f"Error deleting all documents for candidate {candidate_id}: {e}") + raise + + async def get_candidate_documents(self, candidate_id: str) -> List[Dict]: + """Get all documents for a specific candidate""" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" + document_ids = await self.redis.lrange(key, 0, -1) # type: ignore + + if not document_ids: + return [] + + # Get all document metadata + pipe = self.redis.pipeline() + for doc_id in document_ids: + pipe.get(f"document:{doc_id}") + values = await pipe.execute() + + documents = [] + for doc_id, value in zip(document_ids, values): + if value: + doc_data = self._deserialize(value) + if doc_data: + documents.append(doc_data) + else: + # Clean up orphaned document ID + await self.redis.lrem(key, 0, doc_id)# type: ignore + logger.warning(f"Removed orphaned document ID {doc_id} for candidate {candidate_id}") + + return documents + + async def add_document_to_candidate(self, candidate_id: str, document_id: str): + """Add a document ID to a candidate's document list""" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" + await self.redis.rpush(key, document_id)# type: ignore + + async def remove_document_from_candidate(self, candidate_id: str, document_id: str): + """Remove a document ID from a candidate's document list""" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" + await self.redis.lrem(key, 0, document_id)# type: ignore + + async def update_document(self, document_id: str, updates: Dict) -> Dict[Any, Any] | None: + """Update document metadata""" + document_data = await self.get_document(document_id) + if document_data: + document_data.update(updates) + await self.set_document(document_id, document_data) + return document_data + return None + + async def get_documents_by_rag_status(self, candidate_id: str, include_in_rag: bool = True) -> List[Dict]: + """Get candidate documents filtered by RAG inclusion status""" + all_documents = await self.get_candidate_documents(candidate_id) + return [doc for doc in all_documents if doc.get("include_in_rag", False) == include_in_rag] + + async def bulk_update_document_rag_status(self, candidate_id: str, document_ids: List[str], include_in_rag: bool): + """Bulk update RAG status for multiple documents""" + pipe = self.redis.pipeline() + + for doc_id in document_ids: + doc_data = await self.get_document(doc_id) + if doc_data and doc_data.get("candidate_id") == candidate_id: + doc_data["include_in_rag"] = include_in_rag + doc_data["updatedAt"] = datetime.now(UTC).isoformat() + pipe.set(f"document:{doc_id}", self._serialize(doc_data)) + + await pipe.execute() + + async def get_document_count_for_candidate(self, candidate_id: str) -> int: + """Get total number of documents for a candidate""" + key = f"{KEY_PREFIXES['candidate_documents']}{candidate_id}" + return await self.redis.llen(key)# type: ignore + + async def search_candidate_documents(self, candidate_id: str, query: str) -> List[Dict]: + """Search documents by filename for a candidate""" + 
all_documents = await self.get_candidate_documents(candidate_id) + query_lower = query.lower() + + return [ + doc for doc in all_documents + if (query_lower in doc.get("filename", "").lower() or + query_lower in doc.get("originalName", "").lower()) + ] + diff --git a/src/backend/database/mixins/job.py b/src/backend/database/mixins/job.py new file mode 100644 index 0000000..39fea6a --- /dev/null +++ b/src/backend/database/mixins/job.py @@ -0,0 +1,280 @@ +from datetime import datetime, UTC, timedelta +import logging +from typing import Any, Dict, TYPE_CHECKING, List, Optional + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class JobMixin(DatabaseProtocol): + """Mixin for job-related database operations""" + async def get_job(self, job_id: str) -> Optional[Dict]: + """Get job by ID""" + key = f"{KEY_PREFIXES['jobs']}{job_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_job(self, job_id: str, job_data: Dict): + """Set job data""" + key = f"{KEY_PREFIXES['jobs']}{job_id}" + await self.redis.set(key, self._serialize(job_data)) + + async def get_all_jobs(self) -> Dict[str, Any]: + """Get all jobs""" + pattern = f"{KEY_PREFIXES['jobs']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + job_id = key.replace(KEY_PREFIXES['jobs'], '') + result[job_id] = self._deserialize(value) + + return result + + async def delete_job(self, job_id: str): + """Delete job""" + key = f"{KEY_PREFIXES['jobs']}{job_id}" + await self.redis.delete(key) + + async def get_job_requirements_by_candidate(self, candidate_id: str) -> List[Dict]: + """Get all job requirements analysis for documents belonging to a candidate""" + try: + # Get all documents for the candidate + candidate_documents = await self.get_candidate_documents(candidate_id) + + if not candidate_documents: + return [] + + # Get job requirements for each document + job_requirements = [] + for doc in candidate_documents: + doc_id = doc.get("id") + if doc_id: + requirements = await self.get_job_requirements(doc_id) + if requirements: + # Add document metadata to requirements + requirements["document_filename"] = doc.get("filename") + requirements["document_original_name"] = doc.get("originalName") + job_requirements.append(requirements) + + return job_requirements + except Exception as e: + logger.error(f"โŒ Error retrieving job requirements for candidate {candidate_id}: {e}") + return [] + + async def invalidate_job_requirements_cache(self, document_id: str) -> bool: + """Invalidate (delete) cached job requirements for a document""" + # This is an alias for delete_job_requirements for semantic clarity + return await self.delete_job_requirements(document_id) + + async def bulk_delete_job_requirements(self, document_ids: List[str]) -> int: + """Delete job requirements for multiple documents and return count of deleted items""" + try: + deleted_count = 0 + pipe = self.redis.pipeline() + + for doc_id in document_ids: + key = f"{KEY_PREFIXES['job_requirements']}{doc_id}" + pipe.delete(key) + deleted_count += 1 + + results = await pipe.execute() + actual_deleted = sum(1 for result in results if result > 0) + + logger.info(f"๐Ÿ“‹ Bulk deleted job requirements for {actual_deleted}/{len(document_ids)} documents") + return actual_deleted + except Exception as e: 
+ logger.error(f"โŒ Error bulk deleting job requirements: {e}") + return 0 + + # Job Applications operations + async def get_job_application(self, application_id: str) -> Optional[Dict]: + """Get job application by ID""" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_job_application(self, application_id: str, application_data: Dict): + """Set job application data""" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" + await self.redis.set(key, self._serialize(application_data)) + + async def get_all_job_applications(self) -> Dict[str, Any]: + """Get all job applications""" + pattern = f"{KEY_PREFIXES['job_applications']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + app_id = key.replace(KEY_PREFIXES['job_applications'], '') + result[app_id] = self._deserialize(value) + + return result + + async def delete_job_application(self, application_id: str): + """Delete job application""" + key = f"{KEY_PREFIXES['job_applications']}{application_id}" + await self.redis.delete(key) + + async def cleanup_orphaned_job_requirements(self) -> int: + """Clean up job requirements for documents that no longer exist""" + try: + # Get all job requirements + all_requirements = await self.get_all_job_requirements() + + if not all_requirements: + return 0 + + orphaned_count = 0 + pipe = self.redis.pipeline() + + for document_id in all_requirements.keys(): + # Check if the document still exists + document_exists = await self.get_document(document_id) + if not document_exists: + # Document no longer exists, delete its job requirements + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" + pipe.delete(key) + orphaned_count += 1 + logger.info(f"๐Ÿ“‹ Queued orphaned job requirements for deletion: {document_id}") + + if orphaned_count > 0: + await pipe.execute() + logger.info(f"๐Ÿงน Cleaned up {orphaned_count} orphaned job requirements") + + return orphaned_count + except Exception as e: + logger.error(f"โŒ Error cleaning up orphaned job requirements: {e}") + return 0 + + async def get_job_requirements(self, document_id: str) -> Optional[Dict]: + """Get cached job requirements analysis for a document""" + try: + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" + data = await self.redis.get(key) + if data: + requirements_data = self._deserialize(data) + logger.info(f"๐Ÿ“‹ Retrieved cached job requirements for document {document_id}") + return requirements_data + logger.info(f"๐Ÿ“‹ No cached job requirements found for document {document_id}") + return None + except Exception as e: + logger.error(f"โŒ Error retrieving job requirements for document {document_id}: {e}") + return None + + async def save_job_requirements(self, document_id: str, requirements: Dict) -> bool: + """Save job requirements analysis results for a document""" + try: + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" + + # Add metadata to the requirements + requirements_with_meta = { + **requirements, + "cached_at": datetime.now(UTC).isoformat(), + "document_id": document_id + } + + await self.redis.set(key, self._serialize(requirements_with_meta)) + + # Optional: Set expiration (e.g., 30 days) to prevent indefinite storage + # await self.redis.expire(key, 30 * 24 * 60 * 60) # 30 days + + logger.info(f"๐Ÿ“‹ Saved job 
requirements for document {document_id}") + return True + except Exception as e: + logger.error(f"โŒ Error saving job requirements for document {document_id}: {e}") + return False + + async def delete_job_requirements(self, document_id: str) -> bool: + """Delete cached job requirements for a document""" + try: + key = f"{KEY_PREFIXES['job_requirements']}{document_id}" + result = await self.redis.delete(key) + if result > 0: + logger.info(f"๐Ÿ“‹ Deleted job requirements for document {document_id}") + return True + return False + except Exception as e: + logger.error(f"โŒ Error deleting job requirements for document {document_id}: {e}") + return False + + async def get_all_job_requirements(self) -> Dict[str, Any]: + """Get all cached job requirements""" + try: + pattern = f"{KEY_PREFIXES['job_requirements']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + document_id = key.replace(KEY_PREFIXES['job_requirements'], '') + if value: + result[document_id] = self._deserialize(value) + + return result + except Exception as e: + logger.error(f"โŒ Error retrieving all job requirements: {e}") + return {} + + async def get_job_requirements_stats(self) -> Dict[str, Any]: + """Get statistics about cached job requirements""" + try: + pattern = f"{KEY_PREFIXES['job_requirements']}*" + keys = await self.redis.keys(pattern) + + stats = { + "total_cached_requirements": len(keys), + "cache_dates": {}, + "documents_with_requirements": [] + } + + if keys: + # Get cache dates for analysis + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + for key, value in zip(keys, values): + if value: + requirements_data = self._deserialize(value) + if requirements_data: + document_id = key.replace(KEY_PREFIXES['job_requirements'], '') + stats["documents_with_requirements"].append(document_id) + + # Track cache dates + cached_at = requirements_data.get("cached_at") + if cached_at: + cache_date = cached_at[:10] # Extract date part + stats["cache_dates"][cache_date] = stats["cache_dates"].get(cache_date, 0) + 1 + + return stats + except Exception as e: + logger.error(f"โŒ Error getting job requirements stats: {e}") + return {"total_cached_requirements": 0, "cache_dates": {}, "documents_with_requirements": []} + diff --git a/src/backend/database/mixins/protocols.py b/src/backend/database/mixins/protocols.py new file mode 100644 index 0000000..06774e7 --- /dev/null +++ b/src/backend/database/mixins/protocols.py @@ -0,0 +1,159 @@ +from datetime import datetime +from typing import Any, Dict, List, Optional, Protocol, TYPE_CHECKING +from redis.asyncio import Redis + +if TYPE_CHECKING: + from database.core import RedisDatabase + +from models import SkillAssessment + +class DatabaseProtocol(Protocol): + # Base mixin + redis: Redis + def _serialize(self, data) -> str: ... + def _deserialize(self, data: str): ... + + # Chat mixin + async def add_chat_message(self, session_id: str, message_data: Dict): ... + async def archive_chat_session(self, session_id: str): ... + async def bulk_update_chat_sessions(self, session_updates: Dict[str, Dict]): ... + async def delete_chat_message(self, session_id: str, message_id: str) -> bool: ... + async def delete_chat_messages(self, session_id: str): ... + async def delete_chat_session_completely(self, session_id: str): ... 
+ async def delete_chat_session(self, session_id: str) -> bool: ... + + # Document mixin + async def add_document_to_candidate(self, candidate_id: str, document_id: str): ... + async def bulk_update_document_rag_status(self, candidate_id: str, document_ids: List[str], include_in_rag: bool): ... + + # Job mixin + async def bulk_delete_job_requirements(self, document_ids: List[str]) -> int: ... + async def cache_skill_match(self, cache_key: str, assessment: SkillAssessment) -> None: ... + + # User mixin + async def delete_candidate_batch(self, candidate_ids: List[str]) -> Dict[str, Dict[str, int]]: ... + async def delete_candidate(self, candidate_id: str) -> Dict[str, int]: ... + async def delete_employer(self, employer_id: str): ... + async def delete_guest(self, guest_id: str) -> bool: ... + async def delete_user(self, email: str): ... + async def find_candidate_by_username(self, username: str) -> Optional[Dict]: ... + async def get_all_users(self) -> Dict[str, Any]: ... + async def get_all_viewers(self) -> Dict[str, Any]: ... + async def get_candidate_chat_summary(self, candidate_id: str) -> Dict[str, Any]: ... + async def get_candidate_documents(self, candidate_id: str) -> List[Dict]: ... + async def get_candidate(self, candidate_id: str) -> Optional[Dict]: ... + async def get_employer(self, employer_id: str) -> Optional[Dict]: ... + async def get_guest_by_session_id(self, session_id: str) -> Optional[Dict[str, Any]]: ... + async def get_guest(self, guest_id: str) -> Optional[Dict[str, Any]]: ... + async def get_guest_statistics(self) -> Dict[str, Any]: ... + async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: ... + async def get_user_by_username(self, username: str) -> Optional[Dict]: ... + async def get_user_rag_update_time(self, user_id: str) -> Optional[datetime]: ... + async def get_user_security_log(self, user_id: str, days: int = 7) -> List[Dict[str, Any]]: ... + async def get_user(self, login: str) -> Optional[Dict[str, Any]]: ... + async def invalidate_candidate_skill_cache(self, candidate_id: str) -> int: ... + async def invalidate_user_skill_cache(self, user_id: str) -> int: ... + async def set_candidate(self, candidate_id: str, candidate_data: Dict): ... + async def set_employer(self, employer_id: str, employer_data: Dict): ... + async def set_guest(self, guest_id: str, guest_data: Dict[str, Any]) -> None: ... + async def set_user_by_id(self, user_id: str, user_data: Dict[str, Any]) -> bool: ... + async def set_user(self, login: str, user_data: Dict[str, Any]) -> bool: ... + async def update_user_rag_timestamp(self, user_id: str) -> bool: ... + async def user_exists_by_email(self, email: str) -> bool: ... + async def user_exists_by_username(self, username: str) -> bool: ... + + # Auth mixin + async def cleanup_expired_verification_tokens(self) -> int: ... + async def cleanup_inactive_guests(self, inactive_hours: int = 24) -> int: ... + async def cleanup_old_chat_sessions(self, days_old: int = 90) -> int: ... + async def cleanup_orphaned_job_requirements(self) -> int: ... + async def clear_all_data(self: "DatabaseProtocol"): ... + async def clear_all_skill_match_cache(self) -> int: ... + + # Resume mixin + + async def delete_ai_parameters(self, param_id: str): ... + async def delete_all_candidate_documents(self, candidate_id: str) -> int: ... + async def delete_all_resumes_for_user(self, user_id: str) -> int: ... + async def delete_authentication(self, user_id: str) -> bool: ... + async def delete_document(self, document_id: str): ... 
+ + async def delete_job_application(self, application_id: str): ... + async def delete_job_requirements(self, document_id: str) -> bool: ... + async def delete_job(self, job_id: str): ... + async def delete_resume(self, user_id: str, resume_id: str) -> bool: ... + async def delete_viewer(self, viewer_id: str): ... + async def find_verification_token_by_email(self, email: str) -> Optional[Dict[str, Any]]: ... + async def get_ai_parameters(self, param_id: str) -> Optional[Dict]: ... + async def get_all_ai_parameters(self) -> Dict[str, Any]: ... + async def get_all_candidates(self) -> Dict[str, Any]: ... + async def get_all_chat_messages(self) -> Dict[str, List[Dict]]: ... + async def get_all_chat_sessions(self) -> Dict[str, Any]: ... + async def get_all_employers(self) -> Dict[str, Any]: ... + async def get_all_guests(self) -> Dict[str, Dict[str, Any]]: ... + async def get_all_job_applications(self) -> Dict[str, Any]: ... + async def get_all_job_requirements(self) -> Dict[str, Any]: ... + async def get_all_jobs(self) -> Dict[str, Any]: ... + async def get_all_resumes_for_user(self, user_id: str) -> List[Dict]: ... + async def get_all_resumes(self) -> Dict[str, List[Dict]]: ... + async def get_authentication(self, user_id: str) -> Optional[Dict[str, Any]]: ... + async def get_cached_skill_match(self, cache_key: str) -> Optional[SkillAssessment]: ... + async def get_chat_message_count(self, session_id: str) -> int: ... + async def get_chat_messages(self, session_id: str) -> List[Dict]: ... + async def get_chat_sessions_by_candidate(self, candidate_id: str) -> List[Dict]: ... + async def get_chat_sessions_by_user(self, user_id: str) -> List[Dict]: ... + async def get_chat_session(self, session_id: str) -> Optional[Dict]: ... + async def get_chat_statistics(self) -> Dict[str, Any]: ... + async def get_document_count_for_candidate(self, candidate_id: str) -> int: ... + async def get_documents_by_rag_status(self, candidate_id: str, include_in_rag: bool = True) -> List[Dict]: ... + async def get_document(self, document_id: str) -> Optional[Dict]: ... + async def get_email_verification_token(self, token: str) -> Optional[Dict[str, Any]]: ... + async def get_job_application(self, application_id: str) -> Optional[Dict]: ... + async def get_job_requirements_by_candidate(self, candidate_id: str) -> List[Dict]: ... + async def get_job_requirements(self, document_id: str) -> Optional[Dict]: ... + async def get_job_requirements_stats(self) -> Dict[str, Any]: ... + async def get_job(self, job_id: str) -> Optional[Dict]: ... + async def get_mfa_code(self, email: str, device_id: str) -> Optional[Dict[str, Any]]: ... + async def get_multiple_candidates_by_usernames(self, usernames: List[str]) -> Dict[str, Dict]: ... + async def get_password_reset_token(self, token: str) -> Optional[Dict[str, Any]]: ... + async def get_pending_verifications_count(self) -> int: ... + async def get_recent_chat_messages(self, session_id: str, limit: int = 10) -> List[Dict]: ... + async def get_refresh_token(self, token: str) -> Optional[Dict[str, Any]]: ... + async def get_resumes_by_candidate(self, user_id: str, candidate_id: str) -> List[Dict]: ... + async def get_resumes_by_job(self, user_id: str, job_id: str) -> List[Dict]: ... + async def get_resume(self, user_id: str, resume_id: str) -> Optional[Dict]: ... + async def get_resume_statistics(self, user_id: str) -> Dict[str, Any]: ... + async def get_stats(self) -> Dict[str, int]: ... + async def get_verification_attempts_count(self, email: str) -> int: ... 
+ async def get_viewer(self, viewer_id: str) -> Optional[Dict]: ... + async def increment_mfa_attempts(self, email: str, device_id: str) -> int: ... + async def invalidate_job_requirements_cache(self, document_id: str) -> bool: ... + async def log_security_event(self, user_id: str, event_type: str, details: Dict[str, Any]) -> bool: ... + async def mark_email_verified(self, token: str) -> bool: ... + async def mark_mfa_verified(self, email: str, device_id: str) -> bool: ... + async def mark_password_reset_token_used(self, token: str) -> bool: ... + async def record_verification_attempt(self, email: str) -> bool: ... + async def remove_document_from_candidate(self, candidate_id: str, document_id: str): ... + async def revoke_all_user_tokens(self, user_id: str) -> bool: ... + async def revoke_refresh_token(self, token: str) -> bool: ... + async def save_job_requirements(self, document_id: str, requirements: Dict) -> bool: ... + async def search_candidate_documents(self, candidate_id: str, query: str) -> List[Dict]: ... + async def search_chat_messages(self, session_id: str, query: str) -> List[Dict]: ... + async def search_resumes_for_user(self, user_id: str, query: str) -> List[Dict]: ... + async def set_ai_parameters(self, param_id: str, param_data: Dict): ... + async def set_authentication(self, user_id: str, auth_data: Dict[str, Any]) -> bool: ... + async def set_chat_messages(self, session_id: str, messages: List[Dict]): ... + async def set_chat_session(self, session_id: str, session_data: Dict): ... + async def set_document(self, document_id: str, document_data: Dict): ... + async def set_job_application(self, application_id: str, application_data: Dict): ... + async def set_job(self, job_id: str, job_data: Dict): ... + async def set_resume(self, user_id: str, resume_data: Dict) -> bool: ... + async def set_viewer(self, viewer_id: str, viewer_data: Dict): ... + async def store_email_verification_token(self, email: str, token: str, user_type: str, user_data: dict) -> bool: ... + async def store_mfa_code(self, email: str, code: str, device_id: str) -> bool: ... + async def store_password_reset_token(self, email: str, token: str, expires_at: datetime) -> bool: ... + async def store_refresh_token(self, user_id: str, token: str, expires_at: datetime, device_info: Dict[str, str]) -> bool: ... + async def update_chat_session_activity(self, session_id: str): ... + async def update_document(self, document_id: str, updates: Dict)-> Dict[Any, Any] | None: ... + async def update_resume(self, user_id: str, resume_id: str, updates: Dict) -> Optional[Dict]: ... 
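+ + # Note: DatabaseProtocol only declares signatures. Each mixin subclasses it so that + # cross-mixin calls (e.g. JobMixin using get_candidate_documents and get_document from + # DocumentMixin) type-check against the combined RedisDatabase assembled in core.py.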
+ diff --git a/src/backend/database/mixins/resume.py b/src/backend/database/mixins/resume.py new file mode 100644 index 0000000..472cbf9 --- /dev/null +++ b/src/backend/database/mixins/resume.py @@ -0,0 +1,290 @@ +from datetime import UTC, datetime +import logging +from typing import Any, Dict, TYPE_CHECKING, List, Optional + +from .protocols import DatabaseProtocol +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class ResumeMixin(DatabaseProtocol): + """Mixin for resume-related database operations""" + + async def set_resume(self, user_id: str, resume_data: Dict) -> bool: + """Save a resume for a user""" + try: + # Generate resume_id if not present + if 'id' not in resume_data: + raise ValueError("Resume data must include an 'id' field") + + resume_id = resume_data['id'] + + # Store the resume data + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + await self.redis.set(key, self._serialize(resume_data)) + + # Add resume_id to user's resume list + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" + await self.redis.rpush(user_resumes_key, resume_id) # type: ignore + + logger.info(f"๐Ÿ“„ Saved resume {resume_id} for user {user_id}") + return True + except Exception as e: + logger.error(f"โŒ Error saving resume for user {user_id}: {e}") + return False + + async def get_resume(self, user_id: str, resume_id: str) -> Optional[Dict]: + """Get a specific resume for a user""" + try: + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + data = await self.redis.get(key) + if data: + resume_data = self._deserialize(data) + logger.info(f"๐Ÿ“„ Retrieved resume {resume_id} for user {user_id}") + return resume_data + logger.info(f"๐Ÿ“„ Resume {resume_id} not found for user {user_id}") + return None + except Exception as e: + logger.error(f"โŒ Error retrieving resume {resume_id} for user {user_id}: {e}") + return None + + async def get_all_resumes_for_user(self, user_id: str) -> List[Dict]: + """Get all resumes for a specific user""" + try: + # Get all resume IDs for this user + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" + resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)# type: ignore + + if not resume_ids: + logger.info(f"๐Ÿ“„ No resumes found for user {user_id}") + return [] + + # Get all resume data + resumes = [] + pipe = self.redis.pipeline() + for resume_id in resume_ids: + pipe.get(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}") + values = await pipe.execute() + + for resume_id, value in zip(resume_ids, values): + if value: + resume_data = self._deserialize(value) + if resume_data: + resumes.append(resume_data) + else: + # Clean up orphaned resume ID + await self.redis.lrem(user_resumes_key, 0, resume_id)# type: ignore + logger.warning(f"Removed orphaned resume ID {resume_id} for user {user_id}") + + # Sort by created_at timestamp (most recent first) + resumes.sort(key=lambda x: x.get("created_at", ""), reverse=True) + + logger.info(f"๐Ÿ“„ Retrieved {len(resumes)} resumes for user {user_id}") + return resumes + except Exception as e: + logger.error(f"โŒ Error retrieving resumes for user {user_id}: {e}") + return [] + + async def delete_resume(self, user_id: str, resume_id: str) -> bool: + """Delete a specific resume for a user""" + try: + # Delete the resume data + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + result = await self.redis.delete(key) + + # Remove from user's resume list + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" + await self.redis.lrem(user_resumes_key, 0, 
resume_id)# type: ignore + + if result > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted resume {resume_id} for user {user_id}") + return True + else: + logger.warning(f"โš ๏ธ Resume {resume_id} not found for user {user_id}") + return False + except Exception as e: + logger.error(f"โŒ Error deleting resume {resume_id} for user {user_id}: {e}") + return False + + async def delete_all_resumes_for_user(self, user_id: str) -> int: + """Delete all resumes for a specific user and return count of deleted resumes""" + try: + # Get all resume IDs for this user + user_resumes_key = f"{KEY_PREFIXES['user_resumes']}{user_id}" + resume_ids = await self.redis.lrange(user_resumes_key, 0, -1)# type: ignore + + if not resume_ids: + logger.info(f"๐Ÿ“„ No resumes found for user {user_id}") + return 0 + + deleted_count = 0 + + # Use pipeline for efficient batch operations + pipe = self.redis.pipeline() + + # Delete each resume + for resume_id in resume_ids: + pipe.delete(f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}") + deleted_count += 1 + + # Delete the user's resume list + pipe.delete(user_resumes_key) + + # Execute all operations + await pipe.execute() + + logger.info(f"๐Ÿ—‘๏ธ Successfully deleted {deleted_count} resumes for user {user_id}") + return deleted_count + + except Exception as e: + logger.error(f"โŒ Error deleting all resumes for user {user_id}: {e}") + raise + + async def get_all_resumes(self) -> Dict[str, List[Dict]]: + """Get all resumes grouped by user (admin function)""" + try: + pattern = f"{KEY_PREFIXES['resumes']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + # Group by user_id + user_resumes = {} + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + for key, value in zip(keys, values): + if value: + # Extract user_id from key format: resume:{user_id}:{resume_id} + key_parts = key.replace(KEY_PREFIXES['resumes'], '').split(':', 1) + if len(key_parts) >= 1: + user_id = key_parts[0] + resume_data = self._deserialize(value) + if resume_data: + if user_id not in user_resumes: + user_resumes[user_id] = [] + user_resumes[user_id].append(resume_data) + + # Sort each user's resumes by created_at + for user_id in user_resumes: + user_resumes[user_id].sort(key=lambda x: x.get("created_at", ""), reverse=True) + + return user_resumes + except Exception as e: + logger.error(f"โŒ Error retrieving all resumes: {e}") + return {} + + async def search_resumes_for_user(self, user_id: str, query: str) -> List[Dict]: + """Search resumes for a user by content, job title, or candidate name""" + try: + all_resumes = await self.get_all_resumes_for_user(user_id) + query_lower = query.lower() + + matching_resumes = [] + for resume in all_resumes: + # Search in resume content, job_id, candidate_id, etc. 
+ searchable_text = " ".join([ + resume.get("resume", ""), + resume.get("job_id", ""), + resume.get("candidate_id", ""), + str(resume.get("created_at", "")) + ]).lower() + + if query_lower in searchable_text: + matching_resumes.append(resume) + + logger.info(f"๐Ÿ“„ Found {len(matching_resumes)} matching resumes for user {user_id}") + return matching_resumes + except Exception as e: + logger.error(f"โŒ Error searching resumes for user {user_id}: {e}") + return [] + + async def get_resumes_by_candidate(self, user_id: str, candidate_id: str) -> List[Dict]: + """Get all resumes for a specific candidate created by a user""" + try: + all_resumes = await self.get_all_resumes_for_user(user_id) + candidate_resumes = [ + resume for resume in all_resumes + if resume.get("candidate_id") == candidate_id + ] + + logger.info(f"๐Ÿ“„ Found {len(candidate_resumes)} resumes for candidate {candidate_id} by user {user_id}") + return candidate_resumes + except Exception as e: + logger.error(f"โŒ Error retrieving resumes for candidate {candidate_id} by user {user_id}: {e}") + return [] + + async def get_resumes_by_job(self, user_id: str, job_id: str) -> List[Dict]: + """Get all resumes for a specific job created by a user""" + try: + all_resumes = await self.get_all_resumes_for_user(user_id) + job_resumes = [ + resume for resume in all_resumes + if resume.get("job_id") == job_id + ] + + logger.info(f"๐Ÿ“„ Found {len(job_resumes)} resumes for job {job_id} by user {user_id}") + return job_resumes + except Exception as e: + logger.error(f"โŒ Error retrieving resumes for job {job_id} by user {user_id}: {e}") + return [] + + async def get_resume_statistics(self, user_id: str) -> Dict[str, Any]: + """Get resume statistics for a user""" + try: + all_resumes = await self.get_all_resumes_for_user(user_id) + + stats = { + "total_resumes": len(all_resumes), + "resumes_by_candidate": {}, + "resumes_by_job": {}, + "creation_timeline": {}, + "recent_resumes": [] + } + + for resume in all_resumes: + # Count by candidate + candidate_id = resume.get("candidate_id", "unknown") + stats["resumes_by_candidate"][candidate_id] = stats["resumes_by_candidate"].get(candidate_id, 0) + 1 + + # Count by job + job_id = resume.get("job_id", "unknown") + stats["resumes_by_job"][job_id] = stats["resumes_by_job"].get(job_id, 0) + 1 + + # Timeline by date + created_at = resume.get("created_at") + if created_at: + try: + date_key = created_at[:10] # Extract date part + stats["creation_timeline"][date_key] = stats["creation_timeline"].get(date_key, 0) + 1 + except (IndexError, TypeError): + pass + + # Get recent resumes (last 5) + stats["recent_resumes"] = all_resumes[:5] + + return stats + except Exception as e: + logger.error(f"โŒ Error getting resume statistics for user {user_id}: {e}") + return {"total_resumes": 0, "resumes_by_candidate": {}, "resumes_by_job": {}, "creation_timeline": {}, "recent_resumes": []} + + async def update_resume(self, user_id: str, resume_id: str, updates: Dict) -> Optional[Dict]: + """Update specific fields of a resume""" + try: + resume_data = await self.get_resume(user_id, resume_id) + if resume_data: + resume_data.update(updates) + resume_data["updated_at"] = datetime.now(UTC).isoformat() + + key = f"{KEY_PREFIXES['resumes']}{user_id}:{resume_id}" + await self.redis.set(key, self._serialize(resume_data)) + + logger.info(f"๐Ÿ“„ Updated resume {resume_id} for user {user_id}") + return resume_data + return None + except Exception as e: + logger.error(f"โŒ Error updating resume {resume_id} for user {user_id}: 
{e}") + return None diff --git a/src/backend/database/mixins/skill.py b/src/backend/database/mixins/skill.py new file mode 100644 index 0000000..8850910 --- /dev/null +++ b/src/backend/database/mixins/skill.py @@ -0,0 +1,82 @@ +import json +import logging +from typing import Any, Dict, TYPE_CHECKING, Optional + +from models import SkillAssessment + +from .protocols import DatabaseProtocol +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class SkillMixin(DatabaseProtocol): + """Mixin for Skill-related database operations""" + + async def get_cached_skill_match(self, cache_key: str) -> Optional[SkillAssessment]: + """Get cached skill match assessment""" + try: + json_str = await self.redis.get(cache_key) + if json_str: + json_data = json.loads(json_str) + skill_assessment = SkillAssessment.model_validate(json_data) + return skill_assessment + return None + except Exception as e: + logger.error(f"โŒ Error getting cached skill match: {e}") + return None + + async def invalidate_candidate_skill_cache(self, candidate_id: str) -> int: + """Invalidate all cached skill matches for a specific candidate""" + try: + pattern = f"skill_match:{candidate_id}:*" + keys = await self.redis.keys(pattern) + if keys: + return await self.redis.delete(*keys) + return 0 + except Exception as e: + logger.error(f"Error invalidating candidate skill cache: {e}") + return 0 + + async def clear_all_skill_match_cache(self) -> int: + """Clear all skill match cache (useful after major system updates)""" + try: + pattern = "skill_match:*" + keys = await self.redis.keys(pattern) + if keys: + return await self.redis.delete(*keys) + return 0 + except Exception as e: + logger.error(f"Error clearing skill match cache: {e}") + return 0 + + async def invalidate_user_skill_cache(self, user_id: str) -> int: + """Invalidate all cached skill matches when a user's RAG data is updated""" + try: + # This assumes all candidates belonging to this user need cache invalidation + # You might need to adjust the pattern based on how you associate candidates with users + pattern = f"skill_match:{user_id}:*" + keys = await self.redis.keys(pattern) + + # Filter keys that belong to candidates owned by this user + # This would require additional logic to determine candidate ownership + # For now, you might want to clear all cache when any user's RAG data updates + # or implement a more sophisticated mapping + + if keys: + return await self.redis.delete(*keys) + return 0 + except Exception as e: + logger.error(f"Error invalidating user skill cache for user {user_id}: {e}") + return 0 + + async def cache_skill_match(self, cache_key: str, assessment: SkillAssessment) -> None: + """Cache skill match assessment""" + try: + # Cache for 1 hour by default + await self.redis.set( + cache_key, + json.dumps(assessment.model_dump(mode='json', by_alias=True), default=str) # Serialize with datetime handling + ) + logger.info(f"๐Ÿ’พ Skill match cached: {cache_key}") + except Exception as e: + logger.error(f"โŒ Error caching skill match: {e}") diff --git a/src/backend/database/mixins/user.py b/src/backend/database/mixins/user.py new file mode 100644 index 0000000..0e4c1fd --- /dev/null +++ b/src/backend/database/mixins/user.py @@ -0,0 +1,867 @@ +from datetime import UTC, datetime, timedelta, timezone +import logging +import json +from typing import List, Optional, Any, Dict, TYPE_CHECKING, Self + + +from .protocols import DatabaseProtocol + +from ..constants import KEY_PREFIXES + +logger = logging.getLogger(__name__) + +class 
UserMixin(DatabaseProtocol): + """Mixin for user and candidate operations""" + + async def set_guest(self, guest_id: str, guest_data: Dict[str, Any]) -> None: + """Store guest data with enhanced persistence""" + try: + # Ensure last_activity is always set + guest_data["last_activity"] = datetime.now(UTC).isoformat() + + # Store in Redis with both hash and individual key for redundancy + await self.redis.hset("guests", guest_id, json.dumps(guest_data))# type: ignore + + # Also store with a longer TTL as backup + await self.redis.setex( + f"guest_backup:{guest_id}", + 86400 * 7, # 7 days TTL + json.dumps(guest_data) + ) + + logger.info(f"๐Ÿ’พ Guest stored with backup: {guest_id}") + except Exception as e: + logger.error(f"โŒ Error storing guest {guest_id}: {e}") + raise + + async def get_guest(self, guest_id: str) -> Optional[Dict[str, Any]]: + """Get guest data with fallback to backup""" + try: + # Try primary storage first + data = await self.redis.hget("guests", guest_id)# type: ignore + if data: + guest_data = json.loads(data) + # Update last activity when accessed + guest_data["last_activity"] = datetime.now(UTC).isoformat() + await self.set_guest(guest_id, guest_data) + logger.info(f"๐Ÿ” Guest found in primary storage: {guest_id}") + return guest_data + + # Fallback to backup storage + backup_data = await self.redis.get(f"guest_backup:{guest_id}") + if backup_data: + guest_data = json.loads(backup_data) + guest_data["last_activity"] = datetime.now(UTC).isoformat() + + # Restore to primary storage + await self.set_guest(guest_id, guest_data) + logger.info(f"๐Ÿ”„ Guest restored from backup: {guest_id}") + return guest_data + + logger.warning(f"โš ๏ธ Guest not found: {guest_id}") + return None + except Exception as e: + logger.error(f"โŒ Error getting guest {guest_id}: {e}") + return None + + async def get_guest_by_session_id(self, session_id: str) -> Optional[Dict[str, Any]]: + """Get guest data by session ID""" + try: + all_guests = await self.get_all_guests() + for guest_data in all_guests.values(): + if guest_data.get("session_id") == session_id: + return guest_data + return None + except Exception as e: + logger.error(f"โŒ Error getting guest by session ID {session_id}: {e}") + return None + + async def get_all_guests(self) -> Dict[str, Dict[str, Any]]: + """Get all guests""" + try: + data = await self.redis.hgetall("guests")# type: ignore + return { + guest_id: json.loads(guest_json) + for guest_id, guest_json in data.items() + } + except Exception as e: + logger.error(f"โŒ Error getting all guests: {e}") + return {} + + async def delete_guest(self, guest_id: str) -> bool: + """Delete a guest""" + try: + result = await self.redis.hdel("guests", guest_id)# type: ignore + if result: + logger.info(f"๐Ÿ—‘๏ธ Guest deleted: {guest_id}") + return True + return False + except Exception as e: + logger.error(f"โŒ Error deleting guest {guest_id}: {e}") + return False + + async def cleanup_inactive_guests(self, inactive_hours: int = 24) -> int: + """Clean up inactive guest sessions with safety checks""" + try: + all_guests = await self.get_all_guests() + current_time = datetime.now(UTC) + cutoff_time = current_time - timedelta(hours=inactive_hours) + + deleted_count = 0 + preserved_count = 0 + + for guest_id, guest_data in all_guests.items(): + try: + last_activity_str = guest_data.get("last_activity") + created_at_str = guest_data.get("created_at") + + # Skip cleanup if guest is very new (less than 1 hour old) + if created_at_str: + created_at = 
datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) + if current_time - created_at < timedelta(hours=1): + preserved_count += 1 + logger.info(f"๐Ÿ›ก๏ธ Preserving new guest: {guest_id}") + continue + + # Check last activity + should_delete = False + if last_activity_str: + try: + last_activity = datetime.fromisoformat(last_activity_str.replace('Z', '+00:00')) + if last_activity < cutoff_time: + should_delete = True + except ValueError: + # Invalid date format, but don't delete if guest is new + if not created_at_str: + should_delete = True + else: + # No last activity, but don't delete if guest is new + if not created_at_str: + should_delete = True + + if should_delete: + await self.delete_guest(guest_id) + deleted_count += 1 + else: + preserved_count += 1 + + except Exception as e: + logger.error(f"โŒ Error processing guest {guest_id} for cleanup: {e}") + preserved_count += 1 # Preserve on error + + if deleted_count > 0: + logger.info(f"๐Ÿงน Guest cleanup: removed {deleted_count}, preserved {preserved_count}") + + return deleted_count + except Exception as e: + logger.error(f"โŒ Error in guest cleanup: {e}") + return 0 + + async def get_guest_statistics(self) -> Dict[str, Any]: + """Get guest usage statistics""" + try: + all_guests = await self.get_all_guests() + current_time = datetime.now(UTC) + + stats = { + "total_guests": len(all_guests), + "active_last_hour": 0, + "active_last_day": 0, + "converted_guests": 0, + "by_ip": {}, + "creation_timeline": {} + } + + hour_ago = current_time - timedelta(hours=1) + day_ago = current_time - timedelta(days=1) + + for guest_data in all_guests.values(): + # Check activity + last_activity_str = guest_data.get("last_activity") + if last_activity_str: + try: + last_activity = datetime.fromisoformat(last_activity_str.replace('Z', '+00:00')) + if last_activity > hour_ago: + stats["active_last_hour"] += 1 + if last_activity > day_ago: + stats["active_last_day"] += 1 + except ValueError: + pass + + # Check conversions + if guest_data.get("converted_to_user_id"): + stats["converted_guests"] += 1 + + # IP tracking + ip = guest_data.get("ip_address", "unknown") + stats["by_ip"][ip] = stats["by_ip"].get(ip, 0) + 1 + + # Creation timeline + created_at_str = guest_data.get("created_at") + if created_at_str: + try: + created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00')) + date_key = created_at.strftime('%Y-%m-%d') + stats["creation_timeline"][date_key] = stats["creation_timeline"].get(date_key, 0) + 1 + except ValueError: + pass + + return stats + except Exception as e: + logger.error(f"โŒ Error getting guest statistics: {e}") + return {} + + async def set_user_by_id(self, user_id: str, user_data: Dict[str, Any]) -> bool: + """Store user data with ID as key for direct lookup""" + try: + key = f"user_by_id:{user_id}" + await self.redis.set(key, json.dumps(user_data, default=str)) + logger.info(f"๐Ÿ‘ค Stored user data by ID for {user_id}") + return True + except Exception as e: + logger.error(f"โŒ Error storing user by ID {user_id}: {e}") + return False + + async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]: + """Get user lookup data by user ID""" + try: + data = await self.redis.hget("user_lookup_by_id", user_id)# type: ignore + if data: + return json.loads(data) + return None + except Exception as e: + logger.error(f"โŒ Error getting user by ID {user_id}: {e}") + return None + + async def user_exists_by_email(self, email: str) -> bool: + """Check if a user exists with the given email""" + try: + key = 
f"users:{email.lower()}" + exists = await self.redis.exists(key) + return exists > 0 + except Exception as e: + logger.error(f"โŒ Error checking user existence by email {email}: {e}") + return False + + async def user_exists_by_username(self, username: str) -> bool: + """Check if a user exists with the given username""" + try: + key = f"users:{username.lower()}" + exists = await self.redis.exists(key) + return exists > 0 + except Exception as e: + logger.error(f"โŒ Error checking user existence by username {username}: {e}") + return False + + async def get_all_users(self) -> Dict[str, Any]: + """Get all users""" + pattern = f"{KEY_PREFIXES['users']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + email = key.replace(KEY_PREFIXES['users'], '') + logger.info(f"๐Ÿ” Found user key: {key}, type: {type(value)}") + if type(value) == str: + result[email] = value + else: + result[email] = self._deserialize(value) + + return result + + async def delete_user(self, email: str): + """Delete user""" + key = f"{KEY_PREFIXES['users']}{email}" + await self.redis.delete(key) + + async def get_employer(self, employer_id: str) -> Optional[Dict]: + """Get employer by ID""" + key = f"{KEY_PREFIXES['employers']}{employer_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_employer(self, employer_id: str, employer_data: Dict): + """Set employer data""" + key = f"{KEY_PREFIXES['employers']}{employer_id}" + await self.redis.set(key, self._serialize(employer_data)) + + async def get_all_employers(self) -> Dict[str, Any]: + """Get all employers""" + pattern = f"{KEY_PREFIXES['employers']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + employer_id = key.replace(KEY_PREFIXES['employers'], '') + result[employer_id] = self._deserialize(value) + + return result + + async def delete_employer(self, employer_id: str): + """Delete employer""" + key = f"{KEY_PREFIXES['employers']}{employer_id}" + await self.redis.delete(key) + + + async def get_user(self, login: str) -> Optional[Dict[str, Any]]: + """Get user by email or username""" + try: + login = login.strip().lower() + key = f"users:{login}" + + data = await self.redis.get(key) + if data: + user_data = json.loads(data) + logger.info(f"๐Ÿ‘ค Retrieved user data for {login}") + return user_data + return None + except Exception as e: + logger.error(f"โŒ Error retrieving user {login}: {e}") + return None + + async def set_user(self, login: str, user_data: Dict[str, Any]) -> bool: + """Store user data by email or username""" + try: + login = login.strip().lower() + key = f"users:{login}" + + await self.redis.set(key, json.dumps(user_data, default=str)) + logger.info(f"๐Ÿ‘ค Stored user data for {login}") + return True + except Exception as e: + logger.error(f"โŒ Error storing user {login}: {e}") + return False + + # Candidates operations + async def get_candidate(self, candidate_id: str) -> Optional[Dict]: + """Get candidate by ID""" + key = f"{KEY_PREFIXES['candidates']}{candidate_id}" + data = await self.redis.get(key) + return self._deserialize(data) if data else None + + async def set_candidate(self, candidate_id: str, candidate_data: Dict): + """Set candidate 
data""" + key = f"{KEY_PREFIXES['candidates']}{candidate_id}" + await self.redis.set(key, self._serialize(candidate_data)) + + async def get_all_candidates(self) -> Dict[str, Any]: + """Get all candidates""" + pattern = f"{KEY_PREFIXES['candidates']}*" + keys = await self.redis.keys(pattern) + + if not keys: + return {} + + # Use pipeline for efficiency + pipe = self.redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + result = {} + for key, value in zip(keys, values): + candidate_id = key.replace(KEY_PREFIXES['candidates'], '') + result[candidate_id] = self._deserialize(value) + + return result + + async def delete_candidate(self: Self, candidate_id: str) -> Dict[str, int]: + """ + Delete candidate and all related records in a cascading manner + Returns a dictionary with counts of deleted items for each category + """ + try: + deletion_stats = { + "documents": 0, + "chat_sessions": 0, + "chat_messages": 0, + "job_applications": 0, + "user_records": 0, + "auth_records": 0, + "security_logs": 0, + "ai_parameters": 0, + "candidate_record": 0, + "resumes": 0 + } + + logger.info(f"๐Ÿ—‘๏ธ Starting cascading delete for candidate {candidate_id}") + + # 1. Get candidate data first to retrieve associated information + candidate_data = await self.get_candidate(candidate_id) + if not candidate_data: + logger.warning(f"โš ๏ธ Candidate {candidate_id} not found") + return deletion_stats + + candidate_email = candidate_data.get("email", "").lower() + candidate_username = candidate_data.get("username", "").lower() + + # 2. Delete all candidate documents and their metadata + try: + documents_deleted = await self.delete_all_candidate_documents(candidate_id) + deletion_stats["documents"] = documents_deleted + logger.info(f"๐Ÿ—‘๏ธ Deleted {documents_deleted} documents for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting candidate documents: {e}") + + # 3. Delete all chat sessions related to this candidate + try: + candidate_sessions = await self.get_chat_sessions_by_candidate(candidate_id) + messages_deleted = 0 + + for session in candidate_sessions: + session_id = session.get("id") + if session_id: + # Count messages before deletion + message_count = await self.get_chat_message_count(session_id) + messages_deleted += message_count + + # Delete chat session and its messages + await self.delete_chat_session_completely(session_id) + + deletion_stats["chat_sessions"] = len(candidate_sessions) + deletion_stats["chat_messages"] = messages_deleted + logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_sessions)} chat sessions and {messages_deleted} messages for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting chat sessions: {e}") + + # 4. Delete job applications from this candidate + try: + all_applications = await self.get_all_job_applications() + candidate_applications = [] + + for app_id, app_data in all_applications.items(): + if app_data.get("candidateId") == candidate_id: + candidate_applications.append(app_id) + + # Delete each application + for app_id in candidate_applications: + await self.delete_job_application(app_id) + + deletion_stats["job_applications"] = len(candidate_applications) + logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_applications)} job applications for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting job applications: {e}") + + # 5. 
Delete user records (by email and username if they exist) + try: + user_records_deleted = 0 + + # Delete by email + if candidate_email and await self.user_exists_by_email(candidate_email): + await self.delete_user(candidate_email) + user_records_deleted += 1 + logger.info(f"๐Ÿ—‘๏ธ Deleted user record by email: {candidate_email}") + + # Delete by username (if different from email) + if (candidate_username and + candidate_username != candidate_email and + await self.user_exists_by_username(candidate_username)): + await self.delete_user(candidate_username) + user_records_deleted += 1 + logger.info(f"๐Ÿ—‘๏ธ Deleted user record by username: {candidate_username}") + + # Delete user by ID if exists + user_by_id = await self.get_user_by_id(candidate_id) + if user_by_id: + key = f"user_by_id:{candidate_id}" + await self.redis.delete(key) + user_records_deleted += 1 + logger.info(f"๐Ÿ—‘๏ธ Deleted user record by ID: {candidate_id}") + + deletion_stats["user_records"] = user_records_deleted + logger.info(f"๐Ÿ—‘๏ธ Deleted {user_records_deleted} user records for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting user records: {e}") + + # 6. Delete authentication records + try: + auth_deleted = await self.delete_authentication(candidate_id) + if auth_deleted: + deletion_stats["auth_records"] = 1 + logger.info(f"๐Ÿ—‘๏ธ Deleted authentication record for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting authentication records: {e}") + + # 7. Revoke all refresh tokens for this user + try: + await self.revoke_all_user_tokens(candidate_id) + logger.info(f"๐Ÿ—‘๏ธ Revoked all refresh tokens for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error revoking refresh tokens: {e}") + + # 8. Delete security logs for this user + try: + security_logs_deleted = 0 + # Security logs are stored by date, so we need to scan for them + pattern = f"security_log:{candidate_id}:*" + cursor = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + if keys: + await self.redis.delete(*keys) + security_logs_deleted += len(keys) + + if cursor == 0: + break + + deletion_stats["security_logs"] = security_logs_deleted + if security_logs_deleted > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {security_logs_deleted} security log entries for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting security logs: {e}") + + # 9. Delete AI parameters that might be specific to this candidate + try: + all_ai_params = await self.get_all_ai_parameters() + candidate_ai_params = [] + + for param_id, param_data in all_ai_params.items(): + if (param_data.get("candidateId") == candidate_id or + param_data.get("userId") == candidate_id): + candidate_ai_params.append(param_id) + + # Delete each AI parameter set + for param_id in candidate_ai_params: + await self.delete_ai_parameters(param_id) + + deletion_stats["ai_parameters"] = len(candidate_ai_params) + if len(candidate_ai_params) > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_ai_params)} AI parameter sets for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting AI parameters: {e}") + + # 10. 
Delete email verification tokens if any exist + try: + if candidate_email: + # Clean up any pending verification tokens + pattern = "email_verification:*" + cursor = 0 + tokens_deleted = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + verification_info = json.loads(token_data) + if verification_info.get("email", "").lower() == candidate_email: + await self.redis.delete(key) + tokens_deleted += 1 + + if cursor == 0: + break + + if tokens_deleted > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} email verification tokens for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting email verification tokens: {e}") + + # 11. Delete password reset tokens if any exist + try: + if candidate_email: + pattern = "password_reset:*" + cursor = 0 + tokens_deleted = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + for key in keys: + token_data = await self.redis.get(key) + if token_data: + reset_info = json.loads(token_data) + if reset_info.get("email", "").lower() == candidate_email: + await self.redis.delete(key) + tokens_deleted += 1 + + if cursor == 0: + break + + if tokens_deleted > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} password reset tokens for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting password reset tokens: {e}") + + # 12. Delete MFA codes if any exist + try: + if candidate_email: + pattern = f"mfa_code:{candidate_email}:*" + cursor = 0 + mfa_codes_deleted = 0 + + while True: + cursor, keys = await self.redis.scan(cursor, match=pattern, count=100) + + if keys: + await self.redis.delete(*keys) + mfa_codes_deleted += len(keys) + + if cursor == 0: + break + + if mfa_codes_deleted > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {mfa_codes_deleted} MFA codes for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting MFA codes: {e}") + + # 13. Finally, delete the candidate record itself + try: + key = f"{KEY_PREFIXES['candidates']}{candidate_id}" + result = await self.redis.delete(key) + deletion_stats["candidate_record"] = result + logger.info(f"๐Ÿ—‘๏ธ Deleted candidate record for {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting candidate record: {e}") + + # 14. Delete resumes associated with this candidate across all users + try: + all_resumes = await self.get_all_resumes() + candidate_resumes_deleted = 0 + + for user_id, user_resumes in all_resumes.items(): + resumes_to_delete = [] + for resume in user_resumes: + if resume.get("candidate_id") == candidate_id: + resumes_to_delete.append(resume.get("resume_id")) + + # Delete each resume for this candidate + for resume_id in resumes_to_delete: + if resume_id: + await self.delete_resume(user_id, resume_id) + candidate_resumes_deleted += 1 + + deletion_stats["resumes"] = candidate_resumes_deleted + if candidate_resumes_deleted > 0: + logger.info(f"๐Ÿ—‘๏ธ Deleted {candidate_resumes_deleted} resumes for candidate {candidate_id}") + except Exception as e: + logger.error(f"โŒ Error deleting resumes for candidate {candidate_id}: {e}") + + # 15. Log the deletion as a security event (if we have admin/system user context) + try: + total_items_deleted = sum(deletion_stats.values()) + logger.info(f"โœ… Completed cascading delete for candidate {candidate_id}. 
" + f"Total items deleted: {total_items_deleted}") + logger.info(f"๐Ÿ“Š Deletion breakdown: {deletion_stats}") + except Exception as e: + logger.error(f"โŒ Error logging deletion summary: {e}") + + return deletion_stats + + except Exception as e: + logger.error(f"โŒ Critical error during candidate deletion {candidate_id}: {e}") + raise + + async def delete_candidate_batch(self, candidate_ids: List[str]) -> Dict[str, Dict[str, int]]: + """ + Delete multiple candidates in batch with detailed reporting + Returns deletion stats for each candidate + """ + try: + batch_results = {} + total_stats = { + "documents": 0, + "chat_sessions": 0, + "chat_messages": 0, + "job_applications": 0, + "user_records": 0, + "auth_records": 0, + "security_logs": 0, + "ai_parameters": 0, + "candidate_record": 0, + "resumes": 0, + } + + logger.info(f"๐Ÿ—‘๏ธ Starting batch deletion for {len(candidate_ids)} candidates") + + for candidate_id in candidate_ids: + try: + deletion_stats = await self.delete_candidate(candidate_id) + batch_results[candidate_id] = deletion_stats + + # Add to totals + for key, value in deletion_stats.items(): + total_stats[key] += value + + except Exception as e: + logger.error(f"โŒ Failed to delete candidate {candidate_id}: {e}") + batch_results[candidate_id] = {"error": str(e)} + + logger.info(f"โœ… Completed batch deletion. Total items deleted: {sum(total_stats.values())}") + logger.info(f"๐Ÿ“Š Batch totals: {total_stats}") + + return { + "individual_results": batch_results, + "totals": total_stats, + "summary": { + "total_candidates_processed": len(candidate_ids), + "successful_deletions": len([r for r in batch_results.values() if "error" not in r]), + "failed_deletions": len([r for r in batch_results.values() if "error" in r]), + "total_items_deleted": sum(total_stats.values()) + } + } + + except Exception as e: + logger.error(f"โŒ Critical error during batch candidate deletion: {e}") + raise + + # User Operations + async def get_user_by_username(self, username: str) -> Optional[Dict]: + """Get user by username specifically""" + username_key = f"{KEY_PREFIXES['users']}{username.lower()}" + data = await self.redis.get(username_key) + return self._deserialize(data) if data else None + + async def find_candidate_by_username(self, username: str) -> Optional[Dict]: + """Find candidate by username""" + all_candidates = await self.get_all_candidates() + username_lower = username.lower() + + for candidate_data in all_candidates.values(): + if candidate_data.get("username", "").lower() == username_lower: + return candidate_data + + return None + + # Batch Operations + async def get_multiple_candidates_by_usernames(self, usernames: List[str]) -> Dict[str, Dict]: + """Get multiple candidates by their usernames efficiently""" + all_candidates = await self.get_all_candidates() + username_set = {username.lower() for username in usernames} + + result = {} + for candidate_data in all_candidates.values(): + candidate_username = candidate_data.get("username", "").lower() + if candidate_username in username_set: + result[candidate_username] = candidate_data + + return result + + async def get_candidate_chat_summary(self, candidate_id: str) -> Dict[str, Any]: + """Get a summary of chat activity for a specific candidate""" + sessions = await self.get_chat_sessions_by_candidate(candidate_id) + + if not sessions: + return { + "candidate_id": candidate_id, + "total_sessions": 0, + "total_messages": 0, + "first_chat": None, + "last_chat": None + } + + total_messages = 0 + for session in sessions: + 
+            session_id = session.get("id")
+            if session_id:
+                message_count = await self.get_chat_message_count(session_id)
+                total_messages += message_count
+
+        # Sort sessions by creation date
+        sessions_by_date = sorted(sessions, key=lambda x: x.get("createdAt", ""))
+
+        return {
+            "candidate_id": candidate_id,
+            "total_sessions": len(sessions),
+            "total_messages": total_messages,
+            "first_chat": sessions_by_date[0].get("createdAt") if sessions_by_date else None,
+            "last_chat": sessions_by_date[-1].get("lastActivity") if sessions_by_date else None,
+            "recent_sessions": sessions[:5]  # Last 5 sessions
+        }
+
+    async def get_viewer(self, viewer_id: str) -> Optional[Dict]:
+        """Get viewer by ID"""
+        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
+        data = await self.redis.get(key)
+        return self._deserialize(data) if data else None
+
+    async def set_viewer(self, viewer_id: str, viewer_data: Dict):
+        """Set viewer data"""
+        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
+        await self.redis.set(key, self._serialize(viewer_data))
+
+    async def get_all_viewers(self) -> Dict[str, Any]:
+        """Get all viewers"""
+        pattern = f"{KEY_PREFIXES['viewers']}*"
+        keys = await self.redis.keys(pattern)
+
+        if not keys:
+            return {}
+
+        # Use pipeline for efficiency
+        pipe = self.redis.pipeline()
+        for key in keys:
+            pipe.get(key)
+        values = await pipe.execute()
+
+        result = {}
+        for key, value in zip(keys, values):
+            viewer_id = key.replace(KEY_PREFIXES['viewers'], '')
+            result[viewer_id] = self._deserialize(value)
+
+        return result
+
+    async def delete_viewer(self, viewer_id: str):
+        """Delete viewer"""
+        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
+        await self.redis.delete(key)
+
+    async def get_user_rag_update_time(self, user_id: str) -> Optional[datetime]:
+        """Get the last time user's RAG data was updated (returns timezone-aware UTC)"""
+        try:
+            rag_update_key = f"user:{user_id}:rag_last_update"
+            timestamp_str = await self.redis.get(rag_update_key)
+            if timestamp_str:
+                dt = datetime.fromisoformat(timestamp_str)
+                # Ensure the datetime is timezone-aware (assume UTC if naive)
+                if dt.tzinfo is None:
+                    dt = dt.replace(tzinfo=timezone.utc)
+                else:
+                    # Convert to UTC if it's in a different timezone
+                    dt = dt.astimezone(timezone.utc)
+                return dt
+            logger.warning(f"⚠️ No RAG update time found for user {user_id}")
+            return None
+        except Exception as e:
+            logger.error(f"❌ Error getting user RAG update time: {e}")
+            return None
+
+    async def update_user_rag_timestamp(self, user_id: str) -> bool:
+        """Set the user's RAG data update time (stores as UTC ISO format)"""
+        try:
+            update_time = datetime.now(timezone.utc)
+
+            # Ensure we're storing UTC timezone-aware format
+            if update_time.tzinfo is None:
+                update_time = update_time.replace(tzinfo=timezone.utc)
+            else:
+                update_time = update_time.astimezone(timezone.utc)
+
+            rag_update_key = f"user:{user_id}:rag_last_update"
+            # Store as ISO format with timezone info
+            timestamp_str = update_time.isoformat()  # This includes timezone
+            await self.redis.set(rag_update_key, timestamp_str)
+            logger.info(f"✅ User RAG update time set for user {user_id}: {timestamp_str}")
+            return True
+        except Exception as e:
+            logger.error(f"❌ Error setting user RAG update time: {e}")
+            return False
+
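The two RAG-timestamp helpers above always return timezone-aware UTC values, so callers can compare them against other aware datetimes without tripping over naive/aware mismatches. A minimal usage sketch follows; the `db` handle and the one-hour staleness window are illustrative and not part of the diff.

# Sketch only: deciding whether a user's RAG data is stale. Assumes `db` is a
# RedisDatabase instance exposing the mixin methods added above; the one-hour
# threshold is arbitrary, chosen just for illustration.
from datetime import datetime, timedelta, timezone

async def rag_needs_refresh(db, user_id: str, max_age: timedelta = timedelta(hours=1)) -> bool:
    last_update = await db.get_user_rag_update_time(user_id)
    if last_update is None:
        return True  # never indexed
    # Both sides are timezone-aware UTC, so subtraction is well-defined
    return datetime.now(timezone.utc) - last_update > max_age

# After a successful re-index, a caller would record the new timestamp:
#     await db.update_user_rag_timestamp(user_id)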
diff --git a/src/backend/device_manager.py b/src/backend/device_manager.py
index 0dd4ea7..df17624 100644
--- a/src/backend/device_manager.py
+++ b/src/backend/device_manager.py
@@ -1,5 +1,5 @@
 from fastapi import FastAPI, HTTPException, Depends, Query, Path, Body, status, APIRouter, Request, BackgroundTasks
-from database import RedisDatabase
+from database.manager import RedisDatabase
 import hashlib
 from logger import logger
 from datetime import datetime, timezone
diff --git a/src/backend/email_service.py b/src/backend/email_service.py
index 76450eb..29b82c1 100644
--- a/src/backend/email_service.py
+++ b/src/backend/email_service.py
@@ -8,7 +8,7 @@ import asyncio
 from email_templates import EMAIL_TEMPLATES
 from datetime import datetime, timezone, timedelta
 import json
-from database import RedisDatabase
+from database.manager import RedisDatabase
 
 class EmailService:
     def __init__(self):
diff --git a/src/backend/entities/entity_manager.py b/src/backend/entities/entity_manager.py
index 6e5c6bc..e9195bb 100644
--- a/src/backend/entities/entity_manager.py
+++ b/src/backend/entities/entity_manager.py
@@ -9,7 +9,7 @@ from pydantic import BaseModel, Field # type: ignore
 
 from models import Candidate
 from agents.base import CandidateEntity
-from database import RedisDatabase
+from database.manager import RedisDatabase
 from prometheus_client import CollectorRegistry # type: ignore
 
 class EntityManager(BaseModel):
diff --git a/src/backend/main.py b/src/backend/main.py
index 3b4c4ee..b1b1deb 100644
--- a/src/backend/main.py
+++ b/src/backend/main.py
@@ -63,7 +63,7 @@ from auth_utils import (
 import model_cast
 import defines
 from logger import logger
-from database import RedisDatabase, redis_manager, DatabaseManager
+from database.manager import RedisDatabase, redis_manager, DatabaseManager
 import entities
 from email_service import VerificationEmailRateLimiter, email_service
 from device_manager import DeviceManager
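Taken together, the import changes above mean call sites now resolve RedisDatabase and redis_manager through database.manager rather than the old top-level database module. A minimal wiring sketch, not part of the diff: the get_client() accessor and the RedisDatabase(redis) constructor shape are assumptions made only for illustration.

# Hypothetical caller-side wiring under the new import path.
from database.manager import RedisDatabase, redis_manager


async def build_database() -> RedisDatabase:
    redis = await redis_manager.get_client()  # assumed async accessor on the manager
    return RedisDatabase(redis)               # assumed constructor taking the live client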
diff --git a/src/backend/rag/rag.py b/src/backend/rag/rag.py
index 3220d76..574a605 100644
--- a/src/backend/rag/rag.py
+++ b/src/backend/rag/rag.py
@@ -26,7 +26,7 @@ from .markdown_chunker import (
 
 # When imported as a module, use relative imports
 import defines
-from database import RedisDatabase
+from database.manager import RedisDatabase
 from models import ChromaDBGetResponse
 
 __all__ = ["ChromaDBFileWatcher", "start_file_watcher"]
@@ -64,10 +64,10 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
         self.chunk_overlap = chunk_overlap
         self.loop = loop
         self._umap_collection: ChromaDBGetResponse | None = None
-        self._umap_embedding_2d: np.ndarray = []
-        self._umap_embedding_3d: np.ndarray = []
-        self._umap_model_2d: umap.UMAP = None
-        self._umap_model_3d: umap.UMAP = None
+        self._umap_embedding_2d: np.ndarray = np.array([])
+        self._umap_embedding_3d: np.ndarray = np.array([])
+        self._umap_model_2d: Optional[umap.UMAP] = None
+        self._umap_model_3d: Optional[umap.UMAP] = None
         self.md = MarkItDown(enable_plugins=False)  # Set to True to enable plugins
 
         # self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
@@ -324,7 +324,7 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
             n_neighbors=30,
             min_dist=0.1,
         )
-        self._umap_embedding_2d = self._umap_model_2d.fit_transform(vectors)
+        self._umap_embedding_2d = self._umap_model_2d.fit_transform(vectors)  # type: ignore
         # logging.info(
         #     f"2D UMAP model n_components: {self._umap_model_2d.n_components}"
         # )  # Should be 2
@@ -339,7 +339,7 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
             n_neighbors=30,
             min_dist=0.01,
         )
-        self._umap_embedding_3d = self._umap_model_3d.fit_transform(vectors)
+        self._umap_embedding_3d = self._umap_model_3d.fit_transform(vectors)  # type: ignore
         # logging.info(
        #     f"3D UMAP model n_components: {self._umap_model_3d.n_components}"
         # )  # Should be 3
@@ -353,7 +353,7 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
         # Initialize ChromaDB client
         chroma_client = chromadb.PersistentClient(
             path=self.persist_directory,
-            settings=chromadb.Settings(anonymized_telemetry=False),
+            settings=chromadb.Settings(anonymized_telemetry=False),  # type: ignore
         )
 
         # Check if the collection exists
@@ -477,9 +477,9 @@ class ChromaDBFileWatcher(FileSystemEventHandler):
 
         # Extract results
         ids = results["ids"][0]
-        documents = results["documents"][0]
-        distances = results["distances"][0]
-        metadatas = results["metadatas"][0]
+        documents = results["documents"][0] if results["documents"] else []
+        distances = results["distances"][0] if results["distances"] else []
+        metadatas = results["metadatas"][0] if results["metadatas"] else []
 
         filtered_ids = []
         filtered_documents = []
diff --git a/src/backend/routes/admin.py b/src/backend/routes/admin.py
index 71387c6..24d3c72 100644
--- a/src/backend/routes/admin.py
+++ b/src/backend/routes/admin.py
@@ -17,7 +17,7 @@ from fastapi import (
 from fastapi.responses import JSONResponse
 
 from utils.rate_limiter import RateLimiter, get_rate_limiter
-from database import RedisDatabase
+from database.manager import RedisDatabase
 from logger import logger
 from utils.dependencies import (
     get_current_admin, get_current_user_or_guest, get_database, background_task_manager
@@ -142,7 +142,7 @@ async def get_system_health(
     if database_manager:
         try:
             database = database_manager.get_database()
-            from database import redis_manager
+            from database.manager import redis_manager
             redis_health = await redis_manager.health_check()
             db_health = {
                 "status": redis_health.get("status", "unknown"),
diff --git a/src/backend/routes/auth.py b/src/backend/routes/auth.py
index 9424c2e..4974843 100644
--- a/src/backend/routes/auth.py
+++ b/src/backend/routes/auth.py
@@ -16,7 +16,7 @@ from pydantic import BaseModel, EmailStr, ValidationError, field_validator
 from auth_utils import AuthenticationManager, SecurityConfig
 import backstory_traceback as backstory_traceback
 from utils.rate_limiter import RateLimiter
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from device_manager import DeviceManager
 from email_service import VerificationEmailRateLimiter, email_service
 from logger import logger
diff --git a/src/backend/routes/candidates.py b/src/backend/routes/candidates.py
index be6351d..bd9aaca 100644
--- a/src/backend/routes/candidates.py
+++ b/src/backend/routes/candidates.py
@@ -22,7 +22,7 @@ from agents.generate_resume import GenerateResume
 import agents.base as agents
 from utils.rate_limiter import RateLimiter, rate_limited
 from utils.helpers import filter_and_paginate, get_document_type_from_filename, get_skill_cache_key, get_requirements_list
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from device_manager import DeviceManager
 from email_service import VerificationEmailRateLimiter, email_service
 from logger import logger
diff --git a/src/backend/routes/chat.py b/src/backend/routes/chat.py
index 72dca48..9cf614d 100644
--- a/src/backend/routes/chat.py
+++ b/src/backend/routes/chat.py
@@ -18,7 +18,7 @@ from fastapi.responses import JSONResponse
 from pydantic import ValidationError
 
 from auth_utils import AuthenticationManager, SecurityConfig
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from device_manager import DeviceManager
 from email_service import email_service
 from logger import logger
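The rag.py hunk above guards each optional field of the chromadb query result before indexing into it, since "documents", "distances", and "metadatas" can come back as None. Purely as an illustration, the same guard could be factored into a tiny helper; the helper name is not part of the diff, only the result shape from chromadb's query() is assumed.

# Illustrative helper mirroring the guard pattern introduced in rag.py above.
from typing import Any, Dict, List


def first_or_empty(results: Dict[str, Any], field: str) -> List[Any]:
    """Return results[field][0] when the field is present and non-empty, else []."""
    value = results.get(field)
    return value[0] if value else []


# documents = first_or_empty(results, "documents")
# distances = first_or_empty(results, "distances")
# metadatas = first_or_empty(results, "metadatas")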
diff --git a/src/backend/routes/debug.py b/src/backend/routes/debug.py
index e389acd..4285ba7 100644
--- a/src/backend/routes/debug.py
+++ b/src/backend/routes/debug.py
@@ -11,7 +11,7 @@ from pydantic import BaseModel, EmailStr, ValidationError, field_validator
 from auth_utils import AuthenticationManager, SecurityConfig
 import backstory_traceback as backstory_traceback
 from utils.rate_limiter import RateLimiter
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from device_manager import DeviceManager
 from email_service import VerificationEmailRateLimiter, email_service
 from logger import logger
diff --git a/src/backend/routes/employers.py b/src/backend/routes/employers.py
index 2c23a18..c2f7395 100644
--- a/src/backend/routes/employers.py
+++ b/src/backend/routes/employers.py
@@ -21,7 +21,7 @@ from pydantic import BaseModel, ValidationError
 from auth_utils import AuthenticationManager
 
 from utils.helpers import create_job_from_content, filter_and_paginate, get_document_type_from_filename, get_skill_cache_key, get_requirements_list
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from logger import logger
 from models import (
     MOCK_UUID, ApiActivityType, ApiMessageType, ApiStatusType, CandidateAI, ChatContextType, ChatMessage, ChatMessageError, ChatMessageRagSearch, ChatMessageResume, ChatMessageSkillAssessment, ChatMessageStatus, ChatMessageStreaming, ChatMessageUser, ChatSession, Document, DocumentContentResponse, DocumentListResponse, DocumentMessage, DocumentOptions, DocumentType, DocumentUpdateRequest, Job, JobRequirements, JobRequirementsMessage, LoginRequest, CreateCandidateRequest, CreateEmployerRequest,
diff --git a/src/backend/routes/jobs.py b/src/backend/routes/jobs.py
index 6845aaf..9d6187e 100644
--- a/src/backend/routes/jobs.py
+++ b/src/backend/routes/jobs.py
@@ -23,7 +23,7 @@ import defines
 from agents.generate_resume import GenerateResume
 from agents.base import CandidateEntity
 from utils.helpers import create_job_from_content, filter_and_paginate, get_document_type_from_filename, get_skill_cache_key, get_requirements_list
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from logger import logger
 from models import (
     MOCK_UUID, ApiActivityType, ApiMessageType, ApiStatusType, CandidateAI, ChatContextType, ChatMessage, ChatMessageError, ChatMessageRagSearch, ChatMessageResume, ChatMessageSkillAssessment, ChatMessageStatus, ChatMessageStreaming, ChatMessageUser, ChatSession, Document, DocumentContentResponse, DocumentListResponse, DocumentMessage, DocumentOptions, DocumentType, DocumentUpdateRequest, Job, JobRequirements, JobRequirementsMessage, LoginRequest, CreateCandidateRequest, CreateEmployerRequest,
diff --git a/src/backend/routes/resumes.py b/src/backend/routes/resumes.py
index 217d46f..0ce1403 100644
--- a/src/backend/routes/resumes.py
+++ b/src/backend/routes/resumes.py
@@ -18,7 +18,7 @@ from pydantic import BaseModel, ValidationError
 import backstory_traceback as backstory_traceback
 
 from utils.helpers import filter_and_paginate, get_document_type_from_filename, get_skill_cache_key, get_requirements_list
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from logger import logger
 from models import (
     MOCK_UUID, ApiActivityType, ApiMessageType, ApiStatusType, CandidateAI, ChatContextType, ChatMessageError, ChatMessageRagSearch, ChatMessageResume, ChatMessageSkillAssessment, ChatMessageStatus, ChatMessageStreaming, ChatMessageUser, ChatSession, Document, DocumentContentResponse, DocumentListResponse, DocumentMessage, DocumentOptions, DocumentType, DocumentUpdateRequest, Job, JobRequirements, LoginRequest, CreateCandidateRequest, CreateEmployerRequest,
diff --git a/src/backend/routes/system.py b/src/backend/routes/system.py
index ff13f01..5b2f473 100644
--- a/src/backend/routes/system.py
+++ b/src/backend/routes/system.py
@@ -11,7 +11,7 @@ from pydantic import BaseModel, EmailStr, ValidationError, field_validator
 from auth_utils import AuthenticationManager, SecurityConfig
 import backstory_traceback as backstory_traceback
 from utils.rate_limiter import RateLimiter
-from database import RedisDatabase, redis_manager, Redis
+from database.manager import RedisDatabase, redis_manager, Redis
 from device_manager import DeviceManager
 from email_service import VerificationEmailRateLimiter, email_service
 from logger import logger
diff --git a/src/backend/routes/users.py b/src/backend/routes/users.py
index 750e854..29ce87c 100644
--- a/src/backend/routes/users.py
+++ b/src/backend/routes/users.py
@@ -16,7 +16,7 @@ from typing import Any, Dict, List, Optional
 from fastapi import APIRouter, File, Form, HTTPException, Depends, Body, Path, Query, Request, BackgroundTasks, UploadFile
 from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
 
-from database import RedisDatabase, redis_manager
+from database.manager import RedisDatabase, redis_manager
 from device_manager import DeviceManager
 from email_service import VerificationEmailRateLimiter, email_service
 from logger import logger
diff --git a/src/backend/utils/dependencies.py b/src/backend/utils/dependencies.py
index 8775017..0742308 100644
--- a/src/backend/utils/dependencies.py
+++ b/src/backend/utils/dependencies.py
@@ -14,7 +14,7 @@ from prometheus_fastapi_instrumentator import Instrumentator
 
 import defines
 
-from database import RedisDatabase, redis_manager, DatabaseManager
+from database.manager import RedisDatabase, redis_manager, DatabaseManager
 from models import BaseUserWithType, Candidate, CandidateAI, Employer, Guest
 from logger import logger
 from background_tasks import BackgroundTaskManager
diff --git a/src/backend/utils/rate_limiter.py b/src/backend/utils/rate_limiter.py
index e4988a3..eee96a2 100644
--- a/src/backend/utils/rate_limiter.py
+++ b/src/backend/utils/rate_limiter.py
@@ -9,7 +9,7 @@ from datetime import datetime, timedelta, UTC
 from typing import Callable, Dict, Optional, Tuple, Any
 from fastapi import Depends, HTTPException, Request
 from pydantic import BaseModel # type: ignore
-from database import RedisDatabase
+from database.manager import RedisDatabase
 from logger import logger
 from . dependencies import get_current_user_or_guest, get_database
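To close the loop on the UserMixin shown earlier in this patch: guests are written to both the "guests" hash and a week-long guest_backup:<id> key, and reads fall back to the backup and re-populate the primary hash. A small usage sketch, assuming `db` is a RedisDatabase instance composed from the new mixins; the field values below are illustrative only.

# Usage sketch (not from the diff): round-tripping a guest through the
# primary hash and the backup key maintained by UserMixin.
import uuid
from datetime import datetime, UTC

async def demo_guest_roundtrip(db) -> None:
    guest_id = str(uuid.uuid4())
    await db.set_guest(guest_id, {
        "id": guest_id,
        "session_id": "session-123",                     # illustrative value
        "created_at": datetime.now(UTC).isoformat(),
    })
    # Even if the "guests" hash entry is lost, get_guest() restores the
    # record from guest_backup:<id> and refreshes last_activity.
    guest = await db.get_guest(guest_id)
    assert guest is not None and guest["id"] == guest_id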