ketr.photos/ketrface/cluster.py
James Ketrenos b51e8fa307 Ready to try!
Signed-off-by: James Ketrenos <james_git@ketrenos.com>
2023-01-11 15:27:44 -08:00

244 lines
7.4 KiB
Python

import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image
import numpy as np
import functools
from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
from ketrface.config import *
config = read_config()
html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])
html_base = config['basePath']
# TODO
# Switch to using DBSCAN
#
# Thoughts for determining number of clusters to try and target...
#
# Augment DBSCAN to rule out identity matching for the same face
# appearing more than once in a photo
#
# NOTE: This means twins or reflections won't both identify in the
# same photo -- those faces would then identify as a second face pairing
# which could merge with a cluster, but can not be used to match
def gen_html(identities):
for identity in identities:
print('<div>')
print(f'<div><b>Identity {identity["id"]} has {len(identity["faces"])}</b><br></div>')
print('<div>')
for face in identity['faces']:
faceId = face['id']
photoId = face['photoId']
distance = "{:0.4f}".format(face['distance'])
confidence = "{:0.3f}".format(face['confidence'])
focus = int(face['focus'])
label = face['cluster']
if type(label) != str:
label = f'Cluster ({face["cluster"]["id"]})'
print('<div style="position:relative;display:inline-flex;flex-direction:column">')
path = f'{html_base}/faces/{"{:02d}".format(faceId % 100)}'
print(f'<img src="{path}/{faceId}.jpg"/>')
print(f'<div style="background-color:rgba(255, 255, 255, 0.4);position:absolute;top:0px;left:0px;right:0px;padding:0.25rem">{label}: {distance}</div>')
print(f'<div style="background-color:rgba(255, 255, 255, 0.4);position:absolute;bottom:0px;left:0px;right:0px;padding:0.25rem">{faceId} {photoId} {confidence} {focus}</div>')
print('</div>')
print('</div>')
print('</div>')
def update_cluster_averages(identities):
for identity in identities:
average = []
for face in identity['faces']:
if len(average) == 0:
average = face['descriptors']
else:
average = np.add(average, face['descriptors'])
average = np.divide(average, len(identity['faces']))
identity['descriptors'] = average
return identities
def load_faces(db_path = db_path):
print(f'Connecting to database: {db_path}')
conn = create_connection(db_path)
faces = []
with conn:
print('Querying faces')
cur = conn.cursor()
res = cur.execute('''
SELECT faces.id,facedescriptors.descriptors,faces.faceConfidence,faces.photoId,faces.focus
FROM faces
JOIN facedescriptors ON (faces.descriptorId=facedescriptors.id)
WHERE faces.identityId IS null AND faces.faceConfidence>0.99
''')
for row in res.fetchall():
id, descriptors, confidence, photoId, focus = row
if focus is None:
focus = 100 # Assume full focus if focus not set
face = {
'id': id,
'type': 'face',
'confidence': confidence,
'distance': 0,
'photoId': photoId,
'descriptors': np.frombuffer(descriptors),
'cluster': Undefined,
'focus': focus
}
face['faces'] = [ face ]
faces.append(face)
return faces
def update_distances(identities, prune = False):
removed = 0
for identity in identities:
for face in identity['faces']:
average = identity['descriptors']
distance = findCosineDistance(average, face['descriptors'])
if prune and distance > MAX_EPOCH_DISTANCE:
average = np.dot(average, len(identity['faces']))
average = np.subtract(average, face['descriptors'])
face['cluster'] = Undefined
face['distance'] = 0
identity['faces'].remove(face)
identity['descriptors'] = np.divide(average, len(identity['faces']))
removed += 1
else:
face['distance'] = distance
return removed
def sort_identities(identities):
identities.sort(reverse = True, key = lambda x: len(x['faces']))
for identity in identities:
identity['faces'].sort(reverse = False, key = lambda x: x['distance'])
def cluster_sort(A, B):
diff = A['cluster'] - B['cluster']
if diff > 0:
return 1
elif diff < 0:
return -1
diff = A['confidence'] - B['confidence']
if diff > 0:
return 1
elif diff < 0:
return -1
return 0
def build_straglers(faces):
noise = []
undefined = []
for face in faces:
if face['cluster'] == Noise:
noise.append(face)
elif face['cluster'] == Undefined:
undefined.append(face)
return noise + undefined
print('Loading faces from database')
faces = load_faces()
print(f'{len(faces)} faces loaded')
print('Scanning for clusters')
identities = DBSCAN(faces) # process_faces(faces)
print(f'{len(identities)} clusters grouped')
MAX_CLUSTER_DISTANCE = 0.15 # Used to merge clusters
MAX_EPOCH_DISTANCE = 0.14 # Used to prune outliers
# Compute average center for all clusters
identities = update_cluster_averages(identities)
removed = -1
epoch = 1
# Filter each cluster removing any face that is > cluster_max_distance
# from the average center point of the cluster
while removed != 0:
print(f'Epoch {epoch}...')
epoch += 1
removed = update_distances(identities, prune = True)
if removed > 0:
print(f'Excluded {removed} faces this epoch')
print(f'{len(identities)} identities seeded.')
# Cluster the clusters...
print('Reducing clusters via DBSCAN')
reduced = DBSCAN(identities, eps = MAX_CLUSTER_DISTANCE, minPts = 2)
if len(reduced) == 0:
reduced = identities
# For each cluster, merge the lists of faces referenced in the cluster's
# "faces" field, which is pointing to clusters (and not actual faces)
for cluster in reduced:
merged = []
for identity in cluster['faces']:
merged = merged + identity['faces']
cluster['faces'] = merged
# Creating a set containing those faces which have not been bound
# to an identity to recluster them in isolation from the rest of
# the faces
straglers = build_straglers(faces)
reduced = reduced + DBSCAN(straglers)
# Build a final cluster with all remaining uncategorized faces
remaining_cluster = {
'id': len(reduced) + 1,
'distance': 0,
'descriptors': [],
'cluster': Undefined,
'faces': []
}
straglers = build_straglers(faces)
for face in straglers:
face['cluster'] = remaining_cluster
remaining_cluster['faces'].append(face)
reduced.append(remaining_cluster)
# Give all merged identity lists a unique ID
for id, identity in enumerate(reduced):
identity['id'] = id
for face in identity['faces']:
face['cluster'] = identity
reduced = update_cluster_averages(reduced)
update_distances(reduced)
sort_identities(reduced)
# This generates a set of differences between clusters and makes
# a recommendation to merge clusters (outside of DBSCAN)
#
# Worth testing on larger data set
for i, A in enumerate(reduced):
for k, B in enumerate(reduced):
if k < i:
continue
if A == B:
continue
distance = findCosineDistance(A['descriptors'], B['descriptors'])
if distance < MAX_CLUSTER_DISTANCE:
distance = "{:0.4f}".format(distance)
print(f'{A["id"]} to {B["id"]} = {distance}: MERGE')
print('Writing to "auto-clusters.html"')
redirect_on(os.path.join(html_path, 'auto-clusters.html'))
gen_html(reduced)
redirect_off()