87 lines
3.4 KiB
Python
87 lines
3.4 KiB
Python
from fastapi import Request
|
|
from database.manager import RedisDatabase
|
|
import hashlib
|
|
from logger import logger
|
|
from datetime import datetime, timezone
|
|
from user_agents import parse
|
|
import json
|
|
|
|
|
|
class DeviceManager:
|
|
def __init__(self, database: RedisDatabase):
|
|
self.database = database
|
|
|
|
def generate_device_fingerprint(self, request: Request) -> str:
|
|
"""Generate device fingerprint from request"""
|
|
user_agent = request.headers.get("user-agent", "")
|
|
accept_language = request.headers.get("accept-language", "")
|
|
|
|
# Create fingerprint
|
|
fingerprint_data = f"{user_agent}|{accept_language}"
|
|
fingerprint = hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]
|
|
|
|
return fingerprint
|
|
|
|
def parse_device_info(self, request: Request) -> dict:
|
|
"""Parse device information from request"""
|
|
user_agent_string = request.headers.get("user-agent", "")
|
|
user_agent = parse(user_agent_string)
|
|
|
|
return {
|
|
"device_id": self.generate_device_fingerprint(request),
|
|
"device_name": f"{user_agent.browser.family} on {user_agent.os.family}",
|
|
"browser": user_agent.browser.family,
|
|
"browser_version": user_agent.browser.version_string,
|
|
"os": user_agent.os.family,
|
|
"os_version": user_agent.os.version_string,
|
|
"ip_address": request.client.host if request.client else "unknown",
|
|
"user_agent": user_agent_string,
|
|
}
|
|
|
|
async def is_trusted_device(self, user_id: str, device_id: str) -> bool:
|
|
"""Check if device is trusted for user"""
|
|
try:
|
|
key = f"trusted_device:{user_id}:{device_id}"
|
|
exists = await self.database.redis.exists(key)
|
|
return exists > 0
|
|
except Exception as e:
|
|
logger.error(f"Error checking trusted device: {e}")
|
|
return False
|
|
|
|
async def add_trusted_device(self, user_id: str, device_id: str, device_info: dict):
|
|
"""Add device to trusted devices"""
|
|
try:
|
|
key = f"trusted_device:{user_id}:{device_id}"
|
|
device_data = {
|
|
**device_info,
|
|
"added_at": datetime.now(timezone.utc).isoformat(),
|
|
"last_used": datetime.now(timezone.utc).isoformat(),
|
|
}
|
|
|
|
# Store for 90 days
|
|
await self.database.redis.setex(
|
|
key,
|
|
90 * 24 * 60 * 60, # 90 days in seconds
|
|
json.dumps(device_data, default=str),
|
|
)
|
|
|
|
logger.info(f"🔒 Added trusted device {device_id} for user {user_id}")
|
|
except Exception as e:
|
|
logger.error(f"Error adding trusted device: {e}")
|
|
|
|
async def update_device_last_used(self, user_id: str, device_id: str):
|
|
"""Update last used timestamp for device"""
|
|
try:
|
|
key = f"trusted_device:{user_id}:{device_id}"
|
|
device_data = await self.database.redis.get(key)
|
|
if device_data:
|
|
device_info = json.loads(device_data)
|
|
device_info["last_used"] = datetime.now(timezone.utc).isoformat()
|
|
await self.database.redis.setex(
|
|
key,
|
|
90 * 24 * 60 * 60, # Reset 90 day expiry
|
|
json.dumps(device_info, default=str),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error updating device last used: {e}")
|