183 lines
6.7 KiB
Python
183 lines
6.7 KiB
Python
"""
|
|
Authentication and name management for the AI Voice Bot server.
|
|
|
|
This module handles password hashing, name protection, and user authentication.
|
|
Extracted from main.py to improve maintainability and separation of concerns.
|
|
"""
|
|
|
|
import hashlib
|
|
import binascii
|
|
import secrets
|
|
import threading
|
|
from typing import Dict, Optional, Tuple
|
|
|
|
# Import shared models
|
|
try:
|
|
# Try relative import first (when running as part of the package)
|
|
from ...shared.models import NamePasswordRecord
|
|
except ImportError:
|
|
try:
|
|
# Try direct import (when PYTHONPATH is set)
|
|
from shared.models import NamePasswordRecord
|
|
except ImportError:
|
|
# Log a warning for debugging (optional)
|
|
import warnings
|
|
|
|
warnings.warn(
|
|
"Relative import failed, ensure PYTHONPATH includes project root or run as package"
|
|
)
|
|
# Rely on environment setup or raise a clear error
|
|
raise ImportError(
|
|
"Cannot import shared.models. Ensure the project is run as a package or PYTHONPATH is set."
|
|
)
|
|
|
|
from logger import logger
|
|
|
|
|
|
class AuthManager:
    """Manage password protection for reserved user names.

    Passwords are never stored directly. Each protected name maps to a record
    holding a random salt and the PBKDF2-HMAC-SHA256 digest of the password,
    both hex-encoded. Names are lowercased before being used as keys, so all
    lookups are case-insensitive. Every access to ``name_passwords`` is
    guarded by ``lock``.
    """

    # Key-derivation parameters. The iteration count is not persisted with the
    # records, so changing it invalidates every previously stored hash.
    PBKDF2_ITERATIONS = 100_000
    SALT_BYTES = 16

    def __init__(self, save_file: str = "sessions.json"):
        """Create an empty manager.

        Args:
            save_file: Path remembered in ``_save_file``; this class does not
                read or write the file itself.
        """
        # Lowercased name -> {"salt": <hex>, "hash": <hex>}
        self.name_passwords: Dict[str, Dict[str, str]] = {}
        # Re-entrant so that methods holding the lock may call each other.
        self.lock = threading.RLock()
        self._save_file = save_file
        self._loaded = False

    def _hash_password(self, password: str, salt_hex: Optional[str] = None) -> Tuple[str, str]:
        """Derive a PBKDF2-HMAC-SHA256 digest for ``password``.

        Args:
            password: Plain-text password to hash.
            salt_hex: Hex-encoded salt to reuse (e.g. when verifying). When it
                is omitted or empty, a fresh random salt is generated.

        Returns:
            ``(salt_hex, hash_hex)``, both hex-encoded strings.

        Raises:
            binascii.Error: If ``salt_hex`` is not valid hexadecimal.
        """
        if salt_hex:
            salt = binascii.unhexlify(salt_hex)
        else:
            salt = secrets.token_bytes(self.SALT_BYTES)
            salt_hex = salt.hex()
        derived = hashlib.pbkdf2_hmac(
            "sha256", password.encode("utf-8"), salt, self.PBKDF2_ITERATIONS
        )
        return salt_hex, derived.hex()

    def _get_record(self, name: str) -> Optional[Dict[str, str]]:
        """Return a copy of the stored record for ``name``, or None if absent/empty."""
        with self.lock:
            record = self.name_passwords.get(name.lower())
            return dict(record) if record else None

    def _record_matches(self, record: Optional[Dict[str, str]], password: str) -> bool:
        """Return True if ``password`` hashes to the digest stored in ``record``.

        Works on a snapshot and takes no lock, so the expensive key derivation
        does not block other threads using the manager.
        """
        if not record:
            return False
        salt_hex = record.get("salt")
        stored_hash = record.get("hash")
        if not salt_hex or not stored_hash:
            return False
        try:
            _, candidate_hash = self._hash_password(password, salt_hex=salt_hex)
        except (TypeError, ValueError):
            # Corrupted salt (binascii.Error subclasses ValueError): treat the
            # record as unverifiable instead of crashing the caller.
            return False
        # Constant-time comparison so response timing does not leak how much
        # of the digest matched. Compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        return secrets.compare_digest(
            candidate_hash.encode("ascii"), str(stored_hash).encode("utf-8")
        )

    def _export_records(self) -> "Dict[str, NamePasswordRecord]":
        """Build a ``NamePasswordRecord`` for every stored name."""
        with self.lock:
            return {
                name: NamePasswordRecord(**record)
                for name, record in self.name_passwords.items()
            }

    def set_password(self, name: str, password: str) -> None:
        """Protect ``name`` with ``password``, replacing any existing record."""
        lname = name.lower()
        # Hash before taking the lock; the derivation is deliberately slow.
        salt, hash_hex = self._hash_password(password)

        with self.lock:
            self.name_passwords[lname] = {"salt": salt, "hash": hash_hex}

        logger.info(f"Password set for name: {name}")

    def clear_password(self, name: str) -> bool:
        """Remove the password for ``name``.

        Returns:
            True if a record existed and was removed, False otherwise.
        """
        lname = name.lower()

        with self.lock:
            if lname not in self.name_passwords:
                return False
            del self.name_passwords[lname]

        logger.info(f"Password cleared for name: {name}")
        return True

    def verify_password(self, name: str, password: str) -> bool:
        """Check ``password`` against the record stored for ``name``.

        Returns:
            True only when a record exists and the password matches it. False
            when the name is unprotected, the password is wrong, or the stored
            record is incomplete or corrupted.
        """
        return self._record_matches(self._get_record(name), password)

    def is_name_protected(self, name: str) -> bool:
        """Return True if a password record exists for ``name``."""
        lname = name.lower()
        with self.lock:
            return lname in self.name_passwords

    def check_name_takeover(self, name: str, password: Optional[str]) -> Tuple[bool, str]:
        """Decide whether ``name`` may be taken over with the given password.

        An empty password is treated the same as no password. This method only
        makes the decision; it never stores a password itself.

        Returns:
            ``(allowed, reason)`` where ``reason`` is a human-readable message.
        """
        # Snapshot under the lock, then verify outside it so hashing does not
        # stall other threads.
        record = self._get_record(name)

        if not record:
            if password:
                return True, "Name takeover allowed with new password"
            return True, "Name takeover allowed (no password protection)"

        if not password:
            return False, "Password required for protected name"

        if self._record_matches(record, password):
            return True, "Password verified for name takeover"
        return False, "Invalid password for name takeover"

    def get_all_protected_names(self) -> "Dict[str, NamePasswordRecord]":
        """Return every protected name with its record (for admin purposes)."""
        return self._export_records()

    def load_from_payload(self, payload_name_passwords: "Dict[str, NamePasswordRecord]") -> None:
        """Replace all stored records with those from a session payload.

        Keys are lowercased on load because every lookup lowercases the name;
        a mixed-case key would otherwise never be found.
        """
        with self.lock:
            self.name_passwords.clear()
            for name, rec in payload_name_passwords.items():
                self.name_passwords[name.lower()] = {"salt": rec.salt, "hash": rec.hash}
            loaded = len(self.name_passwords)

        logger.info(f"Loaded {loaded} protected names")

    def get_save_data(self) -> "Dict[str, NamePasswordRecord]":
        """Return every stored record in the form used for saving to disk."""
        return self._export_records()

    def get_protection_count(self) -> int:
        """Return the number of protected names."""
        with self.lock:
            return len(self.name_passwords)

    def validate_integrity(self) -> list[str]:
        """Check every stored record and describe any problems found.

        Returns:
            One message per faulty record: either the salt/hash key is
            missing, or a value is not valid hexadecimal. Empty if all
            records are well-formed.
        """
        issues: list[str] = []

        with self.lock:
            for name, record in self.name_passwords.items():
                if "salt" not in record or "hash" not in record:
                    issues.append(f"Name '{name}' missing salt or hash")
                    continue

                try:
                    binascii.unhexlify(record["salt"])
                    binascii.unhexlify(record["hash"])
                except (TypeError, ValueError):
                    # ValueError covers binascii.Error (bad hex); TypeError
                    # covers non-string values such as None.
                    issues.append(f"Name '{name}' has invalid salt or hash format")

        return issues
|