Moved auth_utils to utils

This commit is contained in:
James Ketr 2025-06-18 12:36:32 -07:00
parent aefc14c610
commit 168fe8cc8e

View File

@ -0,0 +1,275 @@
# auth_utils.py
"""
Secure Authentication Utilities
Provides password hashing, verification, and security features
"""
import traceback
import bcrypt
import secrets
import logging
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, Optional, Tuple
from pydantic import BaseModel
logger = logging.getLogger(__name__)
class PasswordSecurity:
"""Handles password hashing and verification using bcrypt"""
@staticmethod
def hash_password(password: str) -> Tuple[str, str]:
"""
Hash a password with a random salt using bcrypt
Args:
password: Plain text password
Returns:
Tuple of (password_hash, salt) both as strings
"""
# Generate a random salt
salt = bcrypt.gensalt()
# Hash the password
password_hash = bcrypt.hashpw(password.encode('utf-8'), salt)
return password_hash.decode('utf-8'), salt.decode('utf-8')
@staticmethod
def verify_password(password: str, password_hash: str) -> bool:
"""
Verify a password against its hash
Args:
password: Plain text password to verify
password_hash: Stored password hash
Returns:
True if password matches, False otherwise
"""
try:
return bcrypt.checkpw(
password.encode('utf-8'),
password_hash.encode('utf-8')
)
except Exception as e:
logger.error(f"Password verification error: {e}")
return False
@staticmethod
def generate_secure_token(length: int = 32) -> str:
"""Generate a cryptographically secure random token"""
return secrets.token_urlsafe(length)
class AuthenticationRecord(BaseModel):
"""Authentication record for storing user credentials"""
user_id: str
password_hash: str
salt: str
refresh_tokens: list = []
reset_password_token: Optional[str] = None
reset_password_expiry: Optional[datetime] = None
last_password_change: datetime
mfa_enabled: bool = False
mfa_method: Optional[str] = None
mfa_secret: Optional[str] = None
login_attempts: int = 0
locked_until: Optional[datetime] = None
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class SecurityConfig:
"""Security configuration constants"""
MAX_LOGIN_ATTEMPTS = 5
ACCOUNT_LOCKOUT_DURATION_MINUTES = 15
PASSWORD_MIN_LENGTH = 8
TOKEN_EXPIRY_HOURS = 24
REFRESH_TOKEN_EXPIRY_DAYS = 30
class AuthenticationManager:
"""Manages authentication operations with security features"""
def __init__(self, database):
self.database = database
self.password_security = PasswordSecurity()
async def create_user_authentication(self, user_id: str, password: str) -> AuthenticationRecord:
"""
Create authentication record for a new user
Args:
user_id: Unique user identifier
password: Plain text password
Returns:
AuthenticationRecord object
"""
if len(password) < SecurityConfig.PASSWORD_MIN_LENGTH:
raise ValueError(f"Password must be at least {SecurityConfig.PASSWORD_MIN_LENGTH} characters long")
# Hash the password
password_hash, salt = self.password_security.hash_password(password)
# Create authentication record
auth_record = AuthenticationRecord(
user_id=user_id,
password_hash=password_hash,
salt=salt,
last_password_change=datetime.now(timezone.utc),
login_attempts=0
)
# Store in database
await self.database.set_authentication(user_id, auth_record.model_dump())
logger.info(f"🔐 Created authentication record for user {user_id}")
return auth_record
async def verify_user_credentials(self, login: str, password: str) -> Tuple[bool, Optional[Dict[str, Any]], Optional[str]]:
"""
Verify user credentials with security checks
Args:
login: Email or username
password: Plain text password
Returns:
Tuple of (is_valid, user_data, error_message)
"""
try:
# Get user data
user_data = await self.database.get_user(login)
if not user_data:
logger.warning(f"⚠️ Login attempt with non-existent user: {login}")
return False, None, "Invalid credentials"
# Get authentication record
auth_record = await self.database.get_authentication(user_data["id"])
if not auth_record:
logger.error(f"❌ No authentication record found for user {user_data['id']}")
return False, None, "Authentication record not found"
auth_data = AuthenticationRecord.model_validate(auth_record)
# Check if account is locked
if auth_data.locked_until and auth_data.locked_until > datetime.now(timezone.utc):
time_until_unlock = auth_data.locked_until - datetime.now(timezone.utc)
# Convert time_until_unlock to minutes:seconds format
total_seconds = time_until_unlock.total_seconds()
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
time_until_unlock_str = f"{minutes}m {seconds}s"
logger.warning(f"🔒 Account is locked for user {login} for another {time_until_unlock_str}.")
return False, None, f"Account is temporarily locked due to too many failed attempts. Retry after {time_until_unlock_str}"
# Verify password
if not self.password_security.verify_password(password, auth_data.password_hash):
# Increment failed attempts
auth_data.login_attempts += 1
# Lock account if too many attempts
if auth_data.login_attempts >= SecurityConfig.MAX_LOGIN_ATTEMPTS:
auth_data.locked_until = datetime.now(timezone.utc) + timedelta(
minutes=SecurityConfig.ACCOUNT_LOCKOUT_DURATION_MINUTES
)
logger.warning(f"🔒 Account locked for user {login} after {auth_data.login_attempts} failed attempts")
# Update authentication record
await self.database.set_authentication(user_data["id"], auth_data.model_dump())
logger.warning(f"⚠️ Invalid password for user {login} (attempt {auth_data.login_attempts})")
return False, None, "Invalid credentials"
# Reset failed attempts on successful login
if auth_data.login_attempts > 0:
auth_data.login_attempts = 0
auth_data.locked_until = None
await self.database.set_authentication(user_data["id"], auth_data.model_dump())
logger.info(f"✅ Successful authentication for user {login}")
return True, user_data, None
except Exception as e:
logger.error(traceback.format_exc())
logger.error(f"❌ Authentication error for user {login}: {e}")
return False, None, "Authentication failed"
async def check_user_exists(self, email: str, username: str | None = None) -> Tuple[bool, Optional[str]]:
"""
Check if a user already exists with the given email or username
Args:
email: Email address to check
username: Username to check (optional)
Returns:
Tuple of (exists, conflict_field)
"""
try:
# Check email
existing_user = await self.database.get_user(email)
if existing_user:
return True, "email"
# Check username if provided
if username:
existing_user = await self.database.get_user(username)
if existing_user:
return True, "username"
return False, None
except Exception as e:
logger.error(f"❌ Error checking user existence: {e}")
# In case of error, assume user doesn't exist to avoid blocking creation
return False, None
async def update_last_login(self, user_id: str):
"""Update user's last login timestamp"""
try:
user_data = await self.database.get_user_by_id(user_id)
if user_data:
user_data["lastLogin"] = datetime.now(timezone.utc).isoformat()
await self.database.set_user_by_id(user_id, user_data)
except Exception as e:
logger.error(f"❌ Error updating last login for user {user_id}: {e}")
# Utility functions for common operations
def validate_password_strength(password: str) -> Tuple[bool, list]:
"""
Validate password strength
Args:
password: Password to validate
Returns:
Tuple of (is_valid, list_of_issues)
"""
issues = []
if len(password) < SecurityConfig.PASSWORD_MIN_LENGTH:
issues.append(f"Password must be at least {SecurityConfig.PASSWORD_MIN_LENGTH} characters long")
if not any(c.isupper() for c in password):
issues.append("Password must contain at least one uppercase letter")
if not any(c.islower() for c in password):
issues.append("Password must contain at least one lowercase letter")
if not any(c.isdigit() for c in password):
issues.append("Password must contain at least one digit")
# Check for special characters
special_chars = "!@#$%^&*()_+-=[]{}|;:,.<>?"
if not any(c in special_chars for c in password):
issues.append("Password must contain at least one special character")
return len(issues) == 0, issues
def sanitize_login_input(login: str) -> str:
"""Sanitize login input (email or username)"""
return login.strip().lower() if login else ""