# backstory/src/backend/device_manager.py
#
# Listing metadata carried over from the source viewer:
# 87 lines, 3.6 KiB, Python

from fastapi import FastAPI, HTTPException, Depends, Query, Path, Body, status, APIRouter, Request, BackgroundTasks
from database.manager import RedisDatabase
import hashlib
from logger import logger
from datetime import datetime, timezone
from user_agents import parse
import json
class DeviceManager:
    """Fingerprint client devices and track per-user trusted devices in Redis.

    Trusted-device records are stored as JSON strings under keys of the form
    ``trusted_device:{user_id}:{device_id}`` with a 90-day expiry that is
    reset each time ``update_device_last_used`` rewrites the record.
    """

    # Lifetime of a trusted-device record, in seconds (90 days).
    TRUSTED_DEVICE_TTL_SECONDS = 90 * 24 * 60 * 60

    def __init__(self, database: RedisDatabase):
        """Keep a reference to the database wrapper; ``database.redis`` is used for all I/O."""
        self.database = database

    @staticmethod
    def _trusted_device_key(user_id: str, device_id: str) -> str:
        """Return the Redis key under which a user's trusted device is stored."""
        return f"trusted_device:{user_id}:{device_id}"

    def generate_device_fingerprint(self, request: Request) -> str:
        """Generate device fingerprint from request.

        The fingerprint is the first 16 hex characters of the SHA-256 digest
        of ``"<user-agent>|<accept-language>"``. Missing headers are treated
        as empty strings. The client address is not part of the hash.
        """
        user_agent = request.headers.get("user-agent", "")
        accept_language = request.headers.get("accept-language", "")

        fingerprint_data = f"{user_agent}|{accept_language}"
        return hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]

    def parse_device_info(self, request: Request) -> dict:
        """Parse device information from request.

        Returns a dict with the device fingerprint, a display name of the
        form ``"<browser> on <os>"``, browser/OS family and version strings
        as reported by ``user_agents.parse``, the client IP address (or
        ``"unknown"`` when ``request.client`` is not set) and the raw
        user-agent string.
        """
        user_agent_string = request.headers.get("user-agent", "")
        user_agent = parse(user_agent_string)

        return {
            "device_id": self.generate_device_fingerprint(request),
            "device_name": f"{user_agent.browser.family} on {user_agent.os.family}",
            "browser": user_agent.browser.family,
            "browser_version": user_agent.browser.version_string,
            "os": user_agent.os.family,
            "os_version": user_agent.os.version_string,
            "ip_address": request.client.host if request.client else "unknown",
            "user_agent": user_agent_string,
        }

    async def is_trusted_device(self, user_id: str, device_id: str) -> bool:
        """Check if device is trusted for user.

        Returns ``True`` when the trusted-device key exists. Any error while
        querying Redis is logged and reported as ``False`` (not trusted).
        """
        try:
            key = self._trusted_device_key(user_id, device_id)
            exists = await self.database.redis.exists(key)
            return exists > 0
        except Exception as e:
            logger.error(f"Error checking trusted device: {e}")
            return False

    async def add_trusted_device(self, user_id: str, device_id: str, device_info: dict) -> None:
        """Add device to trusted devices.

        Stores ``device_info`` plus ``added_at`` / ``last_used`` UTC ISO
        timestamps as JSON, expiring after ``TRUSTED_DEVICE_TTL_SECONDS``.
        Best-effort: failures are logged and not propagated.
        """
        try:
            key = self._trusted_device_key(user_id, device_id)
            # Use a single timestamp so both fields agree on a fresh record.
            now = datetime.now(timezone.utc).isoformat()
            device_data = {
                **device_info,
                "added_at": now,
                "last_used": now,
            }

            await self.database.redis.setex(
                key,
                self.TRUSTED_DEVICE_TTL_SECONDS,
                json.dumps(device_data, default=str),
            )

            logger.info(f"🔒 Added trusted device {device_id} for user {user_id}")
        except Exception as e:
            logger.error(f"Error adding trusted device: {e}")

    async def update_device_last_used(self, user_id: str, device_id: str) -> None:
        """Update last used timestamp for device.

        Does nothing when no record exists for the device. Otherwise the
        record is rewritten with a new ``last_used`` value and its expiry is
        reset to the full TTL. Best-effort: failures are logged and not
        propagated.
        """
        try:
            key = self._trusted_device_key(user_id, device_id)
            device_data = await self.database.redis.get(key)
            if device_data:
                device_info = json.loads(device_data)
                device_info["last_used"] = datetime.now(timezone.utc).isoformat()
                await self.database.redis.setex(
                    key,
                    self.TRUSTED_DEVICE_TTL_SECONDS,  # Reset 90 day expiry
                    json.dumps(device_info, default=str),
                )
        except Exception as e:
            logger.error(f"Error updating device last used: {e}")