"""Scan photos that have not been processed yet for faces.

For every photo row whose ``faces`` column is -1 the script detects faces,
computes a descriptor vector for each one, stores the face and its
descriptor in the SQLite database and writes a JSON file plus a JPEG
thumbnail (with the JSON embedded as an Exif UserComment) under ``faces/``.
"""
import sys
import zlib
import json
import os
import piexif
import sqlite3
from sqlite3 import Error
from PIL import Image, ImageOps
from deepface import DeepFace
from deepface.detectors import FaceDetector
from retinaface import RetinaFace
import numpy as np
import cv2
import uu
from io import BytesIO


def zlib_uuencode(databytes, name=''):
    """
    Compress databytes with zlib & uuencode the result

    :param databytes: bytes to compress
    :param name: file name written into the uuencode header
    :return: uuencoded bytes
    """
    inbuff = BytesIO(zlib.compress(databytes, 9))
    outbuff = BytesIO()
    uu.encode(inbuff, outbuff, name=name)
    return outbuff.getvalue()


def zlib_uudecode(databytes):
    """
    uudecode databytes and decompress the result with zlib

    :param databytes: bytes produced by zlib_uuencode
    :return: the original, uncompressed bytes
    """
    inbuff = BytesIO(databytes)
    outbuff = BytesIO()
    uu.decode(inbuff, outbuff)
    return zlib.decompress(outbuff.getvalue())


class NpEncoder(json.JSONEncoder):
    """JSON encoder that understands numpy scalars and arrays."""

    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # Defer to the base class so unsupported types raise TypeError
        # instead of being written out silently as null.
        return super().default(obj)


models = ["VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace",
          "DeepID", "ArcFace", "Dlib", "SFace"]

model_name = 'VGG-Face'  # 'ArcFace'
detector_backend = 'mtcnn'  # 'retinaface'
model = DeepFace.build_model(model_name)
face_detector = FaceDetector.build_model(detector_backend)
input_shape = DeepFace.functions.find_input_shape(model)


# Adapted from DeepFace
# https://github.com/serengil/deepface/blob/master/deepface/commons/functions.py
#
# Modified to use bicubic resampling and clip expansion, as well as to
# take a PIL Image instead of numpy array
def alignment_procedure(img, left_eye, right_eye):
    """
    Given left and right eye coordinates in image, rotate around point
    between eyes such that eyes are horizontal

    :param img: Image (not np.array)
    :param left_eye: Eye appearing on the left (right eye of person)
    :param right_eye: Eye appearing on the right (left eye of person)
    :return: adjusted image
    """
    dY = right_eye[1] - left_eye[1]
    dX = right_eye[0] - left_eye[0]
    radians = np.arctan2(dY, dX)
    rotation = 180 * radians / np.pi

    return img.rotate(
        angle=rotation,
        resample=Image.BICUBIC,
        expand=True)


def _detect_faces(img, threshold, allow_upscaling):
    """Run the configured detector and return a RetinaFace-style result."""
    if detector_backend == 'retinaface':
        # Pass the detector built for this backend, not the recognition
        # model. NOTE(review): assumes FaceDetector.build_model('retinaface')
        # returns the RetinaFace model - confirm against the installed
        # deepface version.
        return RetinaFace.detect_faces(
            img_path=img,
            threshold=threshold,
            model=face_detector,
            allow_upscaling=allow_upscaling)

    if detector_backend == 'mtcnn':
        # NOTE(review): the caller in this file supplies an array that came
        # from PIL, which is presumably already RGB; this swap then hands
        # BGR to mtcnn - confirm the intended channel order.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # mtcnn expects RGB
        res = face_detector.detect_faces(img_rgb)
        faces = {}
        if isinstance(res, list):
            for i, face in enumerate(res):
                # mtcnn reports the box as [x, y, width, height]
                box_x, box_y, box_w, box_h = face['box']
                key = f'face_{i+1}'
                faces[key] = {  # standardize properties
                    # [x1, y1, x2, y2], the layout the rest of the code reads
                    'facial_area': [
                        box_x,
                        box_y,
                        box_x + box_w,
                        box_y + box_h,
                    ],
                    'landmarks': {
                        'left_eye': list(face['keypoints']['left_eye']),
                        'right_eye': list(face['keypoints']['right_eye']),
                    },
                    'score': face['confidence'],
                }
                print(face, faces[key])
        return faces

    raise ValueError(f'Unsupported detector backend: {detector_backend}')


def extract_faces(img, threshold=0.75, allow_upscaling=True, markup=True):
    """
    Detect faces in img and attach a descriptor vector and thumbnail to each

    Re-implementation of the detector's 'extract_faces' with the addition
    of keeping a copy of the face image for caching on disk.

    :param img: image as a numpy array (height, width, channels)
    :param threshold: detection threshold (used by the retinaface backend)
    :param allow_upscaling: passed through to the retinaface backend
    :param markup: draw the face rectangle and eye circles onto img. The
        drawing happens before the face is cropped, so it is visible in
        the thumbnail and in the pixels the descriptor is computed from.
    :return: dict keyed by face name; each entry gains 'vector', 'face'
        (box normalized to the image size) and 'image' (PIL thumbnail).
        None if the detector did not return a dict.
    :raises ValueError: if detector_backend is not supported
    """
    faces = _detect_faces(img, threshold, allow_upscaling)
    if not isinstance(faces, dict):
        return None

    img_height, img_width = img.shape[0], img.shape[1]

    for k, key in enumerate(faces):
        print(f'Processing face {k+1}/{len(faces)}')
        identity = faces[key]
        facial_area = identity["facial_area"]
        landmarks = identity["landmarks"]
        left_eye = landmarks["left_eye"]
        right_eye = landmarks["right_eye"]

        if markup:  # Draw the face rectangle and eyes
            cv2.rectangle(img,
                          (int(facial_area[0]), int(facial_area[1])),
                          (int(facial_area[2]), int(facial_area[3])),
                          (0, 0, 255), 2)
            cv2.circle(img, (int(left_eye[0]), int(left_eye[1])),
                       5, (255, 0, 0), 2)
            cv2.circle(img, (int(right_eye[0]), int(right_eye[1])),
                       5, (0, 255, 0), 2)

        # Find center of face, then crop to square
        # of equal width and height
        width = facial_area[2] - facial_area[0]
        height = facial_area[3] - facial_area[1]
        x = facial_area[0] + width * 0.5
        y = facial_area[1] + height * 0.5

        # Make thumbnail a square crop
        side = max(width, height)

        left = max(round(x - side * 0.5), 0)
        right = min(round(left + side), img_width)    # columns: shape[1]
        top = max(round(y - side * 0.5), 0)
        bottom = min(round(top + side), img_height)   # rows: shape[0]

        # Eye positions relative to the crop: x shifts by left, y by top.
        # Local copies are used so the detector's landmarks stay intact.
        crop_left_eye = [left_eye[0] - left, left_eye[1] - top]
        crop_right_eye = [right_eye[0] - left, right_eye[1] - top]

        facial_img = img[top: bottom, left: right]

        # Eye order is reversed as the routine does them backwards
        # NOTE(review): mtcnn's 'left_eye' appears to be the eye on the
        # left of the image, which would make this order wrong for that
        # backend - confirm.
        image = Image.fromarray(facial_img)
        image = alignment_procedure(image, crop_right_eye, crop_left_eye)
        image = image.resize(size=input_shape, resample=Image.LANCZOS)
        resized = np.asarray(image)

        identity['vector'] = DeepFace.represent(
            img_path=resized,
            model_name=model_name,
            model=model,  # pre-built
            detector_backend=detector_backend,
            enforce_detection=False)

        identity["face"] = {
            'top': facial_area[1] / img_height,
            'left': facial_area[0] / img_width,
            'bottom': facial_area[3] / img_height,
            'right': facial_area[2] / img_width
        }

        identity['image'] = Image.fromarray(resized)

    return faces


# face verification
# img_path = sys.argv[1]

def create_connection(db_file):
    """
    create a database connection to the SQLite database
    specified by db_file

    :param db_file: database file
    :return: Connection object or None
    """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
    except Error as e:
        print(e)

    return conn


def create_face(conn, face):
    """
    Create a new face in the faces table

    :param conn: open database connection
    :param face: dict with photoId, scanVersion, faceConfidence, top,
        left, bottom, right and descriptorId
    :return: face id
    """
    sql = '''
          INSERT INTO faces(photoId,scanVersion,faceConfidence,top,left,bottom,right,descriptorId)
          VALUES(?,?,?,?,?,?,?,?)
          '''
    cur = conn.cursor()
    cur.execute(sql, (
        face['photoId'],
        face['scanVersion'],
        face['faceConfidence'],
        face['top'],
        face['left'],
        face['bottom'],
        face['right'],
        face['descriptorId']
    ))
    conn.commit()
    return cur.lastrowid


def create_face_descriptor(conn, face):
    """
    Create a new descriptor in the facedescriptors table

    :param conn: open database connection
    :param face: dict whose 'vector' entry is stored as the descriptor
    :return: descriptor id
    """
    sql = '''
          INSERT INTO facedescriptors(descriptors)
          VALUES(?)
          '''
    cur = conn.cursor()
    cur.execute(sql, (np.array(face['vector']),))
    conn.commit()
    return cur.lastrowid


def update_face_count(conn, photoId, faces):
    """
    Update the number of faces that have been matched on a photo

    :param conn: open database connection
    :param photoId: id of the photo row to update
    :param faces: number of faces found
    :return: None
    """
    sql = '''
          UPDATE photos SET faces=? WHERE id=?
          '''
    cur = conn.cursor()
    cur.execute(sql, (faces, photoId))
    conn.commit()
    return None


base = '/pictures/'
conn = create_connection('../db/photos.db')
if conn is None:
    # create_connection already printed the sqlite error
    sys.exit('Unable to open database ../db/photos.db')

with conn:
    cur = conn.cursor()
    res = cur.execute('''
        SELECT photos.id,photos.faces,albums.path,photos.filename
        FROM photos
        LEFT JOIN albums ON (albums.id=photos.albumId)
        WHERE photos.faces=-1
    ''')
    rows = res.fetchall()

count = len(rows)
# Keys that are not written to the JSON side-car
json_excluded = {'image', 'facial_area', 'landmarks'}

for i, row in enumerate(rows):
    photoId, photoFaces, albumPath, photoFilename = row
    img_path = f'{base}{albumPath}{photoFilename}'
    print(f'Processing {i+1}/{count}: {img_path}')
    img = Image.open(img_path)
    img = ImageOps.exif_transpose(img)  # auto-rotate if needed
    # Force three channels; a bare convert() leaves grayscale images
    # single-channel, which the 3-channel color conversion cannot take.
    img = img.convert('RGB')
    # np.array (not asarray) gives a private, writable copy, which is
    # needed because extract_faces draws on the array.
    img = np.array(img)
    faces = extract_faces(img)
    if not faces:
        print(f'Image no faces: {img_path}')
        update_face_count(conn, photoId, 0)
        continue

    for j, key in enumerate(faces):
        face = faces[key]
        image = face['image']
        print(f'Writing face {j+1}/{len(faces)}')

        # face['analysis'] = DeepFace.analyze(img_path = img, actions = ['age', 'gender', 'race', 'emotion'], enforce_detection = False)
        # face['analysis'] = DeepFace.analyze(img, actions = ['emotion'])

        # TODO: Add additional meta-data allowing back referencing to original
        # photo
        face['version'] = 1  # version 1 doesn't add much...

        data = {k: v for k, v in face.items() if k not in json_excluded}
        json_str = json.dumps(data, ensure_ascii=False, cls=NpEncoder)

        faceDescriptorId = create_face_descriptor(conn, face)

        faceId = create_face(conn, {
            'photoId': photoId,
            'scanVersion': face['version'],
            'faceConfidence': face['score'],
            'top': face['face']['top'],
            'left': face['face']['left'],
            'bottom': face['face']['bottom'],
            'right': face['face']['right'],
            'descriptorId': faceDescriptorId,
        })

        # Spread the output over ten directories keyed on the last digit
        path = f'faces/{faceId % 10:02d}'
        # makedirs also creates the parent 'faces' directory on first use
        os.makedirs(path, exist_ok=True)

        with open(f'{path}/{faceId}.json', 'w', encoding='utf-8') as f:
            f.write(json_str)

        compressed_str = zlib_uuencode(json_str.encode())

        # Encode this data into the JPG as Exif
        exif_ifd = {piexif.ExifIFD.UserComment: compressed_str}
        exif_dict = {"0th": {}, "Exif": exif_ifd, "1st": {},
                     "thumbnail": None, "GPS": {}}
        image.save(f'{path}/{faceId}.jpg', exif=piexif.dump(exif_dict))

    # Record the count only after every face of the photo has been stored
    update_face_count(conn, photoId, len(faces))