import sys
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image
import numpy as np
from deepface import DeepFace
from deepface.detectors import FaceDetector
import functools
from ketrface.util import *
from ketrface.dbscan import *
from ketrface.db import *
# Prefix placed in front of the "/faces/..." image paths built by gen_html().
html_base = '../'
# Default database path handed to create_connection() by load_faces().
db_path = '../db/photos.db'
# TODO
# Switch to using DBSCAN
#
# Thoughts for determining number of clusters to try and target...
#
# Augment DBSCAN to rule out identity matching for the same face
# appearing more than once in a photo
#
# NOTE: This means twins or reflections won't both be identified in the
# same photo -- those faces would then be identified as a second face
# pairing, which could merge with a cluster but cannot be used to match
def gen_html(identities):
    """Print an HTML gallery of every identity and its faces to stdout.

    For each identity a heading with its id and face count is printed,
    followed by one tile per face showing the face image, the cluster label
    with the face's distance, and the face id, photo id and confidence.

    Args:
        identities: iterable of dicts with an 'id' and a 'faces' list. Each
            face dict must provide 'id', 'photoId', 'distance', 'confidence'
            and 'cluster' (either a string or a dict carrying an 'id').

    NOTE(review): the HTML tags inside the original string literals were
    missing (the literals were unterminated), so the markup below is a
    minimal reconstruction around the text that was still present. The image
    file name ('<faceId>.jpg') in particular is an assumption -- confirm it
    against how the face crops are written to disk.
    """
    for identity in identities:
        members = identity['faces']
        print('<div>')
        print(f'<div><b>Identity {identity["id"]} has {len(members)}</b></div>')
        print('<div>')
        for face in members:
            faceId = face['id']
            photoId = face['photoId']
            distance = f'{face["distance"]:0.4f}'
            confidence = f'{face["confidence"]:0.3f}'
            # A string cluster value is used verbatim as the label; anything
            # else is treated as a cluster record and labelled by its id.
            cluster = face['cluster']
            if isinstance(cluster, str):
                label = cluster
            else:
                label = f'Cluster ({cluster["id"]})'
            print('<div style="position:relative;display:inline-flex;flex-direction:column">')
            # Face images are bucketed into directories by faceId modulo 10,
            # zero-padded to two digits.
            path = f'{html_base}/faces/{faceId % 10:02d}'
            print(f'<img src="{path}/{faceId}.jpg"/>')
            print(f'<div>{label}: {distance}</div>')
            print(f'<div>{faceId} {photoId} {confidence}</div>')
            print('</div>')
        print('</div>')
        print('</div>')
def load_faces(db_path = db_path):
    """Load every face that has no identity yet and a confidence above 0.99.

    Args:
        db_path: database path passed to create_connection().

    Returns:
        A list of face dicts with the keys 'id', 'type' ('face'),
        'confidence', 'distance' (0), 'photoId', 'descriptors' (NumPy array),
        'cluster' (Undefined) and 'faces' (a one-element list holding the
        face itself).
    """
    conn = create_connection(db_path)
    faces = []
    try:
        # "with conn" only scopes a transaction on a sqlite3 connection; it
        # does not close it, hence the explicit close() in the finally.
        with conn:
            cur = conn.cursor()
            res = cur.execute('''
                SELECT faces.id,facedescriptors.descriptors,faces.faceConfidence,faces.photoId
                FROM faces
                JOIN facedescriptors ON (faces.descriptorId=facedescriptors.id)
                WHERE faces.identityId IS null AND faces.faceConfidence>0.99
                ''')
            for face_id, descriptors, confidence, photo_id in res.fetchall():
                face = {
                    'id': face_id,
                    'type': 'face',
                    'confidence': confidence,
                    'distance': 0,
                    'photoId': photo_id,
                    # frombuffer uses NumPy's default dtype (float64);
                    # assumes the blob was stored that way -- TODO confirm.
                    'descriptors': np.frombuffer(descriptors),
                    'cluster': Undefined
                }
                # Each face starts out as its own single-member group.
                face['faces'] = [face]
                faces.append(face)
    finally:
        # create_connection() is defined elsewhere; guard in case it hands
        # back None on failure so the original error is not masked.
        if conn is not None:
            conn.close()
    return faces
def cluster_sort(A, B):
    """Three-way comparator: order by 'cluster' first, then by 'confidence'.

    Returns 1 when A sorts after B, -1 when A sorts before B, and 0 when both
    keys compare equal.
    """
    for field in ('cluster', 'confidence'):
        delta = A[field] - B[field]
        if delta > 0:
            return 1
        if delta < 0:
            return -1
    return 0
print('Loading faces from database')
faces = load_faces()
print(f'{len(faces)} faces loaded')
print('Scanning for clusters')
identities = DBSCAN(faces) # process_faces(faces)
print(f'{len(identities)} clusters grouped')

# Compute the average center for all clusters: the element-wise mean of the
# descriptors of every face in the cluster, stored on the cluster itself.
for identity in identities:
    members = identity['faces']
    print(f'{identity["id"]} has {len(members)} faces')
    if members:
        identity['descriptors'] = np.mean(
            [member['descriptors'] for member in members], axis=0)
    else:
        # No faces to average; keep an empty center rather than a NaN.
        identity['descriptors'] = np.array([])
# Largest cosine distance a face may have from its cluster's average center
# before it is dropped from that cluster.
MAX_CENTER_DISTANCE = 0.14

removed = -1
epoch = 1
# Filter each cluster, removing any face that is farther than
# MAX_CENTER_DISTANCE from the average center point of the cluster. Every
# removal shifts the center, so passes repeat until one removes nothing.
while removed != 0:
    print(f'Epoch {epoch}...')
    epoch += 1
    removed = 0
    for identity in identities:
        members = identity['faces']
        center = identity['descriptors']
        remaining = len(members)
        kept = []
        # Survivors are collected into a new list instead of removing from
        # the list being iterated, which would skip the following face.
        for face in members:
            distance = findCosineDistance(center, face['descriptors'])
            if distance > MAX_CENTER_DISTANCE:
                face['cluster'] = Undefined
                face['distance'] = 0
                removed += 1
                remaining -= 1
                # Take the face back out of the running mean. With no faces
                # left there is nothing to average, so the center stays put.
                if remaining > 0:
                    total = np.multiply(center, remaining + 1)
                    center = np.divide(
                        np.subtract(total, face['descriptors']), remaining)
            else:
                face['distance'] = distance
                kept.append(face)
        # Update in place so other references to this list stay valid.
        members[:] = kept
        identity['descriptors'] = center
    if removed > 0:
        print(f'Excluded {removed} faces this epoch')
# Largest identities first; within each identity, faces closest to the
# cluster center first.
identities.sort(reverse=True, key=lambda entry: len(entry['faces']))
for identity in identities:
    identity['faces'].sort(key=lambda member: member['distance'])
print(f'{len(identities)} identities seeded.')
print('Writing to "identities.html"')
# redirect_on()/redirect_off() are defined outside this file; presumably they
# point stdout at the named file and restore it afterwards. The try/finally
# makes sure the redirect is undone even when gen_html() raises.
redirect_on('identities.html')
try:
    gen_html(identities)
finally:
    redirect_off()