"""Split up "dopplegangers": faces in one photo that share an identity.

The script loads every identity and every photo in which two or more faces
are bound to the same identity.  For each such group the face closest to the
identity's average descriptor keeps the identity; every other face is removed
from it and rebound to the closest *other* identity whose cosine distance is
within MAX_DISTANCE_FROM_CENTROID, or left without an identity if there is
none.  The resulting face -> identity bindings are written back to the
``faces`` table.
"""
import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from contextlib import closing
from PIL import Image
import numpy as np
import functools

from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
from ketrface.config import *

# Used to prune outliers: a face is only rebound to an identity whose
# cosine distance from the face is at most this value.
MAX_DISTANCE_FROM_CENTROID = 0.14

config = read_config()

html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])
html_base = config['basePath']
if html_base == "/":
    html_base = "."


def _baked_norm(descriptors):
    """Return sqrt(sum(d * d)), the value stored under 'sqrtsummul'."""
    return np.sqrt(np.sum(np.multiply(descriptors, descriptors)))


def update_cluster_averages(identities):
    """Recompute each identity's average descriptor from its faces.

    For every identity, 'descriptors' is set to the element-wise mean of
    the 'descriptors' of its 'faces', and 'sqrtsummul' to the pre-baked
    norm of that mean.  The list is modified in place and also returned.
    """
    for identity in identities:
        faces = identity['faces']
        average = []
        for face in faces:
            if len(average) == 0:
                average = face['descriptors']
            else:
                average = np.add(average, face['descriptors'])
        average = np.divide(average, len(faces))
        identity['descriptors'] = average
        identity['sqrtsummul'] = _baked_norm(average)
    return identities


def sort_identities(identities):
    """Sort identities in place, largest first, and their faces by distance.

    Identities are ordered by descending number of faces; within each
    identity the faces are ordered by ascending 'distance'.
    """
    identities.sort(reverse=True, key=lambda x: len(x['faces']))
    for identity in identities:
        identity['faces'].sort(reverse=False, key=lambda x: x['distance'])


def cluster_sort(A, B):
    """Three-way comparator ordering by 'cluster', then by 'confidence'.

    Returns 1 if A sorts after B, -1 if before, and 0 if both keys tie.
    """
    for key in ('cluster', 'confidence'):
        diff = A[key] - B[key]
        if diff > 0:
            return 1
        if diff < 0:
            return -1
    return 0


def load_identities(db_path):
    """Load every identity that has at least one face bound to it.

    Returns a list of dicts with 'identityId', 'displayName', 'descriptors'
    (decoded with np.frombuffer), 'faceCount', 'updated' (False) and the
    pre-baked 'sqrtsummul' norm used for cosine distance.
    """
    identities = []
    # `with conn` only scopes the transaction; closing() releases the handle.
    with closing(create_connection(db_path)) as conn, conn:
        cur = conn.cursor()
        res = cur.execute('''
            SELECT
                identities.id as identityId,
                identities.displayName as displayName,
                identities.descriptors as descriptors,
                COUNT(faces.id) AS faceCount
            FROM identities
            JOIN faces ON faces.identityId=identities.id
            GROUP BY identities.id
        ''')
        for row in res.fetchall():
            identityId, displayName, descriptors, faceCount = row
            identity = {
                'identityId': identityId,
                'displayName': displayName,
                'descriptors': np.frombuffer(descriptors),
                'faceCount': faceCount,
                'updated': False
            }
            # Pre-bake computations for cosine distance
            identity['sqrtsummul'] = _baked_norm(identity['descriptors'])
            identities.append(identity)
    return identities


def find_identity(identities, identityId):
    """Return the identity dict whose 'identityId' matches.

    Raises:
        Exception: if no identity in the list has that id.
    """
    for element in identities:
        if element['identityId'] == identityId:
            return element
    raise Exception(f'Identity {identityId} missing')


def _make_face(face_id, descriptors, identityId):
    """Build the in-memory face record used by the reassignment pass."""
    face = {
        'id': face_id,
        'type': 'face',
        'distance': 0,
        'descriptors': np.frombuffer(descriptors),
        'cluster': identityId,  # Undefined from dbscan.py
    }
    face['sqrtsummul'] = _baked_norm(face['descriptors'])
    return face


def load_doppelganger_photos(db_path):
    """Load photos in which several faces are bound to the same identity.

    Returns a dict keyed by photo id.  Each value has 'photoId' and
    'dopplegangers', a dict keyed by identity id whose values hold
    'identity' (None until bound by the caller) and 'faces', the list of
    distinct face records sharing that identity in that photo.
    """
    photos = {}
    # `with conn` only scopes the transaction; closing() releases the handle.
    with closing(create_connection(db_path)) as conn, conn:
        cur = conn.cursor()
        res = cur.execute('''
            SELECT
                f1.identityId AS identityId,
                photos.id AS photoId,
                f1.id AS f1_id,
                f2.id AS f2_id,
                f1_descriptors.descriptors AS f1_descriptors,
                f2_descriptors.descriptors AS f2_descriptors
            FROM faces AS f1
            INNER JOIN faces AS f2 ON (
                f2.identityId=f1.identityId
                AND f1.photoId=f2.photoId and f1_id!=f2_id)
            INNER JOIN photos ON (
                photos.duplicate == 0 OR photos.duplicate IS NULL)
            INNER JOIN facedescriptors AS f1_descriptors ON (
                f1.descriptorId=f1_descriptors.id)
            INNER JOIN facedescriptors AS f2_descriptors ON (
                f2.descriptorId=f2_descriptors.id)
            WHERE f1.identityId IS NOT NULL AND f1.photoId=photos.id
            ORDER BY photos.id,f1.identityId
        ''')
        for row in res.fetchall():
            (identityId, photoId, f1_id, f2_id,
             f1_descriptors, f2_descriptors) = row
            face1 = _make_face(f1_id, f1_descriptors, identityId)
            face2 = _make_face(f2_id, f2_descriptors, identityId)

            if photoId not in photos:
                photos[photoId] = {
                    'photoId': photoId,
                    'dopplegangers': {}
                }
            if identityId not in photos[photoId]['dopplegangers']:
                photos[photoId]['dopplegangers'][identityId] = {
                    'identity': None,
                    'faces': []
                }
            faceList = photos[photoId]['dopplegangers'][identityId]['faces']
            # The self-join yields each pair in both orders, so every face
            # shows up more than once; keep one record per face id.
            for face in [face1, face2]:
                if not any(face['id'] == known['id'] for known in faceList):
                    faceList.append(face)
    return photos


def remove_face_from_identity(identity, face):
    """Detach `face` from `identity` and update the identity's average.

    The identity's 'descriptors' mean is recomputed without the face, its
    'faceCount' is decremented, 'sqrtsummul' is re-baked and 'updated' is
    set.  The face is left unbound: 'identity' None, 'identityId' -1 and
    'distance' 0.
    """
    identity['updated'] = True
    remaining = identity['faceCount'] - 1
    if remaining > 0:
        total = np.multiply(identity['descriptors'], identity['faceCount'])
        average = np.divide(
            np.subtract(total, face['descriptors']), remaining)
    else:
        # No faces left: dividing by zero would poison the centroid with
        # NaN/inf, so fall back to a zero vector instead.
        remaining = 0
        average = np.zeros_like(identity['descriptors'])
    identity['faceCount'] = remaining
    identity['descriptors'] = average
    identity['sqrtsummul'] = _baked_norm(average)

    face['identity'] = None
    face['identityId'] = -1
    face['distance'] = 0


def _find_closest_identity(face, identities, exclude):
    """Return (identity, distance) for the nearest acceptable identity.

    `exclude` (the identity the face was just removed from) and identities
    with no faces left are skipped, as is anything farther away than
    MAX_DISTANCE_FROM_CENTROID.  Returns None when nothing qualifies.
    Distances come from findCosineDistanceBaked, which is expected to be
    supplied by the ketrface star imports.
    """
    best = None
    best_distance = None
    for potential in identities:
        if potential is exclude or potential['faceCount'] <= 0:
            continue
        distance = findCosineDistanceBaked(face, potential)
        if distance > MAX_DISTANCE_FROM_CENTROID:
            continue
        if best_distance is None or distance < best_distance:
            best = potential
            best_distance = distance
    if best is None:
        return None
    return best, best_distance


print('Loading identities from database...')
identities = load_identities(db_path)
print(f'{len(identities)} identities loaded.')

print('Loading dopplegangers from database...')
photos = load_doppelganger_photos(db_path)
print(f'{len(photos)} photos with dopplegangers loaded.')

print('Binding dopplegangers to identities...')
face_updates = []
# NOTE(review): identity_updates is collected (with repeats) but never
# written back, so the recomputed identity descriptors are not persisted.
identity_updates = []
for photoId in photos:
    photo = photos[photoId]
    print(f'Processing photo {photoId}...')
    for identityId, group in photo['dopplegangers'].items():
        if group['identity'] is None:
            group['identity'] = find_identity(identities, identityId)
        faces = group['faces']
        identity = group['identity']
        for face in faces:
            face['identity'] = identity
            face['distance'] = findCosineDistanceBaked(face, identity)
        faces.sort(reverse=False, key=lambda x: x['distance'])
        # First face closest to identity -- it stays with the photo
        faces = faces[1:]

        for i, face in enumerate(faces):
            remove_face_from_identity(identity, face)
            identity_updates.append(identity)

            # NOTE(review): the receiving identity's average and faceCount
            # are not adjusted for the face it gains here.
            match = _find_closest_identity(face, identities, identity)
            if match is not None:
                potential, distance = match
                face["distance"] = distance
                face['identity'] = potential
                face['identityId'] = potential['identityId']

            # Queued whether or not a new identity was found, so that
            # unmatched faces get their identityId cleared in the DB.
            face_updates.append(face)

            if face['identity'] is not None:
                print(f' {i+1}: {face["id"]} moves from {identity["displayName"]} to:')
                print(f' {face["identity"]["displayName"]} with distance {face["distance"]}')
            else:
                print(f' {i+1}: {face["id"]} needs a new Identity.')

# The connection context manager commits on success and rolls back if an
# update raises; closing() then releases the connection.
with closing(create_connection(db_path)) as conn, conn:
    cur = conn.cursor()
    for face in face_updates:
        print(f'Updating face {face["id"]} in DB')
        if face['identity'] is None:
            sql = '''
                UPDATE faces SET identityId=NULL
                WHERE id=?
            '''
            values = (face["id"], )
        else:
            sql = '''
                UPDATE faces SET identityId=?
                WHERE id=?
            '''
            values = (face["identityId"], face["id"])
        cur.execute(sql, values)