188 lines
4.8 KiB
Python
188 lines
4.8 KiB
Python
import sys
|
|
import json
|
|
import os
|
|
import piexif
|
|
import sqlite3
|
|
from sqlite3 import Error
|
|
from PIL import Image
|
|
import numpy as np
|
|
from deepface import DeepFace
|
|
from retinaface import RetinaFace
|
|
|
|
|
|
sqlite3.register_adapter(np.array, lambda arr: arr.tobytes())
|
|
sqlite3.register_converter("array", np.frombuffer)
|
|
|
|
class NpEncoder(json.JSONEncoder):
|
|
def default(self, obj):
|
|
if isinstance(obj, np.integer):
|
|
return int(obj)
|
|
if isinstance(obj, np.floating):
|
|
return float(obj)
|
|
if isinstance(obj, np.ndarray):
|
|
return obj.tolist()
|
|
|
|
model = DeepFace.build_model('ArcFace')
|
|
input_shape = DeepFace.functions.find_input_shape(model)
|
|
|
|
def create_connection(db_file):
|
|
""" create a database connection to the SQLite database
|
|
specified by db_file
|
|
:param db_file: database file
|
|
:return: Connection object or None
|
|
"""
|
|
conn = None
|
|
try:
|
|
conn = sqlite3.connect(db_file)
|
|
except Error as e:
|
|
print(e)
|
|
|
|
return conn
|
|
|
|
def create_face(conn, face):
|
|
"""
|
|
Create a new face in the faces table
|
|
:param conn:
|
|
:param face:
|
|
:return: face id
|
|
"""
|
|
sql = '''
|
|
INSERT INTO faces(photoId,scanVersion,faceConfidence,top,left,bottom,right)
|
|
VALUES(?,?,?,?,?,?,?)
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (
|
|
face['photoId'],
|
|
face['scanVersion'],
|
|
face['faceConfidence'],
|
|
face['top'],
|
|
face['left'],
|
|
face['bottom'],
|
|
face['right']
|
|
))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def create_face_descriptor(conn, faceId, descriptor):
|
|
"""
|
|
Create a new face in the faces table
|
|
:param conn:
|
|
:param faceId:
|
|
:param descriptor:
|
|
:return: descriptor id
|
|
"""
|
|
sql = '''
|
|
INSERT INTO facedescriptors(faceId,model,descriptors)
|
|
VALUES(?,?,?)
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (
|
|
faceId,
|
|
descriptor['model'],
|
|
np.array(descriptor['descriptors'])
|
|
))
|
|
conn.commit()
|
|
return cur.lastrowid
|
|
|
|
def update_face_count(conn, photoId, faces):
|
|
"""
|
|
Update the number of faces that have been matched on a photo
|
|
:param conn:
|
|
:param photoId:
|
|
:param faces:
|
|
:return: None
|
|
"""
|
|
sql = '''
|
|
UPDATE photos SET faces=? WHERE id=?
|
|
'''
|
|
cur = conn.cursor()
|
|
cur.execute(sql, (faces, photoId))
|
|
conn.commit()
|
|
return None
|
|
|
|
def findCosineDistance(source_representation, test_representation):
|
|
if type(source_representation) == list:
|
|
source_representation = np.array(source_representation)
|
|
if type(test_representation) == list:
|
|
test_representation = np.array(test_representation)
|
|
a = np.matmul(np.transpose(source_representation), test_representation)
|
|
b = np.sum(np.multiply(source_representation, source_representation))
|
|
c = np.sum(np.multiply(test_representation, test_representation))
|
|
return 1 - (a / (np.sqrt(b) * np.sqrt(c)))
|
|
|
|
def findEuclideanDistance(source_representation, test_representation):
|
|
if type(source_representation) == list:
|
|
source_representation = np.array(source_representation)
|
|
if type(test_representation) == list:
|
|
test_representation = np.array(test_representation)
|
|
euclidean_distance = source_representation - test_representation
|
|
euclidean_distance = np.sum(np.multiply(euclidean_distance, euclidean_distance))
|
|
euclidean_distance = np.sqrt(euclidean_distance)
|
|
return euclidean_distance
|
|
|
|
def l2_normalize(x):
|
|
return x / np.sqrt(np.sum(np.multiply(x, x)))
|
|
|
|
base = '/pictures/'
|
|
conn = create_connection('../db/photos.db')
|
|
faces = {}
|
|
identities = {}
|
|
|
|
with conn:
|
|
cur = conn.cursor()
|
|
res = cur.execute('''
|
|
SELECT faces.id,facedescriptors.descriptors
|
|
FROM faces
|
|
JOIN facedescriptors ON (faces.descriptorId=facedescriptors.id)
|
|
WHERE faces.identityId IS null
|
|
''')
|
|
for row in res.fetchall():
|
|
id, descriptors = row
|
|
if id in faces:
|
|
face = faces[id]
|
|
else:
|
|
face = {
|
|
'id': id,
|
|
'scanned': False
|
|
}
|
|
faces[id] = face
|
|
face['descriptors'] = np.frombuffer(descriptors)
|
|
|
|
for key1 in faces:
|
|
face1 = faces[key1]
|
|
if face1['scanned'] == True:
|
|
continue
|
|
face1['scanned'] = True
|
|
for key2 in faces:
|
|
if key1 == key2:
|
|
continue
|
|
face2 = faces[key2]
|
|
face = {
|
|
'between': (face1['id'], face2['id'])
|
|
}
|
|
|
|
face['distanceCosine'] = findCosineDistance(
|
|
face1['descriptors'],
|
|
face2['descriptors']
|
|
)
|
|
face['distanceEuclidian'] = findEuclideanDistance(
|
|
face1['descriptors'],
|
|
face2['descriptors']
|
|
)
|
|
face['distanceEuclidianL2'] = findEuclideanDistance(
|
|
l2_normalize(face1['descriptors']),
|
|
l2_normalize(face2['descriptors'])
|
|
)
|
|
face['scoring'] = 0
|
|
if face['distanceCosine'] >= 0.68:
|
|
face['scoring'] += 1
|
|
if face['distanceEuclidian'] >= 4.15:
|
|
face['scoring'] += 1
|
|
if face['distanceEuclidianL2'] >= 1.13:
|
|
face['scoring'] += 1
|
|
|
|
if face['scoring'] == 3: # Same face!
|
|
print(face)
|
|
|
|
# update_face_count(conn, photoId, len(faces))
|