ai-voicebot/server/core/auth_manager.py

169 lines
6.2 KiB
Python

"""
Authentication and name management for the AI Voice Bot server.
This module handles password hashing, name protection, and user authentication.
Extracted from main.py to improve maintainability and separation of concerns.
"""
import hashlib
import binascii
import secrets
import os
import threading
from typing import Dict, Optional, Tuple
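# Import shared models. The directory three levels above this file is appended
# to sys.path first, presumably so the `shared` package resolves when the server
# is started from inside its own directory.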
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from shared.models import NamePasswordRecord
from logger import logger
class AuthManager:
    """Manage password protection for reserved user names.

    Names are matched case-insensitively: every key in ``name_passwords`` is
    ``name.lower()``. Each protected name maps to a record
    ``{"salt": <hex>, "hash": <hex>}`` where ``hash`` is the
    PBKDF2-HMAC-SHA256 digest of the password under that salt.

    ``self.lock`` (a re-entrant lock) guards every read and write of
    ``name_passwords``; password hashing itself is done outside the lock so a
    slow key derivation does not block other callers.
    """

    # PBKDF2 iteration count. The count is not stored alongside each record,
    # so changing it invalidates every previously saved hash.
    PBKDF2_ITERATIONS = 100_000
    # Length in bytes of a freshly generated salt.
    SALT_BYTES = 16

    def __init__(self, save_file: str = "sessions.json"):
        """Create an empty manager.

        Args:
            save_file: Stored on the instance as ``_save_file``; this class
                does not read or write the file itself.
        """
        # Mapping of reserved names to password records
        # (lowercased name -> {"salt": ..., "hash": ...}).
        self.name_passwords: Dict[str, Dict[str, str]] = {}
        self.lock = threading.RLock()
        self._save_file = save_file
        self._loaded = False

    def _hash_password(self, password: str, salt_hex: Optional[str] = None) -> Tuple[str, str]:
        """Return ``(salt_hex, hash_hex)`` for ``password``.

        Args:
            password: Plain-text password; encoded as UTF-8 before hashing.
            salt_hex: Hex-encoded salt to reuse. When omitted or empty, a new
                random salt of ``SALT_BYTES`` bytes is generated.

        Returns:
            The hex-encoded salt that was used and the hex-encoded digest.

        Raises:
            ValueError: If ``salt_hex`` is not valid hexadecimal
                (``binascii.Error`` is a ``ValueError`` subclass).
        """
        if salt_hex:
            salt = binascii.unhexlify(salt_hex)
        else:
            salt = secrets.token_bytes(self.SALT_BYTES)
            salt_hex = binascii.hexlify(salt).decode()
        derived = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), salt, self.PBKDF2_ITERATIONS
        )
        return salt_hex, binascii.hexlify(derived).decode()

    def _verify_record(self, record: Optional[Dict[str, str]], password: str) -> bool:
        """Return True if ``password`` matches the stored ``record``.

        A missing, incomplete or corrupt record never matches; it yields
        False instead of raising.
        """
        if not record:
            return False
        salt = record.get("salt")
        stored_hash = record.get("hash")
        if not salt or not isinstance(stored_hash, str):
            return False
        try:
            _, candidate_hash = self._hash_password(password, salt_hex=salt)
            # Constant-time comparison so response timing does not reveal how
            # many leading characters of the hash matched.
            return secrets.compare_digest(candidate_hash, stored_hash)
        except (ValueError, TypeError):
            # Salt is not valid hex / not a string, or the stored hash holds
            # non-ASCII text that compare_digest refuses.
            return False

    def set_password(self, name: str, password: str) -> None:
        """Protect ``name`` with ``password``, replacing any existing record."""
        lname = name.lower()
        # Hash before taking the lock; key derivation is deliberately slow.
        salt, hash_hex = self._hash_password(password)
        with self.lock:
            self.name_passwords[lname] = {"salt": salt, "hash": hash_hex}
        logger.info(f"Password set for name: {name}")

    def clear_password(self, name: str) -> bool:
        """Remove protection from ``name``.

        Returns:
            True if a record existed and was removed, False otherwise.
        """
        lname = name.lower()
        with self.lock:
            if lname not in self.name_passwords:
                return False
            del self.name_passwords[lname]
        logger.info(f"Password cleared for name: {name}")
        return True

    def verify_password(self, name: str, password: str) -> bool:
        """Return True if ``password`` is the password saved for ``name``.

        Returns False when the name is unprotected or its stored record is
        incomplete or corrupt.
        """
        with self.lock:
            record = self.name_passwords.get(name.lower())
        return self._verify_record(record, password)

    def is_name_protected(self, name: str) -> bool:
        """Return True if a password record exists for ``name``."""
        with self.lock:
            return name.lower() in self.name_passwords

    def check_name_takeover(self, name: str, password: Optional[str]) -> Tuple[bool, str]:
        """Decide whether a caller may take over ``name``.

        Args:
            name: Requested name (matched case-insensitively).
            password: Password supplied by the caller; ``None`` or an empty
                string both count as "no password supplied".

        Returns:
            ``(allowed, reason)`` where ``reason`` is a human-readable message.
        """
        # Snapshot the record once so the decision and the verification are
        # made against the same data even if another thread changes it.
        with self.lock:
            record = self.name_passwords.get(name.lower())

        if not record:
            if password:
                # Unprotected name plus a password: caller may claim it and
                # set the new password.
                return True, "Name takeover allowed with new password"
            return True, "Name takeover allowed (no password protection)"

        if not password:
            return False, "Password required for protected name"
        if self._verify_record(record, password):
            return True, "Password verified for name takeover"
        return False, "Invalid password for name takeover"

    def get_all_protected_names(self) -> "Dict[str, NamePasswordRecord]":
        """Return every protected name with its record (for admin purposes)."""
        return self.get_save_data()

    def load_from_payload(self, payload_name_passwords: "Dict[str, NamePasswordRecord]") -> None:
        """Replace all records with those from a session payload.

        Keys are lowercased on load so they agree with the lowercased keys
        used by every lookup in this class.
        """
        with self.lock:
            self.name_passwords.clear()
            for name, rec in payload_name_passwords.items():
                self.name_passwords[name.lower()] = {"salt": rec.salt, "hash": rec.hash}
            loaded = len(self.name_passwords)
        logger.info(f"Loaded {loaded} protected names")

    def get_save_data(self) -> "Dict[str, NamePasswordRecord]":
        """Return a snapshot of all records as ``NamePasswordRecord`` objects."""
        with self.lock:
            return {
                name: NamePasswordRecord(**record)
                for name, record in self.name_passwords.items()
            }

    def get_protection_count(self) -> int:
        """Return the number of protected names."""
        with self.lock:
            return len(self.name_passwords)

    def validate_integrity(self) -> list[str]:
        """Check every stored record and return a list of problem descriptions.

        A record is reported when it lacks ``salt`` or ``hash``, or when
        either value cannot be decoded as hexadecimal. An empty list means no
        problems were found.
        """
        issues: list[str] = []
        with self.lock:
            for name, record in self.name_passwords.items():
                if "salt" not in record or "hash" not in record:
                    issues.append(f"Name '{name}' missing salt or hash")
                    continue
                try:
                    # Verify salt and hash are valid hex.
                    binascii.unhexlify(record["salt"])
                    binascii.unhexlify(record["hash"])
                except (ValueError, TypeError):
                    # ValueError covers binascii.Error; TypeError covers
                    # non-string values such as None.
                    issues.append(f"Name '{name}' has invalid salt or hash format")
        return issues