From 168fe8cc8ec33217f84736ba3e4681035e615c75 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Wed, 18 Jun 2025 12:36:32 -0700 Subject: [PATCH] Moved auth_utils to utils --- src/backend/utils/auth_utils.py | 275 ++++++++++++++++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 src/backend/utils/auth_utils.py diff --git a/src/backend/utils/auth_utils.py b/src/backend/utils/auth_utils.py new file mode 100644 index 0000000..0a31dac --- /dev/null +++ b/src/backend/utils/auth_utils.py @@ -0,0 +1,275 @@ +# auth_utils.py +""" +Secure Authentication Utilities +Provides password hashing, verification, and security features +""" + +import traceback +import bcrypt +import secrets +import logging +from datetime import datetime, timezone, timedelta +from typing import Dict, Any, Optional, Tuple +from pydantic import BaseModel + +logger = logging.getLogger(__name__) + +class PasswordSecurity: + """Handles password hashing and verification using bcrypt""" + + @staticmethod + def hash_password(password: str) -> Tuple[str, str]: + """ + Hash a password with a random salt using bcrypt + + Args: + password: Plain text password + + Returns: + Tuple of (password_hash, salt) both as strings + """ + # Generate a random salt + salt = bcrypt.gensalt() + + # Hash the password + password_hash = bcrypt.hashpw(password.encode('utf-8'), salt) + + return password_hash.decode('utf-8'), salt.decode('utf-8') + + @staticmethod + def verify_password(password: str, password_hash: str) -> bool: + """ + Verify a password against its hash + + Args: + password: Plain text password to verify + password_hash: Stored password hash + + Returns: + True if password matches, False otherwise + """ + try: + return bcrypt.checkpw( + password.encode('utf-8'), + password_hash.encode('utf-8') + ) + except Exception as e: + logger.error(f"Password verification error: {e}") + return False + + @staticmethod + def generate_secure_token(length: int = 32) -> str: + """Generate a cryptographically secure random token""" + return secrets.token_urlsafe(length) + +class AuthenticationRecord(BaseModel): + """Authentication record for storing user credentials""" + user_id: str + password_hash: str + salt: str + refresh_tokens: list = [] + reset_password_token: Optional[str] = None + reset_password_expiry: Optional[datetime] = None + last_password_change: datetime + mfa_enabled: bool = False + mfa_method: Optional[str] = None + mfa_secret: Optional[str] = None + login_attempts: int = 0 + locked_until: Optional[datetime] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class SecurityConfig: + """Security configuration constants""" + MAX_LOGIN_ATTEMPTS = 5 + ACCOUNT_LOCKOUT_DURATION_MINUTES = 15 + PASSWORD_MIN_LENGTH = 8 + TOKEN_EXPIRY_HOURS = 24 + REFRESH_TOKEN_EXPIRY_DAYS = 30 + +class AuthenticationManager: + """Manages authentication operations with security features""" + + def __init__(self, database): + self.database = database + self.password_security = PasswordSecurity() + + async def create_user_authentication(self, user_id: str, password: str) -> AuthenticationRecord: + """ + Create authentication record for a new user + + Args: + user_id: Unique user identifier + password: Plain text password + + Returns: + AuthenticationRecord object + """ + if len(password) < SecurityConfig.PASSWORD_MIN_LENGTH: + raise ValueError(f"Password must be at least {SecurityConfig.PASSWORD_MIN_LENGTH} characters long") + + # Hash the password + password_hash, salt = self.password_security.hash_password(password) + + # Create authentication record + auth_record = AuthenticationRecord( + user_id=user_id, + password_hash=password_hash, + salt=salt, + last_password_change=datetime.now(timezone.utc), + login_attempts=0 + ) + + # Store in database + await self.database.set_authentication(user_id, auth_record.model_dump()) + + logger.info(f"🔐 Created authentication record for user {user_id}") + return auth_record + + async def verify_user_credentials(self, login: str, password: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]: + """ + Verify user credentials with security checks + + Args: + login: Email or username + password: Plain text password + + Returns: + Tuple of (is_valid, user_data, error_message) + """ + try: + # Get user data + user_data = await self.database.get_user(login) + if not user_data: + logger.warning(f"⚠️ Login attempt with non-existent user: {login}") + return False, None, "Invalid credentials" + + # Get authentication record + auth_record = await self.database.get_authentication(user_data["id"]) + if not auth_record: + logger.error(f"❌ No authentication record found for user {user_data['id']}") + return False, None, "Authentication record not found" + + auth_data = AuthenticationRecord.model_validate(auth_record) + + # Check if account is locked + if auth_data.locked_until and auth_data.locked_until > datetime.now(timezone.utc): + time_until_unlock = auth_data.locked_until - datetime.now(timezone.utc) + # Convert time_until_unlock to minutes:seconds format + total_seconds = time_until_unlock.total_seconds() + minutes = int(total_seconds // 60) + seconds = int(total_seconds % 60) + time_until_unlock_str = f"{minutes}m {seconds}s" + logger.warning(f"🔒 Account is locked for user {login} for another {time_until_unlock_str}.") + return False, None, f"Account is temporarily locked due to too many failed attempts. Retry after {time_until_unlock_str}" + + # Verify password + if not self.password_security.verify_password(password, auth_data.password_hash): + # Increment failed attempts + auth_data.login_attempts += 1 + + # Lock account if too many attempts + if auth_data.login_attempts >= SecurityConfig.MAX_LOGIN_ATTEMPTS: + auth_data.locked_until = datetime.now(timezone.utc) + timedelta( + minutes=SecurityConfig.ACCOUNT_LOCKOUT_DURATION_MINUTES + ) + logger.warning(f"🔒 Account locked for user {login} after {auth_data.login_attempts} failed attempts") + + # Update authentication record + await self.database.set_authentication(user_data["id"], auth_data.model_dump()) + + logger.warning(f"⚠️ Invalid password for user {login} (attempt {auth_data.login_attempts})") + return False, None, "Invalid credentials" + + # Reset failed attempts on successful login + if auth_data.login_attempts > 0: + auth_data.login_attempts = 0 + auth_data.locked_until = None + await self.database.set_authentication(user_data["id"], auth_data.model_dump()) + + logger.info(f"✅ Successful authentication for user {login}") + return True, user_data, None + + except Exception as e: + logger.error(traceback.format_exc()) + logger.error(f"❌ Authentication error for user {login}: {e}") + return False, None, "Authentication failed" + + async def check_user_exists(self, email: str, username: str | None = None) -> Tuple[bool, Optional[str]]: + """ + Check if a user already exists with the given email or username + + Args: + email: Email address to check + username: Username to check (optional) + + Returns: + Tuple of (exists, conflict_field) + """ + try: + # Check email + existing_user = await self.database.get_user(email) + if existing_user: + return True, "email" + + # Check username if provided + if username: + existing_user = await self.database.get_user(username) + if existing_user: + return True, "username" + + return False, None + + except Exception as e: + logger.error(f"❌ Error checking user existence: {e}") + # In case of error, assume user doesn't exist to avoid blocking creation + return False, None + + async def update_last_login(self, user_id: str): + """Update user's last login timestamp""" + try: + user_data = await self.database.get_user_by_id(user_id) + if user_data: + user_data["lastLogin"] = datetime.now(timezone.utc).isoformat() + await self.database.set_user_by_id(user_id, user_data) + except Exception as e: + logger.error(f"❌ Error updating last login for user {user_id}: {e}") + +# Utility functions for common operations +def validate_password_strength(password: str) -> Tuple[bool, list]: + """ + Validate password strength + + Args: + password: Password to validate + + Returns: + Tuple of (is_valid, list_of_issues) + """ + issues = [] + + if len(password) < SecurityConfig.PASSWORD_MIN_LENGTH: + issues.append(f"Password must be at least {SecurityConfig.PASSWORD_MIN_LENGTH} characters long") + + if not any(c.isupper() for c in password): + issues.append("Password must contain at least one uppercase letter") + + if not any(c.islower() for c in password): + issues.append("Password must contain at least one lowercase letter") + + if not any(c.isdigit() for c in password): + issues.append("Password must contain at least one digit") + + # Check for special characters + special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?" + if not any(c in special_chars for c in password): + issues.append("Password must contain at least one special character") + + return len(issues) == 0, issues + +def sanitize_login_input(login: str) -> str: + """Sanitize login input (email or username)""" + return login.strip().lower() if login else "" \ No newline at end of file