ketr.photos/server/cluster.py
James Ketrenos 3b3c915080 more clustering
Signed-off-by: James Ketrenos <james_git@ketrenos.com>
2023-01-06 20:27:32 -08:00

345 lines
10 KiB
Python

import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image
import numpy as np
from deepface import DeepFace
from deepface.detectors import FaceDetector
# Store NumPy arrays as raw-byte BLOBs. Adapters are looked up by the *type*
# of the bound value, so the key must be np.ndarray; np.array is only a
# factory function and would never match an array instance.
sqlite3.register_adapter(np.ndarray, lambda arr: arr.tobytes())
# Decode columns declared as "array" back into a NumPy array (np.frombuffer
# defaults to float64). Converters only run on connections opened with a
# detect_types flag.
sqlite3.register_converter("array", np.frombuffer)
class NpEncoder(json.JSONEncoder):
    """JSON encoder that converts NumPy scalars and arrays to plain Python values."""

    def default(self, obj):
        """
        Convert NumPy values the stock encoder cannot serialize
        :param obj: object the base encoder could not handle
        :return: int, float, bool or list equivalent of obj
        :raises TypeError: (from the base class) for any other unsupported type
        """
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Defer to the base class so unknown types raise TypeError instead of
        # being written out silently as null.
        return super().default(obj)
# Recognition model and detector backend selection; the trailing comments
# name an alternative value for each setting.
model_name = 'VGG-Face' # 'ArcFace'
detector_backend = 'mtcnn' # 'retinaface'
# The three objects below are built once, at import time, through DeepFace.
# NOTE(review): `model`, `face_detector` and `input_shape` are not referenced
# by any other code visible in this file -- confirm they are still needed
# before paying the load cost on every run.
model = DeepFace.build_model(model_name)
face_detector = FaceDetector.build_model(detector_backend)
input_shape = DeepFace.functions.find_input_shape(model)
def create_connection(db_file):
    """
    Open the SQLite database stored at db_file
    :param db_file: path of the database file
    :return: sqlite3 Connection, or None when connecting fails (the error is printed)
    """
    try:
        return sqlite3.connect(db_file)
    except Error as err:
        print(err)
        return None
def create_face(conn, face):
    """
    Insert one row into the faces table and commit
    :param conn: open sqlite3 connection
    :param face: dict providing photoId, scanVersion, faceConfidence, top,
                 left, bottom and right
    :return: rowid of the inserted face
    """
    statement = '''
        INSERT INTO faces(photoId,scanVersion,faceConfidence,top,left,bottom,right)
        VALUES(?,?,?,?,?,?,?)
    '''
    # Bound in the same order as the column list above.
    fields = (
        'photoId',
        'scanVersion',
        'faceConfidence',
        'top',
        'left',
        'bottom',
        'right',
    )
    values = tuple(face[name] for name in fields)
    cursor = conn.cursor()
    cursor.execute(statement, values)
    conn.commit()
    return cursor.lastrowid
def create_face_descriptor(conn, faceId, descriptor):
    """
    Insert one row into the facedescriptors table and commit
    :param conn: open sqlite3 connection
    :param faceId: id of the faces row the descriptor belongs to
    :param descriptor: dict with 'model' (name) and 'descriptors' (sequence
                       of numbers or NumPy array)
    :return: descriptor id
    """
    sql = '''
        INSERT INTO facedescriptors(faceId,model,descriptors)
        VALUES(?,?,?)
    '''
    # Serialize explicitly so the BLOB does not depend on a registered
    # sqlite3 adapter or on the array happening to expose a simple buffer.
    blob = np.asarray(descriptor['descriptors']).tobytes()
    cur = conn.cursor()
    cur.execute(sql, (
        faceId,
        descriptor['model'],
        blob
    ))
    conn.commit()
    return cur.lastrowid
def update_face_count(conn, photoId, faces):
    """
    Store the face count for one photo and commit
    :param conn: open sqlite3 connection
    :param photoId: id of the photos row to update
    :param faces: value written to that row's faces column
    :return: None
    """
    statement = '''
        UPDATE photos SET faces=? WHERE id=?
    '''
    conn.cursor().execute(statement, (faces, photoId))
    conn.commit()
def findCosineDistance(source_representation, test_representation):
    """
    Cosine distance (1 - cosine similarity) between two representations
    :param source_representation: list or NumPy array
    :param test_representation: list or NumPy array
    :return: 1 - dot(source, test) / (|source| * |test|)
    """
    source, test = (
        np.array(vector) if isinstance(vector, list) else vector
        for vector in (source_representation, test_representation)
    )
    dot_product = np.matmul(np.transpose(source), test)
    source_norm = np.sqrt(np.sum(np.multiply(source, source)))
    test_norm = np.sqrt(np.sum(np.multiply(test, test)))
    similarity = dot_product / (source_norm * test_norm)
    return 1 - similarity
def findEuclideanDistance(source_representation, test_representation):
    """
    Euclidean (L2) distance between two representations
    :param source_representation: list, tuple or NumPy array
    :param test_representation: list, tuple or NumPy array
    :return: square root of the summed squared element differences
    """
    # np.asarray accepts any array-like; two plain tuples would otherwise
    # reach the subtraction unconverted and raise TypeError.
    source = np.asarray(source_representation)
    test = np.asarray(test_representation)
    difference = source - test
    return np.sqrt(np.sum(np.multiply(difference, difference)))
def l2_normalize(x):
    """
    Divide x by its Euclidean norm
    :param x: vector to scale
    :return: x / sqrt(sum(x * x))
    """
    squared_sum = np.sum(np.multiply(x, x))
    norm = np.sqrt(squared_sum)
    return x / norm
# NOTE(review): presumably the root directory of the picture tree; it is not
# read by any other code visible in this file -- confirm.
base = '/pictures/'
# Opened at import time; create_connection() returns None (after printing the
# error) when the database cannot be opened.
conn = create_connection('../db/photos.db')
# Module-level working lists: `faces` is filled from the database further
# down and `identities` is replaced by the clustering result.
faces = []
identities = []
def find_nearest_face(faces, identities, face, threshold = 0.99):
    """
    Find the candidate whose descriptors are closest (cosine distance) to face
    :param faces: list of face dicts, each with a 'descriptors' entry
    :param identities: list of identity dicts, each with a 'descriptors' entry
    :param face: dict to match; the very same object is skipped as a candidate
    :param threshold: largest cosine distance accepted as a match
    :return: closest dict from identities or faces, or None if none is within threshold
    """
    closest = None
    closest_distance = None
    # Identities are scanned before faces, so on an exact tie the identity wins.
    for candidates in (identities, faces):
        for target in candidates:
            # Compare by object identity: dict equality would fall through to
            # comparing the NumPy descriptor arrays, which is ambiguous.
            if target is face:
                continue
            target_distance = findCosineDistance(
                target['descriptors'], face['descriptors']
            )
            # Written as "not <=" so a NaN distance (zero-length descriptor)
            # is rejected rather than accepted as the closest match.
            if not target_distance <= threshold:
                continue
            if closest_distance is None or target_distance < closest_distance:
                closest = target
                closest_distance = target_distance
    return closest
def merge_identities(identities, identity1, identity2):
    """
    Build a new identity from two records, weighting descriptors by face count
    :param identities: current identity list; only read, to pick the next id
    :param identity1: dict with 'faces' (count) and 'descriptors'
    :param identity2: dict with 'faces' (count) and 'descriptors'
    :return: new dict with 'id', 'descriptors' (weighted mean) and 'faces' (total)
    """
    weighted1 = np.dot(identity1['faces'], identity1['descriptors'])
    weighted2 = np.dot(identity2['faces'], identity2['descriptors'])
    combined = np.add(weighted1, weighted2)
    face_total = identity1['faces'] + identity2['faces']
    # The next id follows the last entry of the list, or starts at 1.
    if len(identities):
        next_id = identities[-1]['id'] + 1
    else:
        next_id = 1
    merged = {}
    merged['id'] = next_id
    merged['descriptors'] = np.divide(combined, face_total)
    merged['faces'] = face_total
    return merged
def delete_identity(identities, identity):
    """
    Remove the first entry whose 'id' equals identity['id']
    :param identities: list of identity dicts, modified in place
    :param identity: dict carrying the 'id' to remove
    :return: the removed dict, or None when no entry matches
    """
    position = next(
        (index for index, candidate in enumerate(identities)
         if candidate['id'] == identity['id']),
        None,
    )
    if position is None:
        return None
    return identities.pop(position)
def update_face_identity(identities, face, closest):
    """
    Merge the identities of face and closest into one identity
    :param identities: list of live identity dicts, modified in place
    :param face: face (or identity) dict being clustered
    :param closest: face (or identity) dict it matched
    :return: the identity both records now belong to

    Records are linked to their identity through 'identified_as'. A retired
    identity is given a link to the identity that replaced it, so records
    still pointing at the old one resolve to the live identity instead of
    resurrecting a deleted one.
    """
    def resolve(record):
        # Follow 'identified_as' links until reaching the live record.
        while 'identified_as' in record:
            record = record['identified_as']
        return record

    def retire(record):
        # Remove by object identity: face ids and identity ids share a
        # namespace, so an id match could remove an unrelated identity.
        for index, existing in enumerate(identities):
            if existing is record:
                del identities[index]
                return

    face_identity = resolve(face)
    closest_identity = resolve(closest)

    if face_identity is closest_identity:
        # Already the same identity: merging it with itself would double its
        # face count. Only shorten the links of the two records.
        for record in (face, closest):
            if record is not face_identity:
                record['identified_as'] = face_identity
        return face_identity

    # Retire both sources before merging so the new id is derived from the
    # identities that remain.
    retire(face_identity)
    retire(closest_identity)
    identity = merge_identities(identities, face_identity, closest_identity)
    identities.append(identity)
    for record in (face, closest, face_identity, closest_identity):
        record['identified_as'] = identity
    return identity
def cluster_faces(face):
    """
    Group faces into identities by nearest cosine distance
    :param face: list of face dicts to cluster. The singular name is kept for
                 existing callers; it holds the whole list, not one face.
    :return: list of identity dicts produced by the merges
    """
    # Use the argument rather than the module-level `faces` list.
    all_faces = face
    identities = []
    perc = -1
    total = len(all_faces)
    for i, candidate in enumerate(all_faces):
        # Report progress only when the whole-number percentage changes.
        new_perc = int(100 * (i + 1) / total)
        if new_perc != perc:
            perc = new_perc
            print(f'Clustering faces {perc}% complete with {len(identities)} identities.')
        closest = find_nearest_face(all_faces, identities, candidate, threshold = 0.25)
        if closest is None:
            continue
        update_face_identity(identities, candidate, closest)
    return identities
def cluster_identities(identities):
    """
    Repeatedly merge identities lying within the cosine threshold of each other
    :param identities: list of identity dicts, modified in place
    :return: the same list, once a full pass leaves its length unchanged
    """
    perc = -1
    last_len = 0
    while last_len != len(identities):
        last_len = len(identities)
        # Iterate over a snapshot: merging edits `identities` during the pass.
        snapshot = list(identities)
        for i, identity in enumerate(snapshot):
            new_perc = int(100 * (i + 1) / len(snapshot))
            if new_perc != perc:
                perc = new_perc
                print(f'Clustering identities {perc}% complete with {len(identities)} identities.')
            # Skip entries already merged away earlier in this pass.
            if not any(live is identity for live in identities):
                continue
            # Match the identity under inspection, not a leftover global face.
            closest = find_nearest_face([], identities, identity, threshold = 0.25)
            if closest is None:
                continue
            update_face_identity(identities, identity, closest)
            # Make sure both sources are gone so only the merged identity
            # remains; otherwise the list grows on every merge.
            identities[:] = [
                live for live in identities
                if live is not identity and live is not closest
            ]
    return identities
def identity_get_faces(item):
    """
    Sort key returning the 'faces' entry of an identity
    :param item: identity dict
    :return: item['faces']
    """
    face_count = item['faces']
    return face_count
# Load every confident, not-yet-identified face with its descriptor, cluster
# the faces into identities, then print a summary and exit.
if conn is None:
    # create_connection() has already printed the sqlite3 error.
    sys.exit(1)

with conn:
    cur = conn.cursor()
    res = cur.execute('''
        SELECT faces.id,facedescriptors.descriptors,faces.faceConfidence,faces.photoId
        FROM faces
        JOIN facedescriptors ON (faces.descriptorId=facedescriptors.id)
        WHERE faces.identityId IS null AND faces.faceConfidence>0.99
    ''')
    # Index by id so a repeated face id updates its existing record without
    # rescanning the whole list for every row.
    faces_by_id = {existing['id']: existing for existing in faces}
    for face_id, descriptors, confidence, photoId in res.fetchall():
        face = faces_by_id.get(face_id)
        if face is None:
            face = {
                'id': face_id,
            }
            faces_by_id[face_id] = face
            faces.append(face)
        # Each raw face counts as one face when merged into an identity.
        face['faces'] = 1
        face['confidence'] = confidence
        face['photoId'] = photoId
        # NOTE(review): np.frombuffer decodes as float64 by default; assumes
        # the BLOB was written from a float64 array -- confirm.
        face['descriptors'] = np.frombuffer(descriptors)

identities = cluster_faces(faces)
# Second-stage merge of identities is currently disabled:
#identities = cluster_identities(identities)
identities.sort(reverse = True, key = identity_get_faces)
total_faces = 0
for identity in identities:
    total_faces += identity['faces']
    print(f'{identity["id"]} has {identity["faces"]} faces')
print(f'{len(identities)} identities seeded with {total_faces} faces.')
# sys.exit instead of the site-provided exit(), which is meant for the
# interactive shell and is absent when Python runs without `site`.
sys.exit(0)
# NOTE(review): unreachable legacy experiment. The script exits
# unconditionally just before this point and the block is additionally
# guarded by `if False:`. It compares pairs of faces with three distance
# measures, groups faces that pass all three thresholds, and prints the
# groups as HTML. `key1` and `face1` are read before any visible assignment.
# The source listing carried no indentation, so the nesting below (including
# whether the HTML loop sits under `if False:`) is reconstructed from syntax
# -- confirm before re-enabling.
if False:
    for key2 in faces:
        if key1 == key2:
            continue
        face2 = faces[key2]
        if face2['scanned']:
            continue
        # Record describing one face pair and its distances.
        face = {
            'between': (face1['id'], face2['id']),
            'confidence': (face1['confidence'], face2['confidence'])
        }
        face['distanceCosine'] = findCosineDistance(
            face1['descriptors'],
            face2['descriptors']
        )
        face['distanceEuclidean'] = findEuclideanDistance(
            face1['descriptors'],
            face2['descriptors']
        )
        face['distanceEuclideanL2'] = findEuclideanDistance(
            l2_normalize(face1['descriptors']),
            l2_normalize(face2['descriptors'])
        )
        # One point per distance measure that falls under its threshold.
        face['scoring'] = 0
        if model_name == 'VGG-Face':
            # thresholds = {'cosine': 0.40, 'euclidean': 0.60, 'euclidean_l2': 0.86}
            # thresholds = {'cosine': 0.31, 'euclidean': 0.47, 'euclidean_l2': 0.79}
            thresholds = {'cosine': 0.25, 'euclidean': 0.47, 'euclidean_l2': 0.79}
        elif model_name == 'ArcFace':
            thresholds = {'cosine': 0.68, 'euclidean': 4.15, 'euclidean_l2': 1.13}
        if face['distanceCosine'] < thresholds['cosine']:
            face['scoring'] += 1
        if face['distanceEuclidean'] < thresholds['euclidean']:
            face['scoring'] += 1
        if face['distanceEuclideanL2'] < thresholds['euclidean_l2']:
            face['scoring'] += 1
        if face['scoring'] == 3: # Same face!
            if ('identity' in face1) and ('identity' in face2):
                if face1['identity'] != face2['identity']:
                    # print(f'Identity mismatch between {key1}({face1["confidence"]}) and {key2}({face2["confidence"]})')
                    continue
            elif 'identity' in face1:
                face2['identity'] = face1['identity']
                face1['identity']['members'].append(face)
            elif 'identity' in face2:
                face1['identity'] = face2['identity']
                face2['identity']['members'].append(face)
            else:
                # print(f'Creating new identity {len(identities)} {face["between"]}')
                identity = {
                    'members': [],
                }
                face1['identity'] = face2['identity'] = identity
                identity['members'].append(face)
                identities.append(identity)
    # Print each identity and its member pairs as nested HTML <div> elements.
    for idx, identity in enumerate(identities):
        count = len(identity['members'])
        print('<div>')
        print(f'<div><b>Identity {idx} has {count}</b><br></div>')
        print('<div>')
        for member in identity['members']:
            face1 = member['between'][0]
            face2 = member['between'][1]
            # Image path uses the face id modulo 10, zero-padded to two digits.
            path1 = f'faces/{"{:02d}".format(face1 % 10)}'
            path2 = f'faces/{"{:02d}".format(face2 % 10)}'
            print('<div>')
            print(f'<img src="{path1}/{face1}.jpg"/>{member["confidence"][0]}')
            print(f'<img src="{path2}/{face2}.jpg"/>{member["confidence"][1]}')
            print('</div>')
            print(f'<div>Distance: {member["distanceCosine"]}, {member["distanceEuclidean"]}, {member["distanceEuclideanL2"]}</div>')
        print('</div>')
        print('</div>')
    # update_face_count(conn, photoId, len(faces))