323 lines
9.5 KiB
Python
323 lines
9.5 KiB
Python
import sys
|
|
import json
|
|
import os
|
|
import piexif
|
|
import sqlite3
|
|
from sqlite3 import Error
|
|
from PIL import Image
|
|
import numpy as np
|
|
from deepface import DeepFace
|
|
from deepface.detectors import FaceDetector
|
|
|
|
sqlite3.register_adapter(np.array, lambda arr: arr.tobytes())
|
|
sqlite3.register_converter("array", np.frombuffer)
|
|
|
|
class NpEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, np.integer):
|
|
return int(obj)
|
|
if isinstance(obj, np.floating):
|
|
return float(obj)
|
|
if isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
|
|
model_name = 'VGG-Face' # 'ArcFace'
|
|
detector_backend = 'mtcnn' # 'retinaface'
|
|
model = DeepFace.build_model(model_name)
|
|
face_detector = FaceDetector.build_model(detector_backend)
|
|
input_shape = DeepFace.functions.find_input_shape(model)
|
|
|
|
def create_connection(db_file):
|
|
""" create a database connection to the SQLite database
|
|
specified by db_file
|
|
:param db_file: database file
|
|
:return: Connection object or None
|
|
"""
|
|
conn = None
|
|
try:
|
|
conn = sqlite3.connect(db_file)
|
|
except Error as e:
|
|
print(e)
|
|
|
|
return conn
|
|
|
|
def create_face(conn, face):
|
|
"""
|
|
Create a new face in the faces table
|
|
:param conn:
|
|
:param face:
|
|
:return: face id
|
|
"""
|
|
sql = '''
|
|
INSERT INTO faces(photoId,scanVersion,faceConfidence,top,left,bottom,right)
|
|
VALUES(?,?,?,?,?,?,?)
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (
|
|
face['photoId'],
|
|
face['scanVersion'],
|
|
face['faceConfidence'],
|
|
face['top'],
|
|
face['left'],
|
|
face['bottom'],
|
|
face['right']
|
|
))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def create_face_descriptor(conn, faceId, descriptor):
|
|
"""
|
|
Create a new face in the faces table
|
|
:param conn:
|
|
:param faceId:
|
|
:param descriptor:
|
|
:return: descriptor id
|
|
"""
|
|
sql = '''
|
|
INSERT INTO facedescriptors(faceId,model,descriptors)
|
|
VALUES(?,?,?)
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (
|
|
faceId,
|
|
descriptor['model'],
|
|
np.array(descriptor['descriptors'])
|
|
))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def update_face_count(conn, photoId, faces):
|
|
"""
|
|
Update the number of faces that have been matched on a photo
|
|
:param conn:
|
|
:param photoId:
|
|
:param faces:
|
|
:return: None
|
|
"""
|
|
sql = '''
|
|
UPDATE photos SET faces=? WHERE id=?
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (faces, photoId))
|
|
conn.commit()
|
|
return None
|
|
|
|
def findCosineDistance(source_representation, test_representation):
|
|
if type(source_representation) == list:
|
|
source_representation = np.array(source_representation)
|
|
if type(test_representation) == list:
|
|
test_representation = np.array(test_representation)
|
|
a = np.matmul(np.transpose(source_representation), test_representation)
|
|
b = np.sum(np.multiply(source_representation, source_representation))
|
|
c = np.sum(np.multiply(test_representation, test_representation))
|
|
return 1 - (a / (np.sqrt(b) * np.sqrt(c)))
|
|
|
|
def findEuclideanDistance(source_representation, test_representation):
|
|
if type(source_representation) == list:
|
|
source_representation = np.array(source_representation)
|
|
if type(test_representation) == list:
|
|
test_representation = np.array(test_representation)
|
|
euclidean_distance = source_representation - test_representation
|
|
euclidean_distance = np.sum(np.multiply(euclidean_distance, euclidean_distance))
|
|
euclidean_distance = np.sqrt(euclidean_distance)
|
|
return euclidean_distance
|
|
|
|
def l2_normalize(x):
|
|
return x / np.sqrt(np.sum(np.multiply(x, x)))
|
|
|
|
base = '/pictures/'
|
|
conn = create_connection('../db/photos.db')
|
|
faces = []
|
|
identities = []
|
|
|
|
def find_nearest_face(faces, identities, face, threshold = 0.99):
|
|
closest = None
|
|
closest_distance = -1
|
|
for target in identities + faces:
|
|
if target == face:
|
|
continue
|
|
target_distance = findCosineDistance(
|
|
target['descriptors'], face['descriptors']
|
|
)
|
|
if target_distance > threshold:
|
|
continue
|
|
if closest_distance == -1 or target_distance < closest_distance:
|
|
closest = target
|
|
closest_distance = target_distance
|
|
return closest
|
|
|
|
def merge_identities(identities, identity1, identity2):
|
|
sum1 = np.dot(
|
|
identity1['faces'],
|
|
identity1['descriptors']
|
|
)
|
|
sum2 = np.dot(
|
|
identity2['faces'],
|
|
identity2['descriptors']
|
|
)
|
|
sum = np.add(sum1, sum2)
|
|
faces = identity1['faces'] + identity2['faces']
|
|
id = 1
|
|
if len(identities):
|
|
id = identities[len(identities) - 1]['id'] + 1
|
|
|
|
return {
|
|
'id': id,
|
|
'descriptors': np.divide(sum, faces),
|
|
'faces': faces
|
|
}
|
|
|
|
def delete_identity(identities, identity):
|
|
for i, item in enumerate(identities):
|
|
if item['id'] == identity['id']:
|
|
return identities.pop(i)
|
|
return None
|
|
|
|
def update_face_identity(identities, face, closest):
|
|
if 'identified_as' in face:
|
|
face_identity = face['identified_as']
|
|
delete_identity(identities, face_identity)
|
|
else:
|
|
face_identity = face
|
|
|
|
if 'identified_as' in closest:
|
|
closest_identity = closest['identified_as']
|
|
delete_identity(identities, closest_identity)
|
|
else:
|
|
closest_identity = closest
|
|
|
|
identity = merge_identities(identities, face_identity, closest_identity)
|
|
identities.append(identity)
|
|
closest['identified_as'] = face['identified_as'] = identity
|
|
return identity
|
|
|
|
def cluster_faces(face):
|
|
identities = []
|
|
for i, face in enumerate(faces):
|
|
print(f'Clustering {i+1}/{len(faces)}')
|
|
closest = find_nearest_face(faces, identities, face, threshold = 0.25)
|
|
if closest == None:
|
|
print(f'Face {i+1} does not have any matches.')
|
|
continue
|
|
identity = update_face_identity(identities, face, closest)
|
|
# if identity['faces'] > 2:
|
|
# print(f'Updated identity {identity["id"]} to hold {identity["faces"]} faces.')
|
|
return identities
|
|
|
|
def identity_get_faces(item):
|
|
return item['faces']
|
|
|
|
with conn:
|
|
cur = conn.cursor()
|
|
res = cur.execute('''
|
|
SELECT faces.id,facedescriptors.descriptors,faces.faceConfidence,faces.photoId
|
|
FROM faces
|
|
JOIN facedescriptors ON (faces.descriptorId=facedescriptors.id)
|
|
WHERE faces.identityId IS null AND faces.faceConfidence>0.99
|
|
''')
|
|
for row in res.fetchall():
|
|
id, descriptors, confidence, photoId = row
|
|
face = None
|
|
for target in faces:
|
|
if target['id'] == id:
|
|
face = target
|
|
break
|
|
if face == None:
|
|
face = {
|
|
'id': id,
|
|
}
|
|
faces.append(face)
|
|
face['faces'] = 1
|
|
face['confidence'] = confidence
|
|
face['photoId'] = photoId
|
|
face['descriptors'] = np.frombuffer(descriptors)
|
|
|
|
identities = cluster_faces(faces)
|
|
identities.sort(reverse = True, key = identity_get_faces)
|
|
for identity in identities:
|
|
print(f'{identity["id"]} has {identity["faces"]} faces')
|
|
|
|
print(f'{len(identities)} identities seeded.')
|
|
|
|
exit(0)
|
|
if False:
|
|
for key2 in faces:
|
|
if key1 == key2:
|
|
continue
|
|
face2 = faces[key2]
|
|
if face2['scanned']:
|
|
continue
|
|
face = {
|
|
'between': (face1['id'], face2['id']),
|
|
'confidence': (face1['confidence'], face2['confidence'])
|
|
}
|
|
|
|
face['distanceCosine'] = findCosineDistance(
|
|
face1['descriptors'],
|
|
face2['descriptors']
|
|
)
|
|
face['distanceEuclidean'] = findEuclideanDistance(
|
|
face1['descriptors'],
|
|
face2['descriptors']
|
|
)
|
|
face['distanceEuclideanL2'] = findEuclideanDistance(
|
|
l2_normalize(face1['descriptors']),
|
|
l2_normalize(face2['descriptors'])
|
|
)
|
|
|
|
face['scoring'] = 0
|
|
|
|
if model_name == 'VGG-Face':
|
|
# thresholds = {'cosine': 0.40, 'euclidean': 0.60, 'euclidean_l2': 0.86}
|
|
# thresholds = {'cosine': 0.31, 'euclidean': 0.47, 'euclidean_l2': 0.79}
|
|
thresholds = {'cosine': 0.25, 'euclidean': 0.47, 'euclidean_l2': 0.79}
|
|
elif model_name == 'ArcFace':
|
|
thresholds = {'cosine': 0.68, 'euclidean': 4.15, 'euclidean_l2': 1.13}
|
|
|
|
if face['distanceCosine'] < thresholds['cosine']:
|
|
face['scoring'] += 1
|
|
if face['distanceEuclidean'] < thresholds['euclidean']:
|
|
face['scoring'] += 1
|
|
if face['distanceEuclideanL2'] < thresholds['euclidean_l2']:
|
|
face['scoring'] += 1
|
|
|
|
if face['scoring'] == 3: # Same face!
|
|
if ('identity' in face1) and ('identity' in face2):
|
|
if face1['identity'] != face2['identity']:
|
|
# print(f'Identity mismatch between {key1}({face1["confidence"]}) and {key2}({face2["confidence"]})')
|
|
continue
|
|
elif 'identity' in face1:
|
|
face2['identity'] = face1['identity']
|
|
face1['identity']['members'].append(face)
|
|
elif 'identity' in face2:
|
|
face1['identity'] = face2['identity']
|
|
face2['identity']['members'].append(face)
|
|
else:
|
|
# print(f'Creating new identity {len(identities)} {face["between"]}')
|
|
identity = {
|
|
'members': [],
|
|
}
|
|
face1['identity'] = face2['identity'] = identity
|
|
identity['members'].append(face)
|
|
identities.append(identity)
|
|
|
|
for idx, identity in enumerate(identities):
|
|
count = len(identity['members'])
|
|
print('<div>')
|
|
print(f'<div><b>Identity {idx} has {count}</b><br></div>')
|
|
print('<div>')
|
|
for member in identity['members']:
|
|
face1 = member['between'][0]
|
|
face2 = member['between'][1]
|
|
path1 = f'faces/{"{:02d}".format(face1 % 10)}'
|
|
path2 = f'faces/{"{:02d}".format(face2 % 10)}'
|
|
print('<div>')
|
|
print(f'<img src="{path1}/{face1}.jpg"/>{member["confidence"][0]}')
|
|
print(f'<img src="{path2}/{face2}.jpg"/>{member["confidence"][1]}')
|
|
print('</div>')
|
|
print(f'<div>Distance: {member["distanceCosine"]}, {member["distanceEuclidean"]}, {member["distanceEuclideanL2"]}</div>')
|
|
print('</div>')
|
|
print('</div>')
|
|
|
|
# update_face_count(conn, photoId, len(faces))
|