James Ketrenos 1975b174a8 Functional for editing clusters; need to add merging
Signed-off-by: James Ketrenos <james_git@ketrenos.com>
2023-01-19 00:51:40 -08:00

256 lines
7.8 KiB
Python

import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image
import numpy as np
import functools
from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
from ketrface.config import *
# Largest distance at which a face may be attached to an identity.
MAX_DISTANCE_FROM_CENTROID = 0.14  # Used to prune outliers

# Every location below is resolved against the configured base 'path'.
config = read_config()
html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])

# A bare "/" base path is replaced with "." (the current directory).
html_base = "." if config['basePath'] == "/" else config['basePath']
def update_cluster_averages(identities):
    """Recompute each identity's mean descriptor and its cached length.

    For every identity the ``descriptors`` of its ``faces`` are summed
    element-wise and divided by the number of faces. The mean is stored in
    ``identity['descriptors']`` and the square root of the sum of its
    squared components in ``identity['sqrtsummul']``.

    Args:
        identities: iterable of dicts, each holding a ``faces`` list whose
            entries carry a ``descriptors`` vector.

    Returns:
        The same ``identities`` object, updated in place.
    """
    for identity in identities:
        members = identity['faces']
        total = []
        for member in members:
            vector = member['descriptors']
            # While the running sum is still empty, the vector seeds it.
            total = vector if len(total) == 0 else np.add(total, vector)
        centroid = np.divide(total, len(members))
        identity['descriptors'] = centroid
        identity['sqrtsummul'] = np.sqrt(np.sum(centroid * centroid))
    return identities
def sort_identities(identities):
    """Sort identities by size and each identity's faces by distance.

    ``identities`` is reordered in place so the identity with the most
    ``faces`` comes first; each identity's ``faces`` list is then reordered
    in place by ascending ``distance``. Nothing is returned.
    """
    def by_distance(face):
        return face['distance']

    # Negating the size yields descending order while ties keep their
    # original relative order, exactly as a stable reverse sort does.
    identities.sort(key=lambda entry: -len(entry['faces']))
    for entry in identities:
        entry['faces'].sort(key=by_distance)
def cluster_sort(A, B):
    """Three-way comparison on ``cluster``, then on ``confidence``.

    Returns 1 when ``A`` is greater than ``B``, -1 when it is smaller, and
    0 when both fields are equal. ``confidence`` is only consulted when the
    ``cluster`` values do not differ.
    """
    for field in ('cluster', 'confidence'):
        delta = A[field] - B[field]
        if delta > 0:
            return 1
        if delta < 0:
            return -1
    return 0
def load_identities(db_path):
    """Load every identity that has at least one face attached.

    Each returned dict carries ``identityId``, ``displayName``,
    ``descriptors`` (the stored blob decoded with ``np.frombuffer`` and its
    default dtype), ``faceCount``, an ``updated`` flag initialised to
    ``False`` and ``sqrtsummul``, the pre-computed vector length used for
    cosine distance.

    Args:
        db_path: database location handed to ``create_connection``.

    Returns:
        A list of identity dicts, one per row of the grouped query.
    """
    conn = create_connection(db_path)
    identities = []
    try:
        # ``with conn`` only scopes the transaction; it does not close the
        # connection, hence the explicit close() in ``finally``.
        with conn:
            cur = conn.cursor()
            res = cur.execute('''
SELECT
identities.id as identityId,
identities.displayName as displayName,
identities.descriptors as descriptors,
COUNT(faces.id) AS faceCount
FROM identities
JOIN faces ON faces.identityId=identities.id
GROUP BY identities.id
''')
            for row in res.fetchall():
                identityId, displayName, descriptors, faceCount = row
                identity = {
                    'identityId': identityId,
                    'displayName': displayName,
                    'descriptors': np.frombuffer(descriptors),
                    'faceCount': faceCount,
                    'updated': False
                }
                # Pre-bake computations for cosine distance
                identity['sqrtsummul'] = np.sqrt(np.sum(np.multiply(
                    identity['descriptors'], identity['descriptors'])))
                identities.append(identity)
    finally:
        conn.close()
    return identities
def find_identity(identities, identityId):
    """Return the first identity whose ``identityId`` matches.

    Args:
        identities: iterable of identity dicts.
        identityId: value compared against each dict's ``identityId``.

    Raises:
        Exception: when no entry matches.
    """
    candidates = (entry for entry in identities
                  if entry['identityId'] == identityId)
    found = next(candidates, None)
    if found is None:
        raise Exception(f'Identity {identityId} missing')
    return found
def _build_doppelganger_face(face_id, identity_id, raw_descriptors):
    """Build the in-memory record for one face row of the query."""
    descriptors = np.frombuffer(raw_descriptors)
    return {
        'id': face_id,
        'type': 'face',
        'distance': 0,
        'descriptors': descriptors,
        'cluster': identity_id,  # Undefined from dbscan.py
        # Pre-computed vector length, as stored on identities too.
        'sqrtsummul': np.sqrt(np.sum(np.multiply(descriptors, descriptors))),
    }


def load_doppelganger_photos(db_path):
    """Load photos in which one identity is assigned to several faces.

    The query pairs every face with each other face that shares both its
    photo and its identity, restricted to photos whose ``duplicate`` column
    is 0 or NULL. Faces are grouped per photo and per identity, and each
    face id is kept only once per group.

    Args:
        db_path: database location handed to ``create_connection``.

    Returns:
        A dict keyed by photo id. Each value has ``photoId`` and
        ``dopplegangers``, a dict keyed by identity id whose values hold
        ``identity`` (initially ``None``) and ``faces`` (list of face dicts).
    """
    conn = create_connection(db_path)
    photos = {}
    try:
        # ``with conn`` only scopes the transaction; it does not close the
        # connection, hence the explicit close() in ``finally``.
        with conn:
            cur = conn.cursor()
            res = cur.execute('''
SELECT
f1.identityId AS identityId,
photos.id AS photoId,
f1.id AS f1_id,
f2.id AS f2_id,
f1_descriptors.descriptors AS f1_descriptors,
f2_descriptors.descriptors AS f2_descriptors
FROM faces AS f1
INNER JOIN faces AS f2 ON (
f2.identityId=f1.identityId AND f1.photoId=f2.photoId and f1_id!=f2_id)
INNER JOIN photos ON (
photos.duplicate == 0 OR photos.duplicate IS NULL)
INNER JOIN facedescriptors AS f1_descriptors ON (
f1.descriptorId=f1_descriptors.id)
INNER JOIN facedescriptors AS f2_descriptors ON (
f2.descriptorId=f2_descriptors.id)
WHERE f1.identityId IS NOT NULL AND f1.photoId=photos.id
ORDER BY photos.id,f1.identityId
''')
            for row in res.fetchall():
                (identityId, photoId, f1_id, f2_id,
                 f1_descriptors, f2_descriptors) = row
                photo = photos.setdefault(photoId, {
                    'photoId': photoId,
                    'dopplegangers': {}
                })
                group = photo['dopplegangers'].setdefault(identityId, {
                    'identity': None,
                    'faces': []
                })
                faceList = group['faces']
                pair = (
                    _build_doppelganger_face(f1_id, identityId, f1_descriptors),
                    _build_doppelganger_face(f2_id, identityId, f2_descriptors),
                )
                for face in pair:
                    # The same face shows up in several rows (once per
                    # partner), so only record ids not seen in this group.
                    if not any(known['id'] == face['id'] for known in faceList):
                        faceList.append(face)
    finally:
        conn.close()
    return photos
def remove_face_from_identity(identity, face):
    """Detach ``face`` from ``identity`` and re-average the identity.

    The identity's mean descriptor is scaled back up by its ``faceCount``,
    the face's descriptor is subtracted, the count is decremented and the
    result is divided by the new count. ``identity`` gets the new
    ``descriptors`` and ``sqrtsummul`` and is flagged ``updated``; ``face``
    is reset to ``identity=None``, ``identityId=-1`` and ``distance=0``.
    Both dicts are modified in place and nothing is returned.
    """
    identity['updated'] = True
    # Mean times count recovers the sum of all member descriptors.
    total = np.multiply(identity['descriptors'], identity['faceCount'])
    total = np.subtract(total, face['descriptors'])
    identity['faceCount'] -= 1
    centroid = np.divide(total, identity['faceCount'])
    identity['descriptors'] = centroid
    identity['sqrtsummul'] = np.sqrt(np.sum(centroid * centroid))
    face.update(identity=None, identityId=-1, distance=0)
# Script body: for every photo where one identity is attached to several
# faces, keep the face closest to that identity and try to move each other
# face to the nearest different identity within MAX_DISTANCE_FROM_CENTROID.
print('Loading identities from database...')
identities = load_identities(db_path)
print(f'{len(identities)} identities loaded.')

print('Loading dopplegangers from database...')
photos = load_doppelganger_photos(db_path)
print(f'{len(photos)} photos with dopplegangers loaded.')

print('Binding dopplegangers to identities...')
face_updates = []
# NOTE(review): identity_updates is filled here, but nothing visible in this
# file writes the recomputed identity averages back to the database, and the
# identity that receives a face does not get its average or faceCount
# adjusted -- presumably part of the pending merge work; confirm.
identity_updates = []
for photoId, photo in photos.items():
    print(f'Processing photo {photoId}...')
    for identityId, group in photo['dopplegangers'].items():
        if group['identity'] is None:
            group['identity'] = find_identity(identities, identityId)
        faces = group['faces']
        identity = group['identity']
        for face in faces:
            face['identity'] = identity
            face['distance'] = findCosineDistanceBaked(face, identity)
        faces.sort(key=lambda entry: entry['distance'])
        # First face closest to identity -- it stays with the photo
        faces = faces[1:]
        for i, face in enumerate(faces):
            remove_face_from_identity(identity, face)
            identity_updates.append(identity)
            # Smallest in-range distance seen so far. It has to be updated
            # on every improvement, otherwise the last in-range identity
            # would win instead of the closest one.
            closest = None
            for potential in identities:
                if potential is identity:
                    continue
                distance = findCosineDistanceBaked(face, potential)
                if distance > MAX_DISTANCE_FROM_CENTROID:
                    continue
                if closest is None or distance < closest:
                    closest = distance
                    face["distance"] = distance
                    face['identity'] = potential
                    face['identityId'] = potential['identityId']
            # Recorded even when no identity matched, so the face can be
            # detached from its old identity later.
            face_updates.append(face)
            if face['identity'] is not None:
                print(f' {i+1}: {face["id"]} moves from {identity["displayName"]} to:')
                print(f' {face["identity"]["displayName"]} with distance {face["distance"]}')
            else:
                print(f' {i+1}: {face["id"]} needs a new Identity.')
# Write the new identity of every re-bound face back to the database.
# Faces that found no identity have their identityId cleared to NULL.
clear_identity_sql = '''
UPDATE faces SET identityId=NULL WHERE id=?
'''
assign_identity_sql = '''
UPDATE faces SET identityId=? WHERE id=?
'''
conn = create_connection(db_path)
with conn:
    cur = conn.cursor()
    for face in face_updates:
        print(f'Updating face {face["id"]} in DB')
        if face['identity'] is None:
            cur.execute(clear_identity_sql, (face["id"], ))
        else:
            cur.execute(assign_identity_sql, (face["identityId"], face["id"]))
    conn.commit()