"""Cluster face descriptors into identities with DBSCAN, emit an HTML
review page, and persist the resulting identities to the photos DB.

Pipeline:
  1. Load faces, run DBSCAN to seed clusters.
  2. Iteratively prune faces too far from their cluster centroid.
  3. Optionally merge clusters by running DBSCAN over the clusters.
  4. Write "auto-clusters.html" and store identities in SQLite.
"""

import sys
import json
import os
import functools
import sqlite3
from sqlite3 import Error

import numpy as np
import piexif
from PIL import Image

from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
from ketrface.config import *

# --- Configuration paths -------------------------------------------------
config = read_config()
html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])

html_base = config['basePath']
if html_base == "/":
    html_base = "."

MAX_CLUSTER_DISTANCE = 0.14         # Used to merge clusters
MAX_DISTANCE_FROM_CENTROID = 0.14   # Used to prune outliers


def gen_html(identities):
    """Print an HTML fragment to stdout showing each identity's faces.

    NOTE(review): the literal HTML tags were lost from this file in
    transit (only the print structure and dynamic text survived); the
    markup below is a reconstruction — verify the element/class names
    against what the frontend under `html_path` actually expects.
    """
    for identity in identities:
        print('<div class="identity">')
        print(f'<div class="identity-title">')
        print(f'Identity {identity["id"]} has {len(identity["faces"])}')
        print('</div>')
        print('<div class="identity-faces">')
        for face in identity['faces']:
            faceId = face['id']
            photoId = face['photoId']
            distance = "{:0.4f}".format(face['distance'])
            confidence = "{:0.3f}".format(face['confidence'])
            focus = int(face['focus'])
            label = face['cluster']
            if not isinstance(label, str):
                label = f'Cluster ({face["cluster"]["id"]})'
            print('<div class="face">')
            # Faces are sharded on disk into 100 sub-directories by id.
            path = f'{html_base}/faces/{"{:02d}".format(faceId % 100)}'
            print(f'<img src="{path}/{faceId}.jpg">')
            print(f'<div class="face-label">')
            print(f'{label}: {distance}')
            print(f'</div>')
            print(f'<div class="face-info">')
            print(f'{faceId} {photoId} {confidence} {focus}')
            print(f'</div>')
            print('</div>')
        print('</div>')
        print('</div>')


def update_cluster_averages(identities):
    """Recompute each identity's centroid descriptor in place.

    Stores the element-wise mean of the member faces' descriptors in
    identity['descriptors'] and caches its L2 norm in
    identity['sqrtsummul'] (consumed by findCosineDistanceBaked).

    Returns the same list for call-chaining.
    """
    for identity in identities:
        average = []
        for face in identity['faces']:
            if len(average) == 0:
                average = face['descriptors']
            else:
                average = np.add(average, face['descriptors'])
        average = np.divide(average, len(identity['faces']))
        identity['descriptors'] = average
        identity['sqrtsummul'] = np.sqrt(np.sum(np.multiply(
            average, average)))
    return identities


def update_distances(identities, prune = False,
                     maxDistance = MAX_DISTANCE_FROM_CENTROID):
    """Refresh each face's distance to its identity centroid.

    When `prune` is True, faces farther than `maxDistance` from the
    centroid are evicted (marked Undefined) and the centroid is updated
    incrementally to exclude them.

    Returns the number of faces removed.
    """
    removed = 0
    for identity in identities:
        # BUGFIX: iterate over a snapshot — pruning mutates the live
        # list, and removing from a list while iterating it silently
        # skips the element after each removal.
        for face in list(identity['faces']):
            average = identity['descriptors']
            distance = findCosineDistanceBaked(identity, face)
            if prune and distance > maxDistance:
                # Remove this face's contribution from the running mean:
                # scale back to a sum, subtract, then renormalize.
                average = np.dot(average, len(identity['faces']))
                average = np.subtract(average, face['descriptors'])
                face['cluster'] = Undefined
                face['distance'] = 0
                identity['faces'].remove(face)
                removed += 1
                remaining = len(identity['faces'])
                if remaining == 0:
                    # Nothing left to average; keep the last centroid.
                    continue
                average = np.divide(average, remaining)
                identity['descriptors'] = average
                identity['sqrtsummul'] = np.sqrt(np.sum(np.multiply(
                    average, average)))
            else:
                face['distance'] = distance
    return removed


def sort_identities(identities):
    """Order identities largest-first; order faces nearest-centroid-first."""
    identities.sort(reverse = True, key = lambda x: len(x['faces']))
    for identity in identities:
        identity['faces'].sort(reverse = False,
                               key = lambda x: x['distance'])


def cluster_sort(A, B):
    """Comparator: order faces by cluster id, then by confidence.

    Intended for functools.cmp_to_key; kept for callers elsewhere.
    """
    diff = A['cluster'] - B['cluster']
    if diff > 0:
        return 1
    elif diff < 0:
        return -1
    diff = A['confidence'] - B['confidence']
    if diff > 0:
        return 1
    elif diff < 0:
        return -1
    return 0


def build_straglers(faces):
    """Return all faces not bound to a cluster: Noise first, then Undefined."""
    noise = []
    undefined = []
    for face in faces:
        if face['cluster'] == Noise:
            noise.append(face)
        elif face['cluster'] == Undefined:
            undefined.append(face)
    return noise + undefined


# --- Main pipeline -------------------------------------------------------
print('Loading faces from database')
faces = load_faces(db_path = db_path)

# minPts scales with library size but never drops below 5.
# BUGFIX: use integer division — minPts is a point count, not a float.
minPts = max(len(faces) // 500, 5)
eps = 0.185
print(f'Scanning {len(faces)} faces for clusters (minPts: {minPts}, eps: {eps})')
identities = DBSCAN(faces, minPts = minPts, eps = eps)
print(f'{len(identities)} clusters grouped')

# Compute average center for all clusters
identities = update_cluster_averages(identities)

epoch_prune = True
merge_identities = True

if epoch_prune:
    removed = -1
    epoch = 1
    # Filter each cluster removing any face that is > cluster_max_distance
    # from the average center point of the cluster; repeat until stable.
    while removed != 0:
        print(f'Epoch {epoch}...')
        epoch += 1
        removed = update_distances(
            identities,
            prune = True,
            maxDistance = MAX_DISTANCE_FROM_CENTROID)
        if removed > 0:
            print(f'Excluded {removed} faces this epoch')

print(f'{len(identities)} identities seeded.')

reduced = identities
if merge_identities:
    # Cluster the clusters...
    print('Reducing clusters via DBSCAN')
    reduced = DBSCAN(identities, eps = MAX_CLUSTER_DISTANCE, minPts = 3)
    if len(reduced) == 0:
        reduced = identities

    # For each cluster, merge the lists of faces referenced in the cluster's
    # "faces" field, which is pointing to clusters (and not actual faces)
    for cluster in reduced:
        merged = []
        for identity in cluster['faces']:
            merged = merged + identity['faces']
        cluster['faces'] = merged

if False:  # Disabled experiment: recluster the unbound faces in isolation.
    # Creating a set containing those faces which have not been bound
    # to an identity to recluster them in isolation from the rest of
    # the faces
    straglers = build_straglers(faces)
    reduced = reduced + DBSCAN(straglers)

# Build a final cluster with all remaining uncategorized faces
if False:  # Disabled experiment: catch-all bucket for leftover faces.
    remaining_cluster = {
        'id': len(reduced) + 1,
        'distance': 0,
        'descriptors': [],
        'cluster': Undefined,
        'faces': []
    }
    straglers = build_straglers(faces)
    for face in straglers:
        face['cluster'] = remaining_cluster
        remaining_cluster['faces'].append(face)
    reduced.append(remaining_cluster)

# Give all merged identity lists a unique ID
# (renamed loop variable: `id` shadowed the builtin)
for identity_id, identity in enumerate(reduced):
    identity['id'] = identity_id
    for face in identity['faces']:
        face['cluster'] = identity

reduced = update_cluster_averages(reduced)
update_distances(reduced)
sort_identities(reduced)

if False:  # Disabled experiment: report candidate cluster merges.
    # This generates a set of differences between clusters and makes
    # a recommendation to merge clusters (outside of DBSCAN)
    #
    # Worth testing on larger data set
    for i, A in enumerate(reduced):
        for k, B in enumerate(reduced):
            if k < i:
                continue
            if A == B:
                continue
            distance = findCosineDistanceBaked(A, B)
            if distance < MAX_CLUSTER_DISTANCE:
                distance = "{:0.4f}".format(distance)
                print(f'{A["id"]} to {B["id"]} = {distance}: MERGE')

print('Writing to "auto-clusters.html"')
redirect_on(os.path.join(html_path, 'auto-clusters.html'))
gen_html(reduced)
redirect_off()

print(f'Connecting to database: {db_path}')
conn = create_connection(db_path)
with conn:
    for identity in reduced:
        print(f'Writing identity {identity["id"]} to DB')
        # (renamed: `id` shadowed the builtin)
        new_id = create_identity(conn, identity)
        for face in identity['faces']:
            update_face_identity(conn, face['id'], new_id)