import sys
import json
import os
import piexif
import argparse
from PIL import Image, ImageOps
from deepface import DeepFace
import numpy as np
import cv2

from ketrface.util import *
from ketrface.db import *
from ketrface.config import *

config = read_config()

html_path = merge_config_path(config['path'], 'frontend')
pictures_path = merge_config_path(config['path'], config['picturesPath'])
faces_path = merge_config_path(config['path'], config['facesPath'])
db_path = merge_config_path(config['path'], config["db"]["photos"]["host"])
html_base = config['basePath']
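
# Expected config shape, inferred from the keys read above (illustrative
# values only):
# {
#   "path": "/srv/ketrface",
#   "picturesPath": "pictures",
#   "facesPath": "faces",
#   "basePath": "/",
#   "db": { "photos": { "host": "db/photos.db" } }
# }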

model_name = 'VGG-Face'
detector_backend = 'mtcnn'
model = DeepFace.build_model(model_name)

# Derived from
# https://github.com/serengil/deepface/blob/master/deepface/detectors/MtcnnWrapper.py
# Build the MTCNN detector directly so extra parameters (min_face_size)
# can be passed; DeepFace's wrapper does not expose them
from mtcnn import MTCNN
face_detector = MTCNN(min_face_size = 30)
input_shape = DeepFace.functions.find_input_shape(model)
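
# input_shape is the face size the model expects, e.g. (224, 224) for
# VGG-Face; it is used below as the thumbnail resize target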

# Adapted from DeepFace
# https://github.com/serengil/deepface/blob/master/deepface/commons/functions.py
#
# Modified to use bicubic resampling and clip expansion, as well as to
# take a PIL Image instead of a numpy array
def alignment_procedure(img, left_eye, right_eye):
    """
    Given left and right eye coordinates in the image, rotate it around
    the point between the eyes so that the eyes are horizontal.

    :param img: PIL Image (not np.array)
    :param left_eye: Eye appearing on the left (right eye of person)
    :param right_eye: Eye appearing on the right (left eye of person)
    :return: adjusted image
    """
    dY = right_eye[1] - left_eye[1]
    dX = right_eye[0] - left_eye[0]
    radians = np.arctan2(dY, dX)
    rotation = 180 + 180 * radians / np.pi

    return img.rotate(
        angle = rotation,
        resample = Image.BICUBIC,
        expand = True)
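
# Note on the 180 degree offset: the caller below passes the eyes in
# reversed order, flipping the eye vector by 180 degrees, which the offset
# undoes. Worked example: left_eye = (160, 100), right_eye = (100, 120)
# gives dX = -60, dY = 20, radians = atan2(20, -60) ~= 2.820, and
# rotation = 180 + 180 * 2.820 / pi ~= 341.6 ~= -18.4 degrees, a clockwise
# turn that levels an 18.4 degree eye tilt. Perfectly level eyes passed in
# reversed order give radians = pi, i.e. rotation = 360 (a no-op).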

def variance_of_laplacian(image):
    # compute the Laplacian of the image and then return the focus
    # measure, which is simply the variance of the Laplacian
    return cv2.Laplacian(image, cv2.CV_64F).var()
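
# Usage sketch (illustrative numbers, not measured from this codebase):
# a sharp face crop typically scores in the hundreds, heavy blur near
# single digits:
#   gray = cv2.imread('face.jpg', cv2.IMREAD_GRAYSCALE)  # hypothetical file
#   print(variance_of_laplacian(gray))  # e.g. ~450 sharp vs ~12 blurred
# extract_faces() below stores this value as each face's 'focus' score.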

def extract_faces(
    img, threshold=0.95, allow_upscaling = True, focus_threshold = 100):

    # img is already an RGB numpy array (the caller loads via PIL), which
    # is what mtcnn expects, so no BGR->RGB channel swap is needed
    img_rgb = img

    redirect_on()
    res = face_detector.detect_faces(img_rgb)
    redirect_off()

    if not isinstance(res, list):
        return None

    faces = {}
    for i, face in enumerate(res):
        if threshold > face['confidence']:
            continue
        x = face['box'][0]
        y = face['box'][1]
        w = face['box'][2]
        h = face['box'][3]
        # If the face is less than 2.5% of the image width and height,
        # skip it (too small) -- filters out likely blurry faces in
        # large group photos where the actual face may exceed the
        # min_face_size passed to MTCNN. Note img.shape is (height, width),
        # so the width ratio uses shape[1] and the height ratio shape[0].
        if 0.025 > w / img.shape[1] and 0.025 > h / img.shape[0]:
            print(f'Dropping due to small face size: {w / img.shape[1]} x {h / img.shape[0]}')
            continue
        faces[f'face_{i+1}'] = { # standardize properties
            'facial_area': [ x, y, x + w, y + h ],
            'landmarks': {
                'left_eye': list(face['keypoints']['left_eye']),
                'right_eye': list(face['keypoints']['right_eye']),
            },
            'score': face['confidence'],
        }
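
    # At this point each surviving detection looks like (illustrative):
    #   'face_1': { 'facial_area': [x, y, x + w, y + h],
    #               'landmarks': {'left_eye': [lx, ly], 'right_eye': [rx, ry]},
    #               'score': 0.999 }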

    # Re-implementation of DeepFace's 'extract_faces', with the addition
    # of keeping a copy of the face image for caching on disk
    for k, key in enumerate(faces):
        print(f'Processing face {k+1}/{len(faces)}')
        identity = faces[key]
        identity['focus'] = 100 # Placeholder until Laplacian variance is computed

        facial_area = identity["facial_area"]
        landmarks = identity["landmarks"]
        left_eye = landmarks["left_eye"]
        right_eye = landmarks["right_eye"]

        markup = False # Set True to draw the face rectangle and eyes for debugging
        if markup:
            cv2.rectangle(img,
                (int(facial_area[0]), int(facial_area[1])),
                (int(facial_area[2]), int(facial_area[3])),
                (0, 0, 255), 2)
            cv2.circle(img, (int(left_eye[0]), int(left_eye[1])), 5, (255, 0, 0), 2)
            cv2.circle(img, (int(right_eye[0]), int(right_eye[1])), 5, (0, 255, 0), 2)

        # Find the center of the face, then crop to a square of equal
        # width and height
        width = facial_area[2] - facial_area[0]
        height = facial_area[3] - facial_area[1]
        x = facial_area[0] + width * 0.5
        y = facial_area[1] + height * 0.5

        # Make the thumbnail a square crop
        if width > height:
            height = width
        else:
            width = height

        #width *= 1.25
        #height *= 1.25

        left = max(round(x - width * 0.5), 0)
        right = min(round(left + width), img.shape[1]) # shape[1] is the image width
        top = max(round(y - height * 0.5), 0)
        bottom = min(round(top + height), img.shape[0]) # shape[0] is the image height
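
        # e.g. a 300x200 box centred at (500, 400) in a 4000x3000 image
        # becomes the 300x300 square left=350, right=650, top=250, bottom=550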

        # Shift the eye coordinates (x, y) into the cropped image's frame
        left_eye[0] -= left
        left_eye[1] -= top
        right_eye[0] -= left
        right_eye[1] -= top

        facial_img = img[top: bottom, left: right]

        gray = cv2.cvtColor(facial_img, cv2.COLOR_RGB2GRAY)
        focus = variance_of_laplacian(gray)
        identity['focus'] = focus

        # Eye order is reversed here because alignment_procedure()'s
        # 180 degree offset assumes it (see the worked example above)
        image = Image.fromarray(facial_img)
        image = alignment_procedure(image, right_eye, left_eye)
        image = image.resize(size = input_shape, resample = Image.LANCZOS)
        resized = np.asarray(image)

        redirect_on()
        identity['vector'] = DeepFace.represent(
            img_path = resized,
            model_name = model_name,
            model = model, # pre-built
            detector_backend = detector_backend,
            enforce_detection = False)
        redirect_off()

        # Face rectangle as fractions of the full image dimensions
        identity["face"] = {
            'top': facial_area[1] / img.shape[0],
            'left': facial_area[0] / img.shape[1],
            'bottom': facial_area[3] / img.shape[0],
            'right': facial_area[2] / img.shape[1]
        }

        identity['image'] = Image.fromarray(resized)

    return faces
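
# Usage sketch (hypothetical path; mirrors the main loop below):
#   img = Image.open('/pictures/album/photo.jpg')
#   img = np.asarray(ImageOps.exif_transpose(img).convert('RGB'))
#   faces = extract_faces(img)  # None on detector failure, else dict keyed 'face_1', ...
#   for key, face in (faces or {}).items():
#       print(key, face['score'], face['focus'], face['face'])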


parser = argparse.ArgumentParser(description = 'Detect faces in images.')
parser.add_argument(
    'photos',
    metavar = 'PHOTO',
    type = int,
    nargs = '*',
    help = 'PHOTO ID to scan (default: all unscanned photos)'
)
args = parser.parse_args()

base = '/pictures/'  # hard-coded mount point (pictures_path above is the config-derived equivalent)
conn = create_connection('../db/photos.db')  # hard-coded (db_path above is the config-derived equivalent)
with conn:
    cur = conn.cursor()
    # Select photos that have not been scanned yet (faces = -1), or, if
    # photo IDs were given on the command line, select those instead
    query = '''
        SELECT photos.id,photos.faces,albums.path,photos.filename
        FROM photos
        LEFT JOIN albums ON (albums.id=photos.albumId)
        WHERE photos.duplicate=0 AND '''
    if args.photos:
        query += f'photos.id IN ({",".join("?" * len(args.photos))})'
    else:
        query += 'photos.faces=-1'
    res = cur.execute(query, tuple(args.photos))
    rows = res.fetchall()
    count = len(rows)
    for i, row in enumerate(rows):
        photoId, photoFaces, albumPath, photoFilename = row
        img_path = f'{base}{albumPath}{photoFilename}'
        print(f'Processing {i+1}/{count}: photoId = {photoId}: {img_path}')
        try:
            img = Image.open(img_path)
            img = ImageOps.exif_transpose(img) # auto-rotate if needed
            img = img.convert("RGB") # Catch "RGBA" etc. and convert to 3-channel
        except Exception:
            print(f'Unable to load / process {img_path}. Skipping.')
            continue

        img = np.asarray(img)
        faces = extract_faces(img)
        if faces is None:
            print(f'Image has no faces: {img_path}')
            update_face_count(conn, photoId, 0)
            continue

        for j, key in enumerate(faces):
            face = faces[key]
            image = face['image']
            print(f'Writing face {j+1}/{len(faces)}')

            face['version'] = 1 # version 1 doesn't add much...

            # Serialize everything except the bulky or redundant fields
            data = {k: face[k] for k in set(face.keys()) - {'image', 'facial_area', 'landmarks'}}
            json_str = json.dumps(data, ensure_ascii=False, cls=NpEncoder)
            faceDescriptorId = create_face_descriptor(conn, face)

            faceId = create_face(conn, {
                'photoId': photoId,
                'scanVersion': face['version'],
                'faceConfidence': face['score'],
                'focus': face['focus'],
                'top': face['face']['top'],
                'left': face['face']['left'],
                'bottom': face['face']['bottom'],
                'right': face['face']['right'],
                'descriptorId': faceDescriptorId,
            })

            print(f'Face added to database with faceId = {faceId}')

            # Spread cached faces across 100 subdirectories (faceId mod 100)
            path = f'{faces_path}/{faceId % 100:02d}'
            os.makedirs(path, exist_ok = True)

            with open(f'{path}/{faceId}.json', 'w', encoding = 'utf-8') as f:
                f.write(json_str)

            compressed_str = zlib_uuencode(json_str.encode())

            # Encode this data into the JPG as Exif (UserComment)
            exif_ifd = {piexif.ExifIFD.UserComment: compressed_str}
            exif_dict = {"0th": {}, "Exif": exif_ifd, "1st": {},
                         "thumbnail": None, "GPS": {}}
            image.save(
                f'{path}/{faceId}.jpg',
                quality = 'maximum',
                exif = piexif.dump(exif_dict))

        update_face_count(conn, photoId, len(faces))
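
# Read-back sketch for the cached face JPGs; zlib_uudecode is assumed to be
# the inverse of ketrface.util's zlib_uuencode (adjust to the real helper):
#   exif = piexif.load(f'{path}/{faceId}.jpg')
#   blob = exif['Exif'][piexif.ExifIFD.UserComment]
#   data = json.loads(zlib_uudecode(blob))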