from datetime import UTC, datetime, timedelta, timezone
import logging
import json
from typing import List, Optional, Any, Dict, TYPE_CHECKING, Self

from .protocols import DatabaseProtocol
from ..constants import KEY_PREFIXES

logger = logging.getLogger(__name__)


class UserMixin(DatabaseProtocol):
    """Mixin for user operations"""

    # ================
    # Guests
    # ================

    async def set_guest(self, guest_id: str, guest_data: Dict[str, Any]) -> None:
        """Store guest data with enhanced persistence.

        Writes to the "guests" hash (primary) and to a per-guest backup key
        with a 7-day TTL so a lost hash entry can be restored by get_guest().
        Mutates guest_data in place by stamping "last_activity".
        """
        try:
            # Ensure last_activity is always set
            guest_data["last_activity"] = datetime.now(UTC).isoformat()

            # Store in Redis with both hash and individual key for redundancy
            await self.redis.hset("guests", guest_id, json.dumps(guest_data))  # type: ignore

            # Also store with a longer TTL as backup
            await self.redis.setex(
                f"guest_backup:{guest_id}",
                86400 * 7,  # 7 days TTL
                json.dumps(guest_data)
            )

            logger.info(f"๐Ÿ’พ Guest stored with backup: {guest_id}")
        except Exception as e:
            logger.error(f"โŒ Error storing guest {guest_id}: {e}")
            raise

    async def get_guest(self, guest_id: str) -> Optional[Dict[str, Any]]:
        """Get guest data with fallback to backup.

        Reading a guest refreshes its "last_activity" timestamp (via
        set_guest), so active guests are not reaped by cleanup.
        Returns None if the guest exists in neither store.
        """
        try:
            # Try primary storage first
            data = await self.redis.hget("guests", guest_id)  # type: ignore
            if data:
                guest_data = json.loads(data)
                # Update last activity when accessed
                guest_data["last_activity"] = datetime.now(UTC).isoformat()
                await self.set_guest(guest_id, guest_data)
                logger.info(f"๐Ÿ” Guest found in primary storage: {guest_id}")
                return guest_data

            # Fallback to backup storage
            backup_data = await self.redis.get(f"guest_backup:{guest_id}")
            if backup_data:
                guest_data = json.loads(backup_data)
                guest_data["last_activity"] = datetime.now(UTC).isoformat()
                # Restore to primary storage
                await self.set_guest(guest_id, guest_data)
                logger.info(f"๐Ÿ”„ Guest restored from backup: {guest_id}")
                return guest_data

            logger.warning(f"โš ๏ธ Guest not found: {guest_id}")
            return None
        except Exception as e:
            logger.error(f"โŒ Error getting guest {guest_id}: {e}")
            return None

    async def get_guest_by_session_id(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get guest data by session ID (linear scan over all guests)."""
        try:
            all_guests = await self.get_all_guests()
            for guest_data in all_guests.values():
                if guest_data.get("session_id") == session_id:
                    return guest_data
            return None
        except Exception as e:
            logger.error(f"โŒ Error getting guest by session ID {session_id}: {e}")
            return None

    async def get_all_guests(self) -> Dict[str, Dict[str, Any]]:
        """Get all guests, keyed by guest ID. Returns {} on error."""
        try:
            data = await self.redis.hgetall("guests")  # type: ignore
            return {
                guest_id: json.loads(guest_json)
                for guest_id, guest_json in data.items()
            }
        except Exception as e:
            logger.error(f"โŒ Error getting all guests: {e}")
            return {}

    async def delete_guest(self, guest_id: str) -> bool:
        """Delete a guest.

        Removes both the primary hash entry and the backup key. Without the
        backup deletion, get_guest() would silently "restore" the deleted
        guest from guest_backup:* for up to 7 days (bug fix).
        """
        try:
            result = await self.redis.hdel("guests", guest_id)  # type: ignore
            # BUGFIX: also drop the backup copy so the guest cannot be
            # resurrected by get_guest()'s backup-restore path.
            await self.redis.delete(f"guest_backup:{guest_id}")
            if result:
                logger.info(f"๐Ÿ—‘๏ธ Guest deleted: {guest_id}")
                return True
            return False
        except Exception as e:
            logger.error(f"โŒ Error deleting guest {guest_id}: {e}")
            return False

    async def cleanup_inactive_guests(self, inactive_hours: int = 24) -> int:
        """Clean up inactive guest sessions with safety checks.

        A guest is deleted only when its last_activity is older than the
        cutoff; guests created less than 1 hour ago are always preserved,
        and any per-guest processing error preserves that guest.
        Returns the number of guests deleted.
        """
        try:
            all_guests = await self.get_all_guests()
            current_time = datetime.now(UTC)
            cutoff_time = current_time - timedelta(hours=inactive_hours)

            deleted_count = 0
            preserved_count = 0

            for guest_id, guest_data in all_guests.items():
                try:
                    last_activity_str = guest_data.get("last_activity")
                    created_at_str = guest_data.get("created_at")

                    # Skip cleanup if guest is very new (less than 1 hour old)
                    if created_at_str:
                        created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                        if current_time - created_at < timedelta(hours=1):
                            preserved_count += 1
                            logger.info(f"๐Ÿ›ก๏ธ Preserving new guest: {guest_id}")
                            continue

                    # Check last activity
                    should_delete = False
                    if last_activity_str:
                        try:
                            last_activity = datetime.fromisoformat(last_activity_str.replace('Z', '+00:00'))
                            if last_activity < cutoff_time:
                                should_delete = True
                        except ValueError:
                            # Invalid date format, but don't delete if guest is new
                            if not created_at_str:
                                should_delete = True
                    else:
                        # No last activity, but don't delete if guest is new
                        if not created_at_str:
                            should_delete = True

                    if should_delete:
                        await self.delete_guest(guest_id)
                        deleted_count += 1
                    else:
                        preserved_count += 1

                except Exception as e:
                    logger.error(f"โŒ Error processing guest {guest_id} for cleanup: {e}")
                    preserved_count += 1  # Preserve on error

            if deleted_count > 0:
                logger.info(f"๐Ÿงน Guest cleanup: removed {deleted_count}, preserved {preserved_count}")

            return deleted_count
        except Exception as e:
            logger.error(f"โŒ Error in guest cleanup: {e}")
            return 0

    async def get_guest_statistics(self) -> Dict[str, Any]:
        """Get guest usage statistics.

        Aggregates counts of total/active/converted guests, per-IP counts,
        and a per-day creation timeline. Returns {} on error.
        """
        try:
            all_guests = await self.get_all_guests()
            current_time = datetime.now(UTC)

            stats = {
                "total_guests": len(all_guests),
                "active_last_hour": 0,
                "active_last_day": 0,
                "converted_guests": 0,
                "by_ip": {},
                "creation_timeline": {}
            }

            hour_ago = current_time - timedelta(hours=1)
            day_ago = current_time - timedelta(days=1)

            for guest_data in all_guests.values():
                # Check activity
                last_activity_str = guest_data.get("last_activity")
                if last_activity_str:
                    try:
                        last_activity = datetime.fromisoformat(last_activity_str.replace('Z', '+00:00'))
                        if last_activity > hour_ago:
                            stats["active_last_hour"] += 1
                        if last_activity > day_ago:
                            stats["active_last_day"] += 1
                    except ValueError:
                        pass

                # Check conversions
                if guest_data.get("converted_to_user_id"):
                    stats["converted_guests"] += 1

                # IP tracking
                ip = guest_data.get("ip_address", "unknown")
                stats["by_ip"][ip] = stats["by_ip"].get(ip, 0) + 1

                # Creation timeline
                created_at_str = guest_data.get("created_at")
                if created_at_str:
                    try:
                        created_at = datetime.fromisoformat(created_at_str.replace('Z', '+00:00'))
                        date_key = created_at.strftime('%Y-%m-%d')
                        stats["creation_timeline"][date_key] = stats["creation_timeline"].get(date_key, 0) + 1
                    except ValueError:
                        pass

            return stats
        except Exception as e:
            logger.error(f"โŒ Error getting guest statistics: {e}")
            return {}

    # ================
    # Users
    # ================

    async def get_user_by_username(self, username: str) -> Optional[Dict]:
        """Get user by username specifically (lookup is case-insensitive)."""
        username_key = f"{KEY_PREFIXES['users']}{username.lower()}"
        data = await self.redis.get(username_key)
        return self._deserialize(data) if data else None

    async def get_user_rag_update_time(self, user_id: str) -> Optional[datetime]:
        """Get the last time user's RAG data was updated (returns timezone-aware UTC)"""
        try:
            rag_update_key = f"user:{user_id}:rag_last_update"
            timestamp_str = await self.redis.get(rag_update_key)
            if timestamp_str:
                dt = datetime.fromisoformat(timestamp_str)
                # Ensure the datetime is timezone-aware (assume UTC if naive)
                if dt.tzinfo is None:
                    dt = dt.replace(tzinfo=timezone.utc)
                else:
                    # Convert to UTC if it's in a different timezone
                    dt = dt.astimezone(timezone.utc)
                return dt
            logger.warning(f"โš ๏ธ No RAG update time found for user {user_id}")
            return None
        except Exception as e:
            logger.error(f"โŒ Error getting user RAG update time: {e}")
            return None

    async def update_user_rag_timestamp(self, user_id: str) -> bool:
        """Set the user's RAG data update time (stores as UTC ISO format)"""
        try:
            update_time = datetime.now(timezone.utc)

            # Ensure we're storing UTC timezone-aware format
            if update_time.tzinfo is None:
                update_time = update_time.replace(tzinfo=timezone.utc)
            else:
                update_time = update_time.astimezone(timezone.utc)

            rag_update_key = f"user:{user_id}:rag_last_update"
            # Store as ISO format with timezone info
            timestamp_str = update_time.isoformat()  # This includes timezone
            await self.redis.set(rag_update_key, timestamp_str)
            logger.info(f"โœ… User RAG update time set for user {user_id}: {timestamp_str}")
            return True
        except Exception as e:
            logger.error(f"โŒ Error setting user RAG update time: {e}")
            return False

    async def set_user_by_id(self, user_id: str, user_data: Dict[str, Any]) -> bool:
        """Store user data with ID as key for direct lookup"""
        try:
            key = f"user_by_id:{user_id}"
            await self.redis.set(key, json.dumps(user_data, default=str))
            logger.info(f"๐Ÿ‘ค Stored user data by ID for {user_id}")
            return True
        except Exception as e:
            logger.error(f"โŒ Error storing user by ID {user_id}: {e}")
            return False

    async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
        """Get user lookup data by user ID.

        Checks the "user_lookup_by_id" hash first, then falls back to the
        "user_by_id:{id}" string key written by set_user_by_id(). Previously
        data stored via set_user_by_id() was never readable here because the
        two methods used different keys (bug fix).
        """
        try:
            data = await self.redis.hget("user_lookup_by_id", user_id)  # type: ignore
            if data:
                return json.loads(data)
            # BUGFIX: fall back to the key set_user_by_id() actually writes.
            data = await self.redis.get(f"user_by_id:{user_id}")
            if data:
                return json.loads(data)
            return None
        except Exception as e:
            logger.error(f"โŒ Error getting user by ID {user_id}: {e}")
            return None

    async def user_exists_by_email(self, email: str) -> bool:
        """Check if a user exists with the given email"""
        try:
            # Use the shared prefix constant (was hard-coded "users:",
            # which must match KEY_PREFIXES['users'] used elsewhere).
            key = f"{KEY_PREFIXES['users']}{email.lower()}"
            exists = await self.redis.exists(key)
            return exists > 0
        except Exception as e:
            logger.error(f"โŒ Error checking user existence by email {email}: {e}")
            return False

    async def user_exists_by_username(self, username: str) -> bool:
        """Check if a user exists with the given username"""
        try:
            # Same key space as get_user_by_username / set_user.
            key = f"{KEY_PREFIXES['users']}{username.lower()}"
            exists = await self.redis.exists(key)
            return exists > 0
        except Exception as e:
            logger.error(f"โŒ Error checking user existence by username {username}: {e}")
            return False

    async def get_all_users(self) -> Dict[str, Any]:
        """Get all users, keyed by the login portion of the Redis key."""
        pattern = f"{KEY_PREFIXES['users']}*"
        keys = await self.redis.keys(pattern)
        if not keys:
            return {}

        # Use pipeline for efficiency
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(key)
        values = await pipe.execute()

        result = {}
        for key, value in zip(keys, values):
            email = key.replace(KEY_PREFIXES['users'], '')
            logger.info(f"๐Ÿ” Found user key: {key}, type: {type(value)}")
            # isinstance is the idiomatic type check (was type(value) == str)
            if isinstance(value, str):
                result[email] = value
            else:
                result[email] = self._deserialize(value)
        return result

    async def delete_user(self, email: str):
        """Delete user by login (email or username)."""
        # Normalize like set_user/get_user do, so mixed-case input
        # still targets the stored (lowercased) key.
        key = f"{KEY_PREFIXES['users']}{email.strip().lower()}"
        await self.redis.delete(key)

    async def get_user(self, login: str) -> Optional[Dict[str, Any]]:
        """Get user by email or username"""
        try:
            login = login.strip().lower()
            key = f"{KEY_PREFIXES['users']}{login}"
            data = await self.redis.get(key)
            if data:
                user_data = json.loads(data)
                logger.info(f"๐Ÿ‘ค Retrieved user data for {login}")
                return user_data
            return None
        except Exception as e:
            logger.error(f"โŒ Error retrieving user {login}: {e}")
            return None

    async def set_user(self, login: str, user_data: Dict[str, Any]) -> bool:
        """Store user data by email or username"""
        try:
            login = login.strip().lower()
            key = f"{KEY_PREFIXES['users']}{login}"
            await self.redis.set(key, json.dumps(user_data, default=str))
            logger.info(f"๐Ÿ‘ค Stored user data for {login}")
            return True
        except Exception as e:
            logger.error(f"โŒ Error storing user {login}: {e}")
            return False

    # ================
    # Employers
    # ================

    async def get_employer(self, employer_id: str) -> Optional[Dict]:
        """Get employer by ID"""
        key = f"{KEY_PREFIXES['employers']}{employer_id}"
        data = await self.redis.get(key)
        return self._deserialize(data) if data else None

    async def set_employer(self, employer_id: str, employer_data: Dict):
        """Set employer data"""
        key = f"{KEY_PREFIXES['employers']}{employer_id}"
        await self.redis.set(key, self._serialize(employer_data))

    async def get_all_employers(self) -> Dict[str, Any]:
        """Get all employers, keyed by employer ID."""
        pattern = f"{KEY_PREFIXES['employers']}*"
        keys = await self.redis.keys(pattern)
        if not keys:
            return {}

        # Use pipeline for efficiency
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(key)
        values = await pipe.execute()

        result = {}
        for key, value in zip(keys, values):
            employer_id = key.replace(KEY_PREFIXES['employers'], '')
            result[employer_id] = self._deserialize(value)
        return result

    async def delete_employer(self, employer_id: str):
        """Delete employer"""
        key = f"{KEY_PREFIXES['employers']}{employer_id}"
        await self.redis.delete(key)

    # ================
    # Candidates
    # ================

    async def get_candidate(self, candidate_id: str) -> Optional[Dict]:
        """Get candidate by ID"""
        key = f"{KEY_PREFIXES['candidates']}{candidate_id}"
        data = await self.redis.get(key)
        return self._deserialize(data) if data else None

    async def set_candidate(self, candidate_id: str, candidate_data: Dict):
        """Set candidate data"""
        key = f"{KEY_PREFIXES['candidates']}{candidate_id}"
        await self.redis.set(key, self._serialize(candidate_data))

    async def get_all_candidates(self) -> Dict[str, Any]:
        """Get all candidates, keyed by candidate ID."""
        pattern = f"{KEY_PREFIXES['candidates']}*"
        keys = await self.redis.keys(pattern)
        if not keys:
            return {}

        # Use pipeline for efficiency
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(key)
        values = await pipe.execute()

        result = {}
        for key, value in zip(keys, values):
            candidate_id = key.replace(KEY_PREFIXES['candidates'], '')
            result[candidate_id] = self._deserialize(value)
        return result

    async def delete_candidate(self, candidate_id: str) -> Dict[str, int]:
        """
        Delete candidate and all related records in a cascading manner.

        Each step is individually wrapped in try/except so a failure in one
        category never aborts the remaining cleanup (best-effort cascade).
        Returns a dictionary with counts of deleted items for each category.
        Raises only on a critical error outside the per-step handlers.
        """
        try:
            deletion_stats = {
                "documents": 0,
                "chat_sessions": 0,
                "chat_messages": 0,
                "job_applications": 0,
                "user_records": 0,
                "auth_records": 0,
                "security_logs": 0,
                "ai_parameters": 0,
                "candidate_record": 0,
                "resumes": 0
            }

            logger.info(f"๐Ÿ—‘๏ธ Starting cascading delete for candidate {candidate_id}")

            # 1. Get candidate data first to retrieve associated information
            candidate_data = await self.get_candidate(candidate_id)
            if not candidate_data:
                logger.warning(f"โš ๏ธ Candidate {candidate_id} not found")
                return deletion_stats

            candidate_email = candidate_data.get("email", "").lower()
            candidate_username = candidate_data.get("username", "").lower()

            # 2. Delete all candidate documents and their metadata
            try:
                documents_deleted = await self.delete_all_candidate_documents(candidate_id)
                deletion_stats["documents"] = documents_deleted
                logger.info(f"๐Ÿ—‘๏ธ Deleted {documents_deleted} documents for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting candidate documents: {e}")

            # 3. Delete all chat sessions related to this candidate
            try:
                candidate_sessions = await self.get_chat_sessions_by_candidate(candidate_id)
                messages_deleted = 0

                for session in candidate_sessions:
                    session_id = session.get("id")
                    if session_id:
                        # Count messages before deletion
                        message_count = await self.get_chat_message_count(session_id)
                        messages_deleted += message_count
                        # Delete chat session and its messages
                        await self.delete_chat_session_completely(session_id)

                deletion_stats["chat_sessions"] = len(candidate_sessions)
                deletion_stats["chat_messages"] = messages_deleted
                logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_sessions)} chat sessions and {messages_deleted} messages for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting chat sessions: {e}")

            # 4. Delete job applications from this candidate
            try:
                all_applications = await self.get_all_job_applications()
                candidate_applications = []

                for app_id, app_data in all_applications.items():
                    if app_data.get("candidateId") == candidate_id:
                        candidate_applications.append(app_id)

                # Delete each application
                for app_id in candidate_applications:
                    await self.delete_job_application(app_id)

                deletion_stats["job_applications"] = len(candidate_applications)
                logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_applications)} job applications for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting job applications: {e}")

            # 5. Delete user records (by email and username if they exist)
            try:
                user_records_deleted = 0

                # Delete by email
                if candidate_email and await self.user_exists_by_email(candidate_email):
                    await self.delete_user(candidate_email)
                    user_records_deleted += 1
                    logger.info(f"๐Ÿ—‘๏ธ Deleted user record by email: {candidate_email}")

                # Delete by username (if different from email)
                if (candidate_username and
                        candidate_username != candidate_email and
                        await self.user_exists_by_username(candidate_username)):
                    await self.delete_user(candidate_username)
                    user_records_deleted += 1
                    logger.info(f"๐Ÿ—‘๏ธ Deleted user record by username: {candidate_username}")

                # Delete user by ID if exists
                user_by_id = await self.get_user_by_id(candidate_id)
                if user_by_id:
                    key = f"user_by_id:{candidate_id}"
                    await self.redis.delete(key)
                    user_records_deleted += 1
                    logger.info(f"๐Ÿ—‘๏ธ Deleted user record by ID: {candidate_id}")

                deletion_stats["user_records"] = user_records_deleted
                logger.info(f"๐Ÿ—‘๏ธ Deleted {user_records_deleted} user records for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting user records: {e}")

            # 6. Delete authentication records
            try:
                auth_deleted = await self.delete_authentication(candidate_id)
                if auth_deleted:
                    deletion_stats["auth_records"] = 1
                    logger.info(f"๐Ÿ—‘๏ธ Deleted authentication record for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting authentication records: {e}")

            # 7. Revoke all refresh tokens for this user
            try:
                await self.revoke_all_user_tokens(candidate_id)
                logger.info(f"๐Ÿ—‘๏ธ Revoked all refresh tokens for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error revoking refresh tokens: {e}")

            # 8. Delete security logs for this user
            try:
                security_logs_deleted = 0
                # Security logs are stored by date, so we need to scan for them
                pattern = f"security_log:{candidate_id}:*"
                cursor = 0
                while True:
                    cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
                    if keys:
                        await self.redis.delete(*keys)
                        security_logs_deleted += len(keys)
                    if cursor == 0:
                        break

                deletion_stats["security_logs"] = security_logs_deleted
                if security_logs_deleted > 0:
                    logger.info(f"๐Ÿ—‘๏ธ Deleted {security_logs_deleted} security log entries for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting security logs: {e}")

            # 9. Delete AI parameters that might be specific to this candidate
            try:
                all_ai_params = await self.get_all_ai_parameters()
                candidate_ai_params = []

                for param_id, param_data in all_ai_params.items():
                    if (param_data.get("candidateId") == candidate_id or
                            param_data.get("userId") == candidate_id):
                        candidate_ai_params.append(param_id)

                # Delete each AI parameter set
                for param_id in candidate_ai_params:
                    await self.delete_ai_parameters(param_id)

                deletion_stats["ai_parameters"] = len(candidate_ai_params)
                if len(candidate_ai_params) > 0:
                    logger.info(f"๐Ÿ—‘๏ธ Deleted {len(candidate_ai_params)} AI parameter sets for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting AI parameters: {e}")

            # 10. Delete email verification tokens if any exist
            try:
                if candidate_email:
                    # Clean up any pending verification tokens
                    pattern = "email_verification:*"
                    cursor = 0
                    tokens_deleted = 0
                    while True:
                        cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
                        for key in keys:
                            token_data = await self.redis.get(key)
                            if token_data:
                                verification_info = json.loads(token_data)
                                if verification_info.get("email", "").lower() == candidate_email:
                                    await self.redis.delete(key)
                                    tokens_deleted += 1
                        if cursor == 0:
                            break

                    if tokens_deleted > 0:
                        logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} email verification tokens for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting email verification tokens: {e}")

            # 11. Delete password reset tokens if any exist
            try:
                if candidate_email:
                    pattern = "password_reset:*"
                    cursor = 0
                    tokens_deleted = 0
                    while True:
                        cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
                        for key in keys:
                            token_data = await self.redis.get(key)
                            if token_data:
                                reset_info = json.loads(token_data)
                                if reset_info.get("email", "").lower() == candidate_email:
                                    await self.redis.delete(key)
                                    tokens_deleted += 1
                        if cursor == 0:
                            break

                    if tokens_deleted > 0:
                        logger.info(f"๐Ÿ—‘๏ธ Deleted {tokens_deleted} password reset tokens for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting password reset tokens: {e}")

            # 12. Delete MFA codes if any exist
            try:
                if candidate_email:
                    pattern = f"mfa_code:{candidate_email}:*"
                    cursor = 0
                    mfa_codes_deleted = 0
                    while True:
                        cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
                        if keys:
                            await self.redis.delete(*keys)
                            mfa_codes_deleted += len(keys)
                        if cursor == 0:
                            break

                    if mfa_codes_deleted > 0:
                        logger.info(f"๐Ÿ—‘๏ธ Deleted {mfa_codes_deleted} MFA codes for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting MFA codes: {e}")

            # 13. Finally, delete the candidate record itself
            try:
                key = f"{KEY_PREFIXES['candidates']}{candidate_id}"
                result = await self.redis.delete(key)
                deletion_stats["candidate_record"] = result
                logger.info(f"๐Ÿ—‘๏ธ Deleted candidate record for {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting candidate record: {e}")

            # 14. Delete resumes associated with this candidate across all users
            try:
                all_resumes = await self.get_all_resumes()
                candidate_resumes_deleted = 0

                for user_id, user_resumes in all_resumes.items():
                    resumes_to_delete = []
                    for resume in user_resumes:
                        if resume.get("candidate_id") == candidate_id:
                            resumes_to_delete.append(resume.get("resume_id"))

                    # Delete each resume for this candidate
                    for resume_id in resumes_to_delete:
                        if resume_id:
                            await self.delete_resume(user_id, resume_id)
                            candidate_resumes_deleted += 1

                deletion_stats["resumes"] = candidate_resumes_deleted
                if candidate_resumes_deleted > 0:
                    logger.info(f"๐Ÿ—‘๏ธ Deleted {candidate_resumes_deleted} resumes for candidate {candidate_id}")
            except Exception as e:
                logger.error(f"โŒ Error deleting resumes for candidate {candidate_id}: {e}")

            # 15. Log the deletion as a security event (if we have admin/system user context)
            try:
                total_items_deleted = sum(deletion_stats.values())
                logger.info(f"โœ… Completed cascading delete for candidate {candidate_id}. "
                            f"Total items deleted: {total_items_deleted}")
                logger.info(f"๐Ÿ“Š Deletion breakdown: {deletion_stats}")
            except Exception as e:
                logger.error(f"โŒ Error logging deletion summary: {e}")

            return deletion_stats

        except Exception as e:
            logger.error(f"โŒ Critical error during candidate deletion {candidate_id}: {e}")
            raise

    async def delete_candidate_batch(self, candidate_ids: List[str]) -> Dict[str, Dict[str, int]]:
        """
        Delete multiple candidates in batch with detailed reporting.

        Per-candidate failures are captured as {"error": ...} entries and do
        not abort the batch. Returns deletion stats for each candidate plus
        aggregate totals and a summary.
        """
        try:
            batch_results = {}
            total_stats = {
                "documents": 0,
                "chat_sessions": 0,
                "chat_messages": 0,
                "job_applications": 0,
                "user_records": 0,
                "auth_records": 0,
                "security_logs": 0,
                "ai_parameters": 0,
                "candidate_record": 0,
                "resumes": 0,
            }

            logger.info(f"๐Ÿ—‘๏ธ Starting batch deletion for {len(candidate_ids)} candidates")

            for candidate_id in candidate_ids:
                try:
                    deletion_stats = await self.delete_candidate(candidate_id)
                    batch_results[candidate_id] = deletion_stats

                    # Add to totals
                    for key, value in deletion_stats.items():
                        total_stats[key] += value
                except Exception as e:
                    logger.error(f"โŒ Failed to delete candidate {candidate_id}: {e}")
                    batch_results[candidate_id] = {"error": str(e)}

            logger.info(f"โœ… Completed batch deletion. Total items deleted: {sum(total_stats.values())}")
            logger.info(f"๐Ÿ“Š Batch totals: {total_stats}")

            return {
                "individual_results": batch_results,
                "totals": total_stats,
                "summary": {
                    "total_candidates_processed": len(candidate_ids),
                    "successful_deletions": len([r for r in batch_results.values() if "error" not in r]),
                    "failed_deletions": len([r for r in batch_results.values() if "error" in r]),
                    "total_items_deleted": sum(total_stats.values())
                }
            }

        except Exception as e:
            logger.error(f"โŒ Critical error during batch candidate deletion: {e}")
            raise

    async def find_candidate_by_username(self, username: str) -> Optional[Dict]:
        """Find candidate by username (case-insensitive linear scan)."""
        all_candidates = await self.get_all_candidates()
        username_lower = username.lower()
        for candidate_data in all_candidates.values():
            if candidate_data.get("username", "").lower() == username_lower:
                return candidate_data
        return None

    async def get_multiple_candidates_by_usernames(self, usernames: List[str]) -> Dict[str, Dict]:
        """Get multiple candidates by their usernames efficiently"""
        all_candidates = await self.get_all_candidates()
        username_set = {username.lower() for username in usernames}

        result = {}
        for candidate_data in all_candidates.values():
            candidate_username = candidate_data.get("username", "").lower()
            if candidate_username in username_set:
                result[candidate_username] = candidate_data
        return result

    async def get_candidate_chat_summary(self, candidate_id: str) -> Dict[str, Any]:
        """Get a summary of chat activity for a specific candidate"""
        sessions = await self.get_chat_sessions_by_candidate(candidate_id)

        if not sessions:
            return {
                "candidate_id": candidate_id,
                "total_sessions": 0,
                "total_messages": 0,
                "first_chat": None,
                "last_chat": None
            }

        total_messages = 0
        for session in sessions:
            session_id = session.get("id")
            if session_id:
                message_count = await self.get_chat_message_count(session_id)
                total_messages += message_count

        # Sort sessions by creation date
        sessions_by_date = sorted(sessions, key=lambda x: x.get("createdAt", ""))

        return {
            "candidate_id": candidate_id,
            "total_sessions": len(sessions),
            "total_messages": total_messages,
            "first_chat": sessions_by_date[0].get("createdAt") if sessions_by_date else None,
            "last_chat": sessions_by_date[-1].get("lastActivity") if sessions_by_date else None,
            "recent_sessions": sessions[:5]  # Last 5 sessions
        }

    # ================
    # Viewers
    # ================

    async def get_viewer(self, viewer_id: str) -> Optional[Dict]:
        """Get viewer by ID"""
        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
        data = await self.redis.get(key)
        return self._deserialize(data) if data else None

    async def set_viewer(self, viewer_id: str, viewer_data: Dict):
        """Set viewer data"""
        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
        await self.redis.set(key, self._serialize(viewer_data))

    async def get_all_viewers(self) -> Dict[str, Any]:
        """Get all viewers, keyed by viewer ID."""
        pattern = f"{KEY_PREFIXES['viewers']}*"
        keys = await self.redis.keys(pattern)
        if not keys:
            return {}

        # Use pipeline for efficiency
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(key)
        values = await pipe.execute()

        result = {}
        for key, value in zip(keys, values):
            viewer_id = key.replace(KEY_PREFIXES['viewers'], '')
            result[viewer_id] = self._deserialize(value)
        return result

    async def delete_viewer(self, viewer_id: str):
        """Delete viewer"""
        key = f"{KEY_PREFIXES['viewers']}{viewer_id}"
        await self.redis.delete(key)