import os
from typing import Tuple
from logger import logger
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import smtplib
import asyncio
from email_templates import EMAIL_TEMPLATES
from datetime import datetime, timezone, timedelta
import json
from database import RedisDatabase


class EmailService:
    """Send templated HTML emails (verification, MFA, password reset) over SMTP.

    Configuration is read from environment variables in ``__init__``; the
    constructor raises ``ValueError`` when the SMTP settings are incomplete.
    """

    # Upper bound, in seconds, for each blocking SMTP socket operation so an
    # unresponsive server cannot stall a send forever.
    SMTP_TIMEOUT_SECONDS = 30
    # Number of SMTP sessions attempted before the last error is re-raised.
    MAX_SEND_ATTEMPTS = 3
    # Pause, in seconds, between failed attempts.
    RETRY_DELAY_SECONDS = 1

    def __init__(self):
        # Configure these in your .env file
        self.smtp_server = os.getenv("SMTP_SERVER")
        self.smtp_port = int(os.getenv("SMTP_PORT", "587"))
        self.email_user = os.getenv("EMAIL_USER")
        self.email_password = os.getenv("EMAIL_PASSWORD")
        self.from_name = os.getenv("FROM_NAME", "Backstory")
        self.app_name = os.getenv("APP_NAME", "Backstory")
        self.frontend_url = os.getenv("FRONTEND_URL", "https://backstory-beta.ketrenos.com")

        if not self.smtp_server or self.smtp_port == 0 or self.email_user is None or self.email_password is None:
            raise ValueError("SMTP configuration is not set in the environment variables")

    def _get_template(self, template_name: str) -> dict:
        """Get email template by name.

        Returns an empty dict when ``template_name`` is not in
        ``EMAIL_TEMPLATES``; callers that then index ``["subject"]`` or
        ``["html"]`` will get a ``KeyError``.
        """
        return EMAIL_TEMPLATES.get(template_name, {})

    def _format_template(self, template: str, **kwargs) -> str:
        """Format template with provided variables.

        ``app_name``, ``from_name`` and ``frontend_url`` are always supplied
        from the service configuration; ``kwargs`` adds the per-email values.
        """
        return template.format(
            app_name=self.app_name,
            from_name=self.from_name,
            frontend_url=self.frontend_url,
            **kwargs
        )

    async def send_verification_email(
        self,
        to_email: str,
        verification_token: str,
        user_name: str,
        user_type: str = "user"
    ):
        """Send email verification email using template.

        Builds the verification link from ``frontend_url`` and the token,
        renders the "verification" template and sends it. Any failure is
        logged and re-raised.
        """
        try:
            template = self._get_template("verification")
            verification_link = f"{self.frontend_url}/login/verify-email?token={verification_token}"

            subject = self._format_template(
                template["subject"],
                user_name=user_name,
                to_email=to_email
            )

            html_content = self._format_template(
                template["html"],
                user_name=user_name,
                user_type=user_type,
                to_email=to_email,
                verification_link=verification_link
            )

            await self._send_email(to_email, subject, html_content)
            logger.info(f"📧 Verification email sent to {to_email}")
        except Exception as e:
            logger.error(f"❌ Failed to send verification email to {to_email}: {e}")
            raise

    async def send_mfa_email(
        self,
        to_email: str,
        mfa_code: str,
        device_name: str,
        user_name: str,
        ip_address: str = "Unknown"
    ):
        """Send MFA code email using template.

        The login time placed in the email is the current UTC time at the
        moment of the call. Any failure is logged and re-raised.
        """
        try:
            template = self._get_template("mfa")
            login_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")

            subject = self._format_template(template["subject"])

            html_content = self._format_template(
                template["html"],
                user_name=user_name,
                device_name=device_name,
                ip_address=ip_address,
                login_time=login_time,
                mfa_code=mfa_code,
                to_email=to_email
            )

            await self._send_email(to_email, subject, html_content)
            logger.info(f"📧 MFA code sent to {to_email} for device {device_name}")
        except Exception as e:
            logger.error(f"❌ Failed to send MFA email to {to_email}: {e}")
            raise

    async def send_password_reset_email(
        self,
        to_email: str,
        reset_token: str,
        user_name: str
    ):
        """Send password reset email using template.

        Builds the reset link from ``frontend_url`` and the token, renders
        the "password_reset" template and sends it. Any failure is logged
        and re-raised.
        """
        try:
            template = self._get_template("password_reset")
            reset_link = f"{self.frontend_url}/login/reset-password?token={reset_token}"

            subject = self._format_template(template["subject"])

            html_content = self._format_template(
                template["html"],
                user_name=user_name,
                reset_link=reset_link,
                to_email=to_email
            )

            await self._send_email(to_email, subject, html_content)
            logger.info(f"📧 Password reset email sent to {to_email}")
        except Exception as e:
            logger.error(f"❌ Failed to send password reset email to {to_email}: {e}")
            raise

    def _deliver(self, to_email: str, message_text: str) -> None:
        """Open one SMTP session and send an already-serialized message.

        This call blocks on network I/O, so it is meant to be run off the
        event loop (see ``_send_email``). A new connection is opened for
        every call; nothing is pooled or reused.
        """
        with smtplib.SMTP(
            self.smtp_server,
            self.smtp_port,
            timeout=self.SMTP_TIMEOUT_SECONDS
        ) as server:
            server.starttls()
            server.login(self.email_user, self.email_password)
            server.sendmail(self.email_user, to_email, message_text)

    async def _send_email(self, to_email: str, subject: str, html_content: str):
        """Send an HTML email over SMTP, retrying on SMTP errors.

        Args:
            to_email: Recipient address.
            subject: Already-formatted subject line.
            html_content: Already-formatted HTML body.

        Raises:
            ValueError: If the SMTP configuration is incomplete.
            smtplib.SMTPException: If every attempt fails with an SMTP error.
            Exception: Any other error (for example a socket timeout) is
                logged and re-raised without being retried.
        """
        try:
            if not self.email_user:
                raise ValueError("Email user is not configured")
            if not self.smtp_server or self.smtp_port == 0 or not self.email_user or not self.email_password:
                raise ValueError("SMTP configuration is not set in the environment variables")

            # Create message
            msg = MIMEMultipart('alternative')
            msg['From'] = f"{self.from_name} <{self.email_user}>"
            msg['To'] = to_email
            msg['Subject'] = subject
            msg['Reply-To'] = self.email_user

            # Add HTML content
            html_part = MIMEText(html_content, 'html', 'utf-8')
            msg.attach(html_part)

            # Serialize once; the text is identical for every attempt.
            text = msg.as_string()

            # smtplib is synchronous. Running it in the loop's default
            # executor keeps the event loop free to serve other coroutines
            # while the SMTP conversation is in progress.
            loop = asyncio.get_running_loop()
            max_retries = self.MAX_SEND_ATTEMPTS

            for attempt in range(max_retries):
                try:
                    await loop.run_in_executor(None, self._deliver, to_email, text)
                    break  # Success, exit retry loop
                except smtplib.SMTPException as e:
                    if attempt == max_retries - 1:  # Last attempt
                        raise
                    logger.warning(f"⚠️ SMTP attempt {attempt + 1} failed, retrying: {e}")
                    await asyncio.sleep(self.RETRY_DELAY_SECONDS)  # Wait before retry

            logger.debug(f"📧 Email sent successfully to {to_email}")

        except Exception as e:
            logger.error(f"❌ SMTP error sending to {to_email}: {e}")
            raise


class EmailRateLimiter:
    """Sliding-window limiter for outgoing emails, stored as JSON lists in Redis."""

    def __init__(self, database: RedisDatabase):
        self.database = database

    async def can_send_email(self, email: str, email_type: str, limit: int = 5, window_minutes: int = 60) -> bool:
        """Check if email can be sent based on rate limiting.

        When the email is allowed, the current timestamp is recorded under
        the key for this (email_type, lower-cased email) pair as a side
        effect. The read and the write are separate Redis calls, so the
        check is not atomic.

        Returns:
            True when the email may be sent, False when ``limit`` sends have
            already been recorded within the last ``window_minutes``. Also
            returns True when the check itself fails (fail-open).
        """
        try:
            key = f"email_rate_limit:{email_type}:{email.lower()}"
            current_time = datetime.now(timezone.utc)
            window_start = current_time - timedelta(minutes=window_minutes)

            # Get current count
            count_data = await self.database.redis.get(key)
            if not count_data:
                # First email, allow it
                await self._record_email_sent(key, current_time, window_minutes)
                return True

            email_records = json.loads(count_data)

            # Filter out old records
            recent_records = [
                record for record in email_records
                if datetime.fromisoformat(record) > window_start
            ]

            if len(recent_records) >= limit:
                logger.warning(f"⚠️ Email rate limit exceeded for {email} ({email_type})")
                return False

            # Add current email to records
            recent_records.append(current_time.isoformat())
            await self.database.redis.setex(
                key,
                window_minutes * 60,
                json.dumps(recent_records)
            )

            return True

        except Exception as e:
            logger.error(f"❌ Error checking email rate limit: {e}")
            # On error, allow the email to be safe
            return True

    async def _record_email_sent(self, key: str, timestamp: datetime, ttl_minutes: int):
        """Record that an email was sent.

        Overwrites ``key`` with a one-element JSON list holding the ISO
        timestamp, expiring after ``ttl_minutes`` minutes.
        """
        await self.database.redis.setex(
            key,
            ttl_minutes * 60,
            json.dumps([timestamp.isoformat()])
        )


class VerificationEmailRateLimiter:
    """Apply daily, hourly and cooldown limits to verification emails."""

    def __init__(self, database: RedisDatabase):
        self.database = database
        self.max_attempts_per_hour = 3  # Maximum 3 emails per hour
        self.max_attempts_per_day = 10  # Maximum 10 emails per day
        self.cooldown_minutes = 5  # 5 minute cooldown between emails

    async def can_send_verification_email(self, email: str) -> Tuple[bool, str]:
        """
        Check if verification email can be sent based on rate limiting

        Checks, in order: the daily count reported by the database, the
        number of attempts in the last hour, and the cooldown since the most
        recent attempt. This method only reads; it records nothing.

        Returns (can_send, reason_if_not). If the check itself raises, the
        request is denied with a generic message (fail-closed).
        """
        try:
            email_lower = email.lower()
            current_time = datetime.now(timezone.utc)

            # Check daily limit
            # NOTE(review): the original-case email is passed here while the
            # hourly key below is lower-cased — confirm the database
            # normalizes case itself.
            daily_count = await self.database.get_verification_attempts_count(email)
            if daily_count >= self.max_attempts_per_day:
                return False, f"Daily limit reached. You can request up to {self.max_attempts_per_day} verification emails per day."

            # Check hourly limit
            hour_ago = current_time - timedelta(hours=1)
            hourly_key = f"verification_attempts:{email_lower}"
            data = await self.database.redis.get(hourly_key)

            if data:
                attempts_data = json.loads(data)
                recent_attempts = [
                    attempt for attempt in attempts_data
                    if datetime.fromisoformat(attempt) > hour_ago
                ]

                if len(recent_attempts) >= self.max_attempts_per_hour:
                    return False, f"Hourly limit reached. You can request up to {self.max_attempts_per_hour} verification emails per hour."

                # Check cooldown period
                if recent_attempts:
                    last_attempt = max(datetime.fromisoformat(attempt) for attempt in recent_attempts)
                    time_since_last = current_time - last_attempt
                    if time_since_last.total_seconds() < self.cooldown_minutes * 60:
                        # Whole minutes already elapsed are subtracted, so a
                        # partially elapsed minute still counts as remaining.
                        remaining_minutes = self.cooldown_minutes - int(time_since_last.total_seconds() / 60)
                        return False, f"Please wait {remaining_minutes} more minute(s) before requesting another email."

            return True, "OK"

        except Exception as e:
            logger.error(f"❌ Error checking verification email rate limit: {e}")
            # On error, be conservative and deny
            return False, "Rate limit check failed. Please try again later."

    async def record_email_sent(self, email: str):
        """Record that a verification email was sent"""
        await self.database.record_verification_attempt(email)


# Module-level singleton. Constructing it reads the SMTP environment variables,
# so importing this module raises ValueError when they are not configured.
email_service = EmailService()