ketr.photos/ketrface/cluster.py
James Ketrenos 3e9438bb27 Switched back to port 80 (http) instead of https in the container
Signed-off-by: James Ketrenos <james_git@ketrenos.com>
2023-01-23 07:54:15 -08:00

233 lines
7.0 KiB
Python

import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image
import numpy as np
import functools
from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
from ketrface.config import *
# Distance thresholds shared by the clustering passes further down.
MAX_CLUSTER_DISTANCE = 0.14 # Used to merge clusters
MAX_DISTANCE_FROM_CENTROID = 0.14 # Used to prune outliers

# Resolve every location this script touches from the ketrface configuration.
# merge_config_path() is a project helper; presumably it joins the configured
# root ("path") with the given sub-path -- confirm in ketrface.config.
config = read_config()
html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])

# A base path of "/" is replaced by "." so that the URLs emitted by
# gen_html() ("<html_base>/faces/...") become relative ("./faces/...").
html_base = "." if config['basePath'] == "/" else config['basePath']
def gen_html(identities):
    """Print an HTML contact sheet of the given identities to stdout.

    Every identity becomes an outer ``<div>`` holding a heading (identity id
    and number of faces) and one tile per face. A tile shows the face
    thumbnail with two overlays: the cluster label and distance at the top,
    and the face id, photo id, confidence and focus at the bottom.

    Thumbnail URLs are built from the module-level ``html_base`` as
    ``<html_base>/faces/<face id modulo 100, two digits>/<face id>.jpg``.

    Args:
        identities: iterable of dicts with an "id" and a "faces" list. Each
            face dict must provide "id", "photoId", "distance", "confidence",
            "focus" and "cluster" (either a string label or a dict that has
            an "id").
    """
    for identity in identities:
        members = identity['faces']
        print('<div>')
        print(f'<div><b>Identity {identity["id"]} has {len(members)}</b><br></div>')
        print('<div>')
        for face in members:
            # Collect and format every field before emitting any markup for
            # this tile.
            face_id = face['id']
            photo_id = face['photoId']
            distance = "{:0.4f}".format(face['distance'])
            confidence = "{:0.3f}".format(face['confidence'])
            focus = int(face['focus'])
            cluster = face['cluster']
            # A string is used verbatim as the label; anything else is
            # treated as a cluster record and labelled by its id.
            if type(cluster) is str:
                label = cluster
            else:
                label = f'Cluster ({cluster["id"]})'

            # Thumbnails are bucketed into directories named by id % 100.
            path = f'{html_base}/faces/{"{:02d}".format(face_id % 100)}'

            print('<div style="position:relative;display:inline-flex;flex-direction:column">')
            print(f'<img src="{path}/{face_id}.jpg"/>')
            print(f'<div style="background-color:rgba(255, 255, 255, 0.4);position:absolute;top:0px;left:0px;right:0px;padding:0.25rem">{label}: {distance}</div>')
            print(f'<div style="background-color:rgba(255, 255, 255, 0.4);position:absolute;bottom:0px;left:0px;right:0px;padding:0.25rem">{face_id} {photo_id} {confidence} {focus}</div>')
            print('</div>')
        print('</div>')
        print('</div>')
def update_cluster_averages(identities):
    """Recompute the centroid of every identity from its faces.

    For each identity the element-wise mean of its faces' "descriptors" is
    stored in ``identity['descriptors']`` and the Euclidean norm of that mean
    (square root of the sum of squares) is cached in
    ``identity['sqrtsummul']``.

    Args:
        identities: list of dicts, each with a "faces" list whose entries
            carry a "descriptors" vector.

    Returns:
        The same ``identities`` object, updated in place.
    """
    for identity in identities:
        members = identity['faces']

        # Running element-wise sum; the first descriptor seeds the total.
        total = []
        for member in members:
            vector = member['descriptors']
            total = vector if len(total) == 0 else np.add(total, vector)

        centroid = np.divide(total, len(members))
        identity['descriptors'] = centroid
        identity['sqrtsummul'] = np.sqrt(np.sum(np.multiply(centroid, centroid)))
    return identities
def _remove_face_by_identity(faces, face):
    """Delete *face* from the list *faces* in place, matching by identity."""
    for index, candidate in enumerate(faces):
        if candidate is face:
            del faces[index]
            return


def update_distances(identities,
                     prune=False,
                     maxDistance=MAX_DISTANCE_FROM_CENTROID):
    """Refresh each face's distance to its identity and optionally prune.

    For every face of every identity the distance to the identity is
    computed with ``findCosineDistanceBaked(identity, face)`` and stored in
    ``face['distance']``.

    When ``prune`` is true, a face whose distance exceeds ``maxDistance`` is
    detached instead: it is removed from ``identity['faces']``, its
    "cluster" is set to ``Undefined`` and its "distance" to 0. The
    identity's "descriptors" (mean of the remaining faces) and "sqrtsummul"
    (norm of that mean) are updated immediately, so later faces of the same
    identity are measured against the corrected centroid.

    Args:
        identities: list of identity dicts ("faces", "descriptors",
            "sqrtsummul").
        prune: remove faces that are farther than ``maxDistance``.
        maxDistance: pruning threshold; only used when ``prune`` is true.

    Returns:
        The number of faces removed (always 0 when ``prune`` is false).
    """
    removed = 0
    for identity in identities:
        faces = identity['faces']
        # Iterate over a snapshot: removing from the list being iterated
        # would make the loop skip the face that follows each removed one.
        for face in list(faces):
            distance = findCosineDistanceBaked(identity, face)
            if not (prune and distance > maxDistance):
                face['distance'] = distance
                continue

            # Take this face's contribution out of the centroid: scale the
            # mean back up to a sum, then subtract the face's descriptors.
            total = np.multiply(identity['descriptors'], len(faces))
            total = np.subtract(total, face['descriptors'])

            face['cluster'] = Undefined
            face['distance'] = 0
            # Remove by identity rather than list.remove(): that compares
            # face dicts with ==, which is unnecessary here and can misbehave
            # when the dicts hold arrays or reference each other.
            _remove_face_by_identity(faces, face)
            removed += 1

            # With no faces left there is no mean to compute; keep the last
            # centroid instead of dividing by zero.
            if faces:
                centroid = np.divide(total, len(faces))
                identity['descriptors'] = centroid
                identity['sqrtsummul'] = np.sqrt(
                    np.sum(np.multiply(centroid, centroid)))
    return removed
def sort_identities(identities):
    """Order identities and their faces in place for presentation.

    Identities are sorted by number of faces, largest first. Inside each
    identity the faces are sorted by ascending "distance", so the closest
    faces come first. Nothing is returned.
    """
    def face_count(identity):
        return len(identity['faces'])

    def distance_of(face):
        return face['distance']

    identities.sort(key=face_count, reverse=True)
    for identity in identities:
        identity['faces'].sort(key=distance_of)
def cluster_sort(A, B):
    """Three-way comparator ordering records by cluster, then confidence.

    Both arguments are mappings whose "cluster" and "confidence" values
    support subtraction. Returns 1 when ``A`` sorts after ``B``, -1 when it
    sorts before, and 0 when both fields compare equal.
    """
    # "cluster" is the primary key; "confidence" only breaks ties.
    for field in ('cluster', 'confidence'):
        delta = A[field] - B[field]
        if delta > 0:
            return 1
        if delta < 0:
            return -1
    return 0
def build_straglers(faces):
    """Collect the faces that are not bound to any cluster.

    A face counts as a straggler when its "cluster" equals ``Noise`` or
    ``Undefined``.

    Returns:
        A new list holding all ``Noise`` faces first, followed by all
        ``Undefined`` faces; within each group the input order is kept.
    """
    noise, undefined = [], []
    for face in faces:
        label = face['cluster']
        if label == Noise:
            bucket = noise
        elif label == Undefined:
            bucket = undefined
        else:
            # Face already belongs to a cluster.
            continue
        bucket.append(face)
    return [*noise, *undefined]
# --- Phase 1: load all faces and build the initial clusters -----------------
print('Loading faces from database')
faces = load_faces(db_path = db_path)

# DBSCAN parameters. minPts grows with the size of the data set but never
# drops below 5; "/" is true division, so minPts may be a float.
eps = 0.25
#eps = 0.185
minPts = max(len(faces) / 500, 5)

print(f'Scanning {len(faces)} faces for clusters (minPts: {minPts}, eps: {eps})')
identities = DBSCAN(faces, minPts = minPts, eps = eps)
print(f'{len(identities)} clusters grouped')

# Compute average center for all clusters
identities = update_cluster_averages(identities)

# --- Phase 2: iteratively prune outliers -------------------------------------
epoch_prune = True
merge_identities = True

if epoch_prune:
    # Filter each cluster, removing any face that is farther than
    # MAX_DISTANCE_FROM_CENTROID from the average center point of the
    # cluster. Repeat until an epoch removes nothing.
    epoch = 1
    while True:
        print(f'Epoch {epoch}...')
        epoch += 1
        removed = update_distances(
            identities,
            prune = True,
            maxDistance = MAX_DISTANCE_FROM_CENTROID)
        if removed > 0:
            print(f'Excluded {removed} faces this epoch')
        if removed == 0:
            break

print(f'{len(identities)} identities seeded.')
# --- Phase 3: merge clusters whose centroids are close together -------------
# "reduced" is the list of identities that the rest of the script works on.
reduced = identities

if merge_identities:
    # Cluster the clusters...
    print('Reducing clusters via DBSCAN')
    reduced = DBSCAN(identities, eps = MAX_CLUSTER_DISTANCE, minPts = 3)
    if len(reduced) == 0:
        # Nothing could be merged: keep the seeded identities untouched.
        # Their "faces" already hold real faces, so the flattening step
        # below must not run on them (it would look up "faces" on a face).
        reduced = identities
    else:
        # For each cluster, merge the lists of faces referenced in the
        # cluster's "faces" field, which is pointing to clusters (and not
        # actual faces)
        for cluster in reduced:
            cluster['faces'] = [
                face
                for identity in cluster['faces']
                for face in identity['faces']
            ]
# Disabled experiment: re-cluster the unbound faces on their own.
if False:
    # Creating a set containing those faces which have not been bound
    # to an identity to recluster them in isolation from the rest of
    # the faces
    straglers = build_straglers(faces)
    reduced = reduced + DBSCAN(straglers)

# Disabled experiment: gather every unbound face into one catch-all cluster.
if False:
    # Build a final cluster with all remaining uncategorized faces
    remaining_cluster = {
        'id': len(reduced) + 1,
        'distance': 0,
        'descriptors': [],
        'cluster': Undefined,
        'faces': [],
    }
    straglers = build_straglers(faces)
    for face in straglers:
        # Link the face and the catch-all cluster in both directions.
        remaining_cluster['faces'].append(face)
        face['cluster'] = remaining_cluster
    reduced.append(remaining_cluster)
# Give all merged identity lists a unique ID (their position in "reduced")
# and point every face back at the identity that now owns it.
for identity_index, identity in enumerate(reduced):
    identity['id'] = identity_index
    for face in identity['faces']:
        face['cluster'] = identity

# The face lists may have changed, so recompute the centroids, refresh each
# face's distance to its centroid (no pruning), then sort for presentation.
reduced = update_cluster_averages(reduced)
update_distances(reduced)
sort_identities(reduced)
# Disabled experiment: report pairs of clusters that look mergeable.
if False:
    # This generates a set of differences between clusters and makes
    # a recommendation to merge clusters (outside of DBSCAN)
    #
    # Worth testing on larger data set
    for i, A in enumerate(reduced):
        for k, B in enumerate(reduced):
            # Skip pairs already visited in the other order, and equal
            # entries.
            if k < i or A == B:
                continue
            distance = findCosineDistanceBaked(A, B)
            if not distance < MAX_CLUSTER_DISTANCE:
                continue
            distance = "{:0.4f}".format(distance)
            print(f'{A["id"]} to {B["id"]} = {distance}: MERGE')
# --- Phase 4: write the HTML report ------------------------------------------
print('Writing to "auto-clusters.html"')
# redirect_on()/redirect_off() are project helpers; presumably they send
# print() output to the given file and restore it afterwards -- confirm in
# ketrface.util. The try/finally guarantees redirect_off() runs even when
# gen_html() raises part-way through.
redirect_on(os.path.join(html_path, 'auto-clusters.html'))
try:
    gen_html(reduced)
finally:
    redirect_off()
# --- Phase 5: persist the identities -----------------------------------------
print(f'Connecting to database: {db_path}')
# NOTE(review): assumes create_connection() returns a sqlite3.Connection --
# confirm in ketrface.db.
conn = create_connection(db_path)
try:
    # "with conn" only scopes the transaction (commit on success, rollback
    # on error); it does not close the connection, hence the finally below.
    with conn:
        for identity in reduced:
            print(f'Writing identity {identity["id"]} to DB')
            identity_id = create_identity(conn, identity)
            for position, face in enumerate(identity['faces']):
                # The last argument is True only for the identity's first
                # face.
                update_face_identity(conn, face['id'], identity_id,
                                     position == 0)
finally:
    conn.close()