ketr.photos/server/detect.py
James Ketrenos ce05de871b Clustering based on VGG and mtcnn
Signed-off-by: James Ketrenos <james_git@ketrenos.com>
2023-01-06 14:18:21 -08:00

337 lines
9.7 KiB
Python

import sys
import zlib
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image, ImageOps
from deepface import DeepFace
from deepface.detectors import FaceDetector
from retinaface import RetinaFace
import numpy as np
import cv2
import uu
from io import BytesIO
original = None
def redirect_on():
global original
if original == None:
original = sys.stdout
sys.stdout = open(os.devnull, 'w')
def redirect_off():
global original
if original != None:
sys.stdout.close()
sys.stdout = original
original = None
def zlib_uuencode(databytes, name='<data>'):
''' Compress databytes with zlib & uuencode the result '''
inbuff = BytesIO(zlib.compress(databytes, 9))
outbuff = BytesIO()
uu.encode(inbuff, outbuff, name=name)
return outbuff.getvalue()
def zlib_uudecode(databytes):
''' uudecode databytes and decompress the result with zlib '''
inbuff = BytesIO(databytes)
outbuff = BytesIO()
uu.decode(inbuff, outbuff)
return zlib.decompress(outbuff.getvalue())
class NpEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
return float(obj)
if isinstance(obj, np.ndarray):
return obj.tolist()
model_name = 'VGG-Face' # 'ArcFace'
detector_backend = 'mtcnn' # 'retinaface'
model = DeepFace.build_model(model_name)
face_detector = FaceDetector.build_model(detector_backend)
input_shape = DeepFace.functions.find_input_shape(model)
# Adapted from DeepFace
# https://github.com/serengil/deepface/blob/master/deepface/commons/functions.py
#
# Modified to use bicubic resampling and clip expansion, as well as to
# take a PIL Image instead of numpy array
def alignment_procedure(img, left_eye, right_eye):
"""
Given left and right eye coordinates in image, rotate around point
between eyes such that eyes are horizontal
:param img: Image (not np.array)
:param left_eye: Eye appearing on the left (right eye of person)
:param right_eye: Eye appearing on the right (left eye of person)
:return: adjusted image
"""
dY = right_eye[1] - left_eye[1]
dX = right_eye[0] - left_eye[0]
radians = np.arctan2(dY, dX)
rotation = 180 + 180 * radians / np.pi
if True:
img = img.rotate(
angle = rotation,
resample = Image.BICUBIC,
expand = True)
return img
def extract_faces(img, threshold=0.75, allow_upscaling = True):
if detector_backend == 'retinaface':
faces = RetinaFace.detect_faces(
img_path = img,
threshold = threshold,
model = model,
allow_upscaling = allow_upscaling)
elif detector_backend == 'mtcnn':
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # mtcnn expects RGB
redirect_on()
res = face_detector.detect_faces(img_rgb)
redirect_off()
faces = {}
if type(res) == list:
for i, face in enumerate(res):
x = face['box'][0]
y = face['box'][1]
w = face['box'][2]
h = face['box'][3]
faces[f'face_{i+1}'] = { # standardize properties
'facial_area': [ x, y, x + w, y + h ],
'landmarks': {
'left_eye': list(face['keypoints']['left_eye']),
'right_eye': list(face['keypoints']['right_eye']),
},
'score': face['confidence'],
}
# Re-implementation of 'extract_faces' with the addition of keeping a
# copy of the face image for caching on disk
if type(faces) == dict:
for k, key in enumerate(faces):
print(f'Processing face {k+1}/{len(faces)}')
identity = faces[key]
facial_area = identity["facial_area"]
landmarks = identity["landmarks"]
left_eye = landmarks["left_eye"]
right_eye = landmarks["right_eye"]
# markup = True
markup = False
if markup == True: # Draw the face rectangle and eyes
cv2.rectangle(img,
(int(facial_area[0]), int(facial_area[1])),
(int(facial_area[2]), int(facial_area[3])),
(0, 0, 255), 2)
cv2.circle(img, (int(left_eye[0]), int(left_eye[1])), 5, (255, 0, 0), 2)
cv2.circle(img, (int(right_eye[0]), int(right_eye[1])), 5, (0, 255, 0), 2)
# Find center of face, then crop to square
# of equal width and height
width = facial_area[2] - facial_area[0]
height = facial_area[3] - facial_area[1]
x = facial_area[0] + width * 0.5
y = facial_area[1] + height * 0.5
# Make thumbnail a square crop
if width > height:
height = width
else:
width = height
#width *= 1.25
#height *= 1.25
left = max(round(x - width * 0.5), 0)
right = min(round(left + width), img.shape[1]) # Y is 1
top = max(round(y - height * 0.5), 0)
bottom = min(round(top + height), img.shape[0]) # X is 0
left_eye[0] -= top
left_eye[1] -= left
right_eye[0] -= top
right_eye[1] -= left
facial_img = img[top: bottom, left: right]
# Eye order is reversed as the routine does them backwards
image = Image.fromarray(facial_img)
image = alignment_procedure(image, right_eye, left_eye)
image = image.resize(size = input_shape, resample = Image.LANCZOS)
resized = np.asarray(image)
redirect_on()
identity['vector'] = DeepFace.represent(
img_path = resized,
model_name = model_name,
model = model, # pre-built
detector_backend = detector_backend,
enforce_detection = False)
redirect_off()
redirect_on()
identity["face"] = {
'top': facial_area[1] / img.shape[0],
'left': facial_area[0] / img.shape[1],
'bottom': facial_area[3] / img.shape[0],
'right': facial_area[2] / img.shape[1]
}
redirect_off()
identity['image'] = Image.fromarray(resized)
return faces
def create_connection(db_file):
""" create a database connection to the SQLite database
specified by db_file
:param db_file: database file
:return: Connection object or None
"""
conn = None
try:
conn = sqlite3.connect(db_file)
except Error as e:
print(e)
return conn
def create_face(conn, face):
"""
Create a new face in the faces table
:param conn:
:param face:
:return: face id
"""
sql = '''
INSERT INTO faces(photoId,scanVersion,faceConfidence,top,left,bottom,right,descriptorId)
VALUES(?,?,?,?,?,?,?,?)
'''
cur = conn.cursor()
cur.execute(sql, (
face['photoId'],
face['scanVersion'],
face['faceConfidence'],
face['top'],
face['left'],
face['bottom'],
face['right'],
face['descriptorId']
))
conn.commit()
return cur.lastrowid
def create_face_descriptor(conn, face):
"""
Create a new face in the faces table
:param conn:
:param face:
:return: descriptor id
"""
sql = '''
INSERT INTO facedescriptors(descriptors)
VALUES(?)
'''
cur = conn.cursor()
cur.execute(sql, (np.array(face['vector']),))
conn.commit()
return cur.lastrowid
def update_face_count(conn, photoId, faces):
"""
Update the number of faces that have been matched on a photo
:param conn:
:param photoId:
:param faces:
:return: None
"""
sql = '''
UPDATE photos SET faces=? WHERE id=?
'''
cur = conn.cursor()
cur.execute(sql, (faces, photoId))
conn.commit()
return None
base = '/pictures/'
conn = create_connection('../db/photos.db')
with conn:
cur = conn.cursor()
res = cur.execute('''
SELECT photos.id,photos.faces,albums.path,photos.filename FROM photos
LEFT JOIN albums ON (albums.id=photos.albumId)
WHERE photos.faces=-1
''')
rows = res.fetchall()
count = len(rows)
for i, row in enumerate(rows):
photoId, photoFaces, albumPath, photoFilename = row
img_path = f'{base}{albumPath}{photoFilename}'
print(f'Processing {i+1}/{count}: {img_path}')
img = Image.open(img_path)
img = ImageOps.exif_transpose(img) # auto-rotate if needed
img = img.convert()
img = np.asarray(img)
faces = extract_faces(img)
if faces is None:
print(f'Image no faces: {img_path}')
update_face_count(conn, photoId, 0)
continue
for j, key in enumerate(faces):
face = faces[key]
image = face['image']
print(f'Writing face {j+1}/{len(faces)}')
#face['analysis'] = DeepFace.analyze(img_path = img, actions = ['age', 'gender', 'race', 'emotion'], enforce_detection = False)
#face['analysis'] = DeepFace.analyze(img, actions = ['emotion'])
# TODO: Add additional meta-data allowing back referencing to original
# photo
face['version'] = 1 # version 1 doesn't add much...
data = {k: face[k] for k in set(list(face.keys())) - set(['image', 'facial_area', 'landmarks'])}
json_str = json.dumps(data, ensure_ascii=False, cls=NpEncoder)
faceDescriptorId = create_face_descriptor(conn, face)
faceId = create_face(conn, {
'photoId': photoId,
'scanVersion': face['version'],
'faceConfidence': face['score'],
'top': face['face']['top'],
'left': face['face']['left'],
'bottom': face['face']['bottom'],
'right': face['face']['right'],
'descriptorId': faceDescriptorId,
})
path = f'faces/{"{:02d}".format(faceId % 10)}'
try:
os.mkdir(path)
except FileExistsError:
pass
with open(f'{path}/{faceId}.json', 'w', encoding = 'utf-8') as f:
f.write(json_str)
compressed_str = zlib_uuencode(json_str.encode())
# Encode this data into the JPG as Exif
exif_ifd = {piexif.ExifIFD.UserComment: compressed_str}
exif_dict = {"0th": {}, "Exif": exif_ifd, "1st": {},
"thumbnail": None, "GPS": {}}
image.save(f'{path}/{faceId}.jpg', exif = piexif.dump(exif_dict))
update_face_count(conn, photoId, len(faces))