Implementing MFA

This commit is contained in:
James Ketr 2025-05-31 19:40:30 -07:00
parent 35701d9719
commit 32f81f6314
4 changed files with 466 additions and 29 deletions

View File

@ -350,6 +350,154 @@ class RedisDatabase:
await self.redis.delete(key)
# MFA and Email Verification operations
async def find_verification_token_by_email(self, email: str) -> Optional[Dict[str, Any]]:
"""Find pending verification token by email address"""
try:
pattern = "email_verification:*"
cursor = 0
email_lower = email.lower()
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
for key in keys:
token_data = await self.redis.get(key)
if token_data:
verification_info = json.loads(token_data)
if (verification_info.get("email", "").lower() == email_lower and
not verification_info.get("verified", False)):
# Extract token from key
token = key.replace("email_verification:", "")
verification_info["token"] = token
return verification_info
if cursor == 0:
break
return None
except Exception as e:
logger.error(f"❌ Error finding verification token by email {email}: {e}")
return None
async def get_pending_verifications_count(self) -> int:
"""Get count of pending email verifications (admin function)"""
try:
pattern = "email_verification:*"
cursor = 0
count = 0
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
for key in keys:
token_data = await self.redis.get(key)
if token_data:
verification_info = json.loads(token_data)
if not verification_info.get("verified", False):
count += 1
if cursor == 0:
break
return count
except Exception as e:
logger.error(f"❌ Error counting pending verifications: {e}")
return 0
async def cleanup_expired_verification_tokens(self) -> int:
"""Clean up expired verification tokens and return count of cleaned tokens"""
try:
pattern = "email_verification:*"
cursor = 0
cleaned_count = 0
current_time = datetime.now(timezone.utc)
while True:
cursor, keys = await self.redis.scan(cursor, match=pattern, count=100)
for key in keys:
token_data = await self.redis.get(key)
if token_data:
verification_info = json.loads(token_data)
expires_at = datetime.fromisoformat(verification_info.get("expires_at", ""))
if current_time > expires_at:
await self.redis.delete(key)
cleaned_count += 1
logger.debug(f"🧹 Cleaned expired verification token for {verification_info.get('email')}")
if cursor == 0:
break
if cleaned_count > 0:
logger.info(f"🧹 Cleaned up {cleaned_count} expired verification tokens")
return cleaned_count
except Exception as e:
logger.error(f"❌ Error cleaning up expired verification tokens: {e}")
return 0
async def get_verification_attempts_count(self, email: str) -> int:
"""Get the number of verification emails sent for an email in the last 24 hours"""
try:
key = f"verification_attempts:{email.lower()}"
data = await self.redis.get(key)
if not data:
return 0
attempts_data = json.loads(data)
current_time = datetime.now(timezone.utc)
window_start = current_time - timedelta(hours=24)
# Filter out old attempts
recent_attempts = [
attempt for attempt in attempts_data
if datetime.fromisoformat(attempt) > window_start
]
return len(recent_attempts)
except Exception as e:
logger.error(f"❌ Error getting verification attempts count for {email}: {e}")
return 0
async def record_verification_attempt(self, email: str) -> bool:
"""Record a verification email attempt"""
try:
key = f"verification_attempts:{email.lower()}"
current_time = datetime.now(timezone.utc)
# Get existing attempts
data = await self.redis.get(key)
attempts_data = json.loads(data) if data else []
# Add current attempt
attempts_data.append(current_time.isoformat())
# Keep only last 24 hours of attempts
window_start = current_time - timedelta(hours=24)
recent_attempts = [
attempt for attempt in attempts_data
if datetime.fromisoformat(attempt) > window_start
]
# Store with 24 hour expiration
await self.redis.setex(
key,
24 * 60 * 60, # 24 hours
json.dumps(recent_attempts)
)
return True
except Exception as e:
logger.error(f"❌ Error recording verification attempt for {email}: {e}")
return False
async def store_email_verification_token(self, email: str, token: str, user_type: str, user_data: dict) -> bool:
"""Store email verification token with user data"""
try:

View File

@ -163,13 +163,16 @@ email_service = EmailService()
class EnhancedEmailService:
def __init__(self):
self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
self.email_user = os.getenv("EMAIL_USER", "your-app@example.com")
self.email_password = os.getenv("EMAIL_PASSWORD", "your-app-password")
# Configure these in your .env file
self.smtp_server = os.getenv("SMTP_SERVER")
self.smtp_port = int(os.getenv("SMTP_PORT", "0"))
self.email_user = os.getenv("EMAIL_USER",)
self.email_password = os.getenv("EMAIL_PASSWORD")
self.from_name = os.getenv("FROM_NAME", "Backstory")
self.app_name = os.getenv("APP_NAME", "Backstory")
self.frontend_url = os.getenv("FRONTEND_URL", "https://backstory-beta.ketrenos.com")
if not self.smtp_server or self.smtp_port == 0 or self.email_user is None or self.email_password is None:
raise ValueError("SMTP configuration is not set in the environment variables")
def _get_template(self, template_name: str) -> dict:
"""Get email template by name"""
@ -279,6 +282,8 @@ class EnhancedEmailService:
async def _send_email(self, to_email: str, subject: str, html_content: str):
"""Send email using SMTP with improved error handling"""
try:
if not self.email_user:
raise ValueError("Email user is not configured")
# Create message
msg = MIMEMultipart('alternative')
msg['From'] = f"{self.from_name} <{self.email_user}>"
@ -292,6 +297,9 @@ class EnhancedEmailService:
# Send email with connection pooling and retry logic
max_retries = 3
if not self.smtp_server or self.smtp_port == 0 or not self.email_user or not self.email_password:
raise ValueError("SMTP configuration is not set in the environment variables")
for attempt in range(max_retries):
try:
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
@ -364,4 +372,60 @@ class EmailRateLimiter:
key,
ttl_minutes * 60,
json.dumps([timestamp.isoformat()])
)
)
class VerificationEmailRateLimiter:
def __init__(self, database: RedisDatabase):
self.database = database
self.max_attempts_per_hour = 3 # Maximum 3 emails per hour
self.max_attempts_per_day = 10 # Maximum 10 emails per day
self.cooldown_minutes = 5 # 5 minute cooldown between emails
async def can_send_verification_email(self, email: str) -> Tuple[bool, str]:
"""
Check if verification email can be sent based on rate limiting
Returns (can_send, reason_if_not)
"""
try:
email_lower = email.lower()
current_time = datetime.now(timezone.utc)
# Check daily limit
daily_count = await self.database.get_verification_attempts_count(email)
if daily_count >= self.max_attempts_per_day:
return False, f"Daily limit reached. You can request up to {self.max_attempts_per_day} verification emails per day."
# Check hourly limit
hour_ago = current_time - timedelta(hours=1)
hourly_key = f"verification_attempts:{email_lower}"
data = await self.database.redis.get(hourly_key)
if data:
attempts_data = json.loads(data)
recent_attempts = [
attempt for attempt in attempts_data
if datetime.fromisoformat(attempt) > hour_ago
]
if len(recent_attempts) >= self.max_attempts_per_hour:
return False, f"Hourly limit reached. You can request up to {self.max_attempts_per_hour} verification emails per hour."
# Check cooldown period
if recent_attempts:
last_attempt = max(datetime.fromisoformat(attempt) for attempt in recent_attempts)
time_since_last = current_time - last_attempt
if time_since_last.total_seconds() < self.cooldown_minutes * 60:
remaining_minutes = self.cooldown_minutes - int(time_since_last.total_seconds() / 60)
return False, f"Please wait {remaining_minutes} more minute(s) before requesting another email."
return True, "OK"
except Exception as e:
logger.error(f"❌ Error checking verification email rate limit: {e}")
# On error, be conservative and deny
return False, "Rate limit check failed. Please try again later."
async def record_email_sent(self, email: str):
"""Record that a verification email was sent"""
await self.database.record_verification_attempt(email)

View File

@ -45,7 +45,7 @@ from database import RedisDatabase, redis_manager, DatabaseManager
from metrics import Metrics
from llm_manager import llm_manager
import entities
from email_service import email_service
from email_service import VerificationEmailRateLimiter, email_service
from device_manager import DeviceManager
# =============================
@ -376,12 +376,12 @@ api_router = APIRouter(prefix="/api/1.0")
# ============================
@api_router.post("/auth/login")
async def enhanced_login(
async def login(
request: LoginRequest,
http_request: Request,
database: RedisDatabase = Depends(get_database)
):
"""Enhanced login with device detection and MFA"""
"""Login with device detection and MFA"""
try:
# Initialize managers
auth_manager = AuthenticationManager(database)
@ -458,7 +458,7 @@ async def enhanced_login(
return create_success_response(auth_response.model_dump(by_alias=True, exclude_unset=True))
except Exception as e:
logger.error(f"Enhanced login error: {e}")
logger.error(f"Login error: {e}")
return JSONResponse(
status_code=500,
content=create_error_response("LOGIN_ERROR", "An error occurred during login")
@ -941,32 +941,142 @@ async def resend_verification_email(
background_tasks: BackgroundTasks,
database: RedisDatabase = Depends(get_database)
):
"""Resend verification email"""
"""Resend verification email with comprehensive rate limiting and validation"""
try:
# Check if user exists and is pending
user_data = await database.get_user(request.email)
email_lower = request.email.lower().strip()
if user_data:
# Initialize rate limiter
rate_limiter = VerificationEmailRateLimiter(database)
# Check rate limiting
can_send, reason = await rate_limiter.can_send_verification_email(email_lower)
if not can_send:
logger.warning(f"⚠️ Verification email rate limit exceeded for {email_lower}: {reason}")
return JSONResponse(
status_code=400,
content=create_error_response("ALREADY_VERIFIED", "Account is already verified")
status_code=429,
content=create_error_response("RATE_LIMITED", reason)
)
# Look for pending verification
# This would require scanning verification tokens (implement if needed)
# For now, return a generic success message
# Clean up expired tokens first
await database.cleanup_expired_verification_tokens()
return create_success_response({
"message": "If your email is in our system and pending verification, a new verification email has been sent."
# Check if user already exists and is verified
user_data = await database.get_user(email_lower)
if user_data:
# User exists and is verified - don't reveal this for security
logger.info(f"🔍 Resend verification requested for already verified user: {email_lower}")
await rate_limiter.record_email_sent(email_lower) # Record attempt to prevent abuse
return create_success_response({
"message": "If your email is in our system and pending verification, a new verification email has been sent."
})
# Look for pending verification token
verification_data = await database.find_verification_token_by_email(email_lower)
if not verification_data:
# No pending verification found - don't reveal this for security
logger.info(f"🔍 Resend verification requested for non-existent pending verification: {email_lower}")
await rate_limiter.record_email_sent(email_lower) # Record attempt to prevent abuse
return create_success_response({
"message": "If your email is in our system and pending verification, a new verification email has been sent."
})
# Check if verification token has expired
expires_at = datetime.fromisoformat(verification_data["expires_at"])
current_time = datetime.now(timezone.utc)
if current_time > expires_at:
# Token expired - clean it up and inform user
await database.redis.delete(f"email_verification:{verification_data['token']}")
logger.info(f"🧹 Cleaned up expired verification token for {email_lower}")
return JSONResponse(
status_code=400,
content=create_error_response(
"TOKEN_EXPIRED",
"Your verification link has expired. Please register again to create a new account."
)
)
# Generate new verification token (invalidate old one)
old_token = verification_data["token"]
new_token = secrets.token_urlsafe(32)
# Update verification data with new token and reset attempts
verification_data.update({
"token": new_token,
"expires_at": (current_time + timedelta(hours=24)).isoformat(),
"resent_at": current_time.isoformat(),
"resend_count": verification_data.get("resend_count", 0) + 1
})
# Store new token and remove old one
await database.redis.delete(f"email_verification:{old_token}")
await database.store_email_verification_token(
email_lower,
new_token,
verification_data["user_type"],
verification_data["user_data"]
)
# Get user name for email
user_data_container = verification_data["user_data"]
user_type = verification_data["user_type"]
if user_type == "candidate":
candidate_data = user_data_container["candidate_data"]
user_name = candidate_data.get("fullName", "User")
elif user_type == "employer":
employer_data = user_data_container["employer_data"]
user_name = employer_data.get("companyName", "User")
else:
user_name = "User"
# Record email attempt
await rate_limiter.record_email_sent(email_lower)
# Send new verification email in background
background_tasks.add_task(
email_service.send_verification_email,
email_lower,
new_token,
user_name,
user_type
)
# Log security event
await database.log_security_event(
verification_data["user_data"].get("candidate_data", {}).get("id") or
verification_data["user_data"].get("employer_data", {}).get("id") or "unknown",
"verification_resend",
{
"email": email_lower,
"user_type": user_type,
"resend_count": verification_data.get("resend_count", 1),
"old_token_invalidated": old_token[:8] + "...", # Log partial token for debugging
"ip_address": "unknown" # You can extract this from request if needed
}
)
logger.info(f"✅ Verification email resent to {email_lower} (attempt #{verification_data.get('resend_count', 1)})")
return create_success_response({
"message": "A new verification email has been sent to your email address. Please check your inbox and spam folder.",
"resendCount": verification_data.get("resend_count", 1)
})
except ValueError as ve:
logger.warning(f"⚠️ Invalid resend verification request: {ve}")
return JSONResponse(
status_code=400,
content=create_error_response("VALIDATION_ERROR", str(ve))
)
except Exception as e:
logger.error(f"❌ Resend verification error: {e}")
logger.error(f"❌ Resend verification email error: {e}")
return JSONResponse(
status_code=500,
content=create_error_response("RESEND_FAILED", "Failed to resend verification email")
content=create_error_response("RESEND_FAILED", "An error occurred while processing your request. Please try again later.")
)
@api_router.post("/auth/mfa/request")
async def request_mfa(
request: MFARequest,
@ -1745,8 +1855,7 @@ async def search_jobs(
# ============================
# Chat Endpoints
# ============================
# Enhanced Chat Session Endpoints with Username Association
# Add these modifications to your main.py file
# Chat Session Endpoints with Username Association
@api_router.get("/chat/statistics")
async def get_chat_statistics(
current_user = Depends(get_current_user),
@ -1832,7 +1941,7 @@ async def archive_chat_session(
)
# ============================
# Chat Endpoints (Enhanced)
# Chat Endpoints
# ============================
@api_router.post("/chat/sessions")
@ -2334,6 +2443,121 @@ async def get_candidate_chat_sessions(
content=create_error_response("FETCH_ERROR", str(e))
)
# ============================
# Admin Endpoints
# ============================
# @api_router.get("/admin/verification-stats")
async def get_verification_statistics(
current_user = Depends(get_current_user),
database: RedisDatabase = Depends(get_database)
):
"""Get verification statistics (admin only)"""
try:
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Admin access required")
stats = {
"pending_verifications": await database.get_pending_verifications_count(),
"expired_tokens_cleaned": await database.cleanup_expired_verification_tokens()
}
return create_success_response(stats)
except Exception as e:
logger.error(f"❌ Error getting verification stats: {e}")
return JSONResponse(
status_code=500,
content=create_error_response("STATS_ERROR", str(e))
)
@api_router.post("/admin/cleanup-verifications")
async def cleanup_verification_tokens(
current_user = Depends(get_current_user),
database: RedisDatabase = Depends(get_database)
):
"""Manually trigger cleanup of expired verification tokens (admin only)"""
try:
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Admin access required")
cleaned_count = await database.cleanup_expired_verification_tokens()
logger.info(f"🧹 Manual cleanup completed by admin {current_user.id}: {cleaned_count} tokens cleaned")
return create_success_response({
"message": f"Cleanup completed. Removed {cleaned_count} expired verification tokens.",
"cleaned_count": cleaned_count
})
except Exception as e:
logger.error(f"❌ Error in manual cleanup: {e}")
return JSONResponse(
status_code=500,
content=create_error_response("CLEANUP_ERROR", str(e))
)
@api_router.get("/admin/pending-verifications")
async def get_pending_verifications(
current_user = Depends(get_current_user),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
database: RedisDatabase = Depends(get_database)
):
"""Get list of pending email verifications (admin only)"""
try:
if not current_user.is_admin:
raise HTTPException(status_code=403, detail="Admin access required")
pattern = "email_verification:*"
cursor = 0
pending_verifications = []
current_time = datetime.now(timezone.utc)
while True:
cursor, keys = await database.redis.scan(cursor, match=pattern, count=100)
for key in keys:
token_data = await database.redis.get(key)
if token_data:
verification_info = json.loads(token_data)
if not verification_info.get("verified", False):
expires_at = datetime.fromisoformat(verification_info.get("expires_at", ""))
pending_verifications.append({
"email": verification_info.get("email"),
"user_type": verification_info.get("user_type"),
"created_at": verification_info.get("created_at"),
"expires_at": verification_info.get("expires_at"),
"is_expired": current_time > expires_at,
"resend_count": verification_info.get("resend_count", 0)
})
if cursor == 0:
break
# Sort by creation date (newest first)
pending_verifications.sort(key=lambda x: x["created_at"], reverse=True)
# Apply pagination
total = len(pending_verifications)
start = (page - 1) * limit
end = start + limit
paginated_verifications = pending_verifications[start:end]
paginated_response = create_paginated_response(
paginated_verifications,
page, limit, total
)
return create_success_response(paginated_response)
except Exception as e:
logger.error(f"❌ Error getting pending verifications: {e}")
return JSONResponse(
status_code=500,
content=create_error_response("FETCH_ERROR", str(e))
)
# ============================
# Health Check and Info Endpoints
# ============================
@ -2342,8 +2566,8 @@ async def get_redis() -> redis.Redis:
return redis_manager.get_client()
@app.get("/health")
async def enhanced_health_check():
"""Enhanced health check endpoint"""
async def health_check():
"""Health check endpoint"""
try:
database = db_manager.get_database()
if not redis_manager.redis:

View File

@ -401,7 +401,8 @@ class BaseUser(BaseModel):
last_login: Optional[datetime] = Field(None, alias="lastLogin")
profile_image: Optional[str] = Field(None, alias="profileImage")
status: UserStatus
is_admin: bool = Field(default=False, alias="isAdmin")
model_config = {
"populate_by_name": True, # Allow both field names and aliases
"use_enum_values": True # Use enum values instead of names