"""
Authentication and name management for the AI Voice Bot server.

This module handles password hashing, name protection, and user authentication.
Extracted from main.py to improve maintainability and separation of concerns.
"""

import hashlib
import binascii
import secrets
import os
import threading
from typing import Dict, Optional, Tuple

# Import shared models
try:
    # Try relative import first (when running as part of the package)
    from ...shared.models import NamePasswordRecord
except ImportError:
    try:
        # Try absolute import (when running directly)
        import sys

        sys.path.append(
            os.path.dirname(
                os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            )
        )
        from shared.models import NamePasswordRecord
    except ImportError:
        # Fallback: minimal stand-in model for testing. It carries the two
        # fields this module actually reads and writes (``salt`` / ``hash``),
        # so ``NamePasswordRecord(**record)`` and ``rec.salt`` / ``rec.hash``
        # work the same way they are used below.
        try:
            from pydantic import BaseModel

            class NamePasswordRecord(BaseModel):
                salt: str
                hash: str

        except ImportError:
            # pydantic is unavailable too: use a plain dataclass.
            from dataclasses import dataclass

            @dataclass
            class NamePasswordRecord:
                salt: str
                hash: str


try:
    from logger import logger
except ImportError:
    # Same idea as the model fallback above: keep the module importable in
    # isolation by falling back to a standard-library logger.
    import logging

    logger = logging.getLogger(__name__)


class AuthManager:
    """Manages user authentication and name protection.

    Passwords are never stored in clear text. Each protected name maps to a
    record ``{"salt": <hex>, "hash": <hex>}`` where the hash is
    PBKDF2-HMAC-SHA256 of the password. Names are matched case-insensitively
    by lowercasing them before every lookup. Access to the mapping is guarded
    by ``self.lock``.
    """

    # Key-derivation parameters. Changing either value makes previously
    # stored hashes unverifiable, because they are not saved in the record.
    _PBKDF2_ITERATIONS = 100000
    _SALT_BYTES = 16

    def __init__(self, save_file: str = "sessions.json"):
        # Mapping of reserved names to password records
        # (lowercased name -> {salt:..., hash:...})
        self.name_passwords: Dict[str, Dict[str, str]] = {}
        self.lock = threading.RLock()
        self._save_file = save_file
        self._loaded = False

    def _hash_password(
        self, password: str, salt_hex: Optional[str] = None
    ) -> Tuple[str, str]:
        """Return (salt_hex, hash_hex) for the given password.

        If salt_hex is provided it is used; otherwise a new random salt is
        generated.

        Raises:
            binascii.Error: if ``salt_hex`` is not valid hexadecimal.
        """
        if salt_hex:
            salt = binascii.unhexlify(salt_hex)
        else:
            salt = secrets.token_bytes(self._SALT_BYTES)
            salt_hex = binascii.hexlify(salt).decode()
        derived = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), salt, self._PBKDF2_ITERATIONS
        )
        return salt_hex, binascii.hexlify(derived).decode()

    def _record_matches(
        self, record: Optional[Dict[str, str]], password: str
    ) -> bool:
        """Return True if ``password`` hashes to the value stored in ``record``.

        Fails closed (returns False) for a missing record, a missing or
        malformed salt, or a missing hash, instead of raising.
        """
        if not record:
            return False
        salt = record.get("salt")
        stored_hash = record.get("hash")
        if not salt or not isinstance(stored_hash, str):
            return False
        try:
            # Validate the salt up front so that a corrupted record cannot
            # crash the authentication path.
            binascii.unhexlify(salt)
        except (TypeError, ValueError):  # binascii.Error is a ValueError
            return False
        _, candidate_hash = self._hash_password(password, salt_hex=salt)
        # Constant-time comparison avoids leaking how many leading characters
        # matched. Bytes are compared because compare_digest rejects
        # non-ASCII str arguments.
        return secrets.compare_digest(
            candidate_hash.encode("utf-8"), stored_hash.encode("utf-8")
        )

    def _snapshot_records(self) -> Dict[str, NamePasswordRecord]:
        """Return a new dict mapping each name to a NamePasswordRecord."""
        with self.lock:
            return {
                name: NamePasswordRecord(**record)
                for name, record in self.name_passwords.items()
            }

    def set_password(self, name: str, password: str) -> None:
        """Set (or replace) the password for a name."""
        lname = name.lower()
        # Hash before taking the lock: key derivation is deliberately slow.
        salt, hash_hex = self._hash_password(password)
        with self.lock:
            self.name_passwords[lname] = {"salt": salt, "hash": hash_hex}
        logger.info(f"Password set for name: {name}")

    def clear_password(self, name: str) -> bool:
        """Clear password for a name. Returns True if password existed."""
        lname = name.lower()
        with self.lock:
            if lname not in self.name_passwords:
                return False
            del self.name_passwords[lname]
        logger.info(f"Password cleared for name: {name}")
        return True

    def verify_password(self, name: str, password: str) -> bool:
        """Verify password for a name.

        Returns False when the name has no stored password or when its
        stored record is unusable.
        """
        with self.lock:
            record = self.name_passwords.get(name.lower())
        # Records are replaced, never mutated in place, so the reference can
        # be checked after releasing the lock; this keeps the slow hash from
        # blocking other threads.
        return self._record_matches(record, password)

    def is_name_protected(self, name: str) -> bool:
        """Check if a name is protected by a password"""
        lname = name.lower()
        with self.lock:
            return lname in self.name_passwords

    def check_name_takeover(
        self, name: str, password: Optional[str]
    ) -> Tuple[bool, str]:
        """
        Check if name takeover is allowed.

        Returns:
            (allowed: bool, reason: str)
        """
        with self.lock:
            record = self.name_passwords.get(name.lower())

        if not record:
            if password:
                # No saved password but one was provided: allow (the caller
                # may register it as the new password).
                return True, "Name takeover allowed with new password"
            return True, "Name takeover allowed (no password protection)"

        # From here on the name is protected.
        if not password:
            return False, "Password required for protected name"
        if self._record_matches(record, password):
            return True, "Password verified for name takeover"
        return False, "Invalid password for name takeover"

    def get_all_protected_names(self) -> Dict[str, NamePasswordRecord]:
        """Get all protected names for admin purposes"""
        return self._snapshot_records()

    def load_from_payload(
        self, payload_name_passwords: Dict[str, NamePasswordRecord]
    ) -> None:
        """Load name passwords from session payload, replacing current state.

        Keys are lowercased so that loaded entries are reachable by the
        case-insensitive lookups used everywhere else in this class.
        """
        # Build the new mapping first so that a malformed payload raises
        # before the existing state has been wiped.
        loaded = {
            name.lower(): {"salt": rec.salt, "hash": rec.hash}
            for name, rec in payload_name_passwords.items()
        }
        with self.lock:
            self.name_passwords.clear()
            self.name_passwords.update(loaded)
            logger.info(f"Loaded {len(self.name_passwords)} protected names")

    def get_save_data(self) -> Dict[str, NamePasswordRecord]:
        """Get data for saving to disk"""
        return self._snapshot_records()

    def get_protection_count(self) -> int:
        """Get number of protected names"""
        with self.lock:
            return len(self.name_passwords)

    def validate_integrity(self) -> list[str]:
        """Validate auth data integrity and return list of issues"""
        issues = []
        with self.lock:
            for name, record in self.name_passwords.items():
                if not isinstance(record, dict):
                    issues.append(
                        f"Name '{name}' has invalid record type: {type(record)}"
                    )
                    continue
                if "salt" not in record or "hash" not in record:
                    issues.append(f"Name '{name}' missing salt or hash")
                    continue
                try:
                    # Verify salt and hash are valid hex
                    binascii.unhexlify(record["salt"])
                    binascii.unhexlify(record["hash"])
                except (TypeError, ValueError, binascii.Error):
                    # TypeError covers non-string values such as None.
                    issues.append(
                        f"Name '{name}' has invalid salt or hash format"
                    )
        return issues