Compare commits
2 Commits
4263d18fec
...
798712aa7f
Author | SHA1 | Date | |
---|---|---|---|
798712aa7f | |||
6847a35ca5 |
127
server/detect.py
127
server/detect.py
@ -1,15 +1,49 @@
|
|||||||
import sys
|
import sys
|
||||||
|
import zlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import piexif
|
import piexif
|
||||||
import sqlite3
|
import sqlite3
|
||||||
from sqlite3 import Error
|
from sqlite3 import Error
|
||||||
from PIL import Image
|
from PIL import Image, ImageOps
|
||||||
from deepface import DeepFace
|
from deepface import DeepFace
|
||||||
|
from deepface.detectors import FaceDetector
|
||||||
from retinaface import RetinaFace
|
from retinaface import RetinaFace
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import cv2
|
import cv2
|
||||||
|
|
||||||
|
import uu
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
original = None
|
||||||
|
|
||||||
|
def redirect_on():
|
||||||
|
global original
|
||||||
|
if original == None:
|
||||||
|
original = sys.stdout
|
||||||
|
sys.stdout = open(os.devnull, 'w')
|
||||||
|
|
||||||
|
def redirect_off():
|
||||||
|
global original
|
||||||
|
if original != None:
|
||||||
|
sys.stdout.close()
|
||||||
|
sys.stdout = original
|
||||||
|
original = None
|
||||||
|
|
||||||
|
def zlib_uuencode(databytes, name='<data>'):
|
||||||
|
''' Compress databytes with zlib & uuencode the result '''
|
||||||
|
inbuff = BytesIO(zlib.compress(databytes, 9))
|
||||||
|
outbuff = BytesIO()
|
||||||
|
uu.encode(inbuff, outbuff, name=name)
|
||||||
|
return outbuff.getvalue()
|
||||||
|
|
||||||
|
def zlib_uudecode(databytes):
|
||||||
|
''' uudecode databytes and decompress the result with zlib '''
|
||||||
|
inbuff = BytesIO(databytes)
|
||||||
|
outbuff = BytesIO()
|
||||||
|
uu.decode(inbuff, outbuff)
|
||||||
|
return zlib.decompress(outbuff.getvalue())
|
||||||
|
|
||||||
class NpEncoder(json.JSONEncoder):
|
class NpEncoder(json.JSONEncoder):
|
||||||
def default(self, obj):
|
def default(self, obj):
|
||||||
if isinstance(obj, np.integer):
|
if isinstance(obj, np.integer):
|
||||||
@ -21,7 +55,10 @@ class NpEncoder(json.JSONEncoder):
|
|||||||
|
|
||||||
models = ["VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib", "SFace"]
|
models = ["VGG-Face", "Facenet", "Facenet512", "OpenFace", "DeepFace", "DeepID", "ArcFace", "Dlib", "SFace"]
|
||||||
|
|
||||||
model = DeepFace.build_model('ArcFace')
|
model_name = 'VGG-Face' # 'ArcFace'
|
||||||
|
detector_backend = 'mtcnn' # 'retinaface'
|
||||||
|
model = DeepFace.build_model(model_name)
|
||||||
|
face_detector = FaceDetector.build_model(detector_backend)
|
||||||
input_shape = DeepFace.functions.find_input_shape(model)
|
input_shape = DeepFace.functions.find_input_shape(model)
|
||||||
|
|
||||||
|
|
||||||
@ -42,7 +79,7 @@ def alignment_procedure(img, left_eye, right_eye):
|
|||||||
dY = right_eye[1] - left_eye[1]
|
dY = right_eye[1] - left_eye[1]
|
||||||
dX = right_eye[0] - left_eye[0]
|
dX = right_eye[0] - left_eye[0]
|
||||||
radians = np.arctan2(dY, dX)
|
radians = np.arctan2(dY, dX)
|
||||||
rotation = 180 * radians / np.pi
|
rotation = 180 + 180 * radians / np.pi
|
||||||
|
|
||||||
if True:
|
if True:
|
||||||
img = img.rotate(
|
img = img.rotate(
|
||||||
@ -52,27 +89,50 @@ def alignment_procedure(img, left_eye, right_eye):
|
|||||||
|
|
||||||
return img
|
return img
|
||||||
|
|
||||||
def extract_faces(img, threshold=0.75, model = None, allow_upscaling = True):
|
def extract_faces(img, threshold=0.75, allow_upscaling = True):
|
||||||
faces = RetinaFace.detect_faces(
|
if detector_backend == 'retinaface':
|
||||||
img_path = img,
|
faces = RetinaFace.detect_faces(
|
||||||
threshold = threshold,
|
img_path = img,
|
||||||
model = model,
|
threshold = threshold,
|
||||||
allow_upscaling = allow_upscaling)
|
model = model,
|
||||||
|
allow_upscaling = allow_upscaling)
|
||||||
|
elif detector_backend == 'mtcnn':
|
||||||
|
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) # mtcnn expects RGB
|
||||||
|
|
||||||
|
redirect_on()
|
||||||
|
res = face_detector.detect_faces(img_rgb)
|
||||||
|
redirect_off()
|
||||||
|
|
||||||
|
faces = {}
|
||||||
|
if type(res) == list:
|
||||||
|
for i, face in enumerate(res):
|
||||||
|
x = face['box'][0]
|
||||||
|
y = face['box'][1]
|
||||||
|
w = face['box'][2]
|
||||||
|
h = face['box'][3]
|
||||||
|
faces[f'face_{i+1}'] = { # standardize properties
|
||||||
|
'facial_area': [ x, y, x + w, y + h ],
|
||||||
|
'landmarks': {
|
||||||
|
'left_eye': list(face['keypoints']['left_eye']),
|
||||||
|
'right_eye': list(face['keypoints']['right_eye']),
|
||||||
|
},
|
||||||
|
'score': face['confidence'],
|
||||||
|
}
|
||||||
|
|
||||||
# Re-implementation of 'extract_faces' with the addition of keeping a
|
# Re-implementation of 'extract_faces' with the addition of keeping a
|
||||||
# copy of the face image for caching on disk
|
# copy of the face image for caching on disk
|
||||||
if type(faces) == dict:
|
if type(faces) == dict:
|
||||||
k=1
|
for k, key in enumerate(faces):
|
||||||
for key in faces:
|
print(f'Processing face {k+1}/{len(faces)}')
|
||||||
print(f'Processing face {k}/{len(faces)}')
|
|
||||||
k+=1
|
|
||||||
identity = faces[key]
|
identity = faces[key]
|
||||||
facial_area = identity["facial_area"]
|
facial_area = identity["facial_area"]
|
||||||
landmarks = identity["landmarks"]
|
landmarks = identity["landmarks"]
|
||||||
left_eye = landmarks["left_eye"]
|
left_eye = landmarks["left_eye"]
|
||||||
right_eye = landmarks["right_eye"]
|
right_eye = landmarks["right_eye"]
|
||||||
|
|
||||||
if False: # Draw the face rectangle and eyes
|
# markup = True
|
||||||
|
markup = False
|
||||||
|
if markup == True: # Draw the face rectangle and eyes
|
||||||
cv2.rectangle(img,
|
cv2.rectangle(img,
|
||||||
(int(facial_area[0]), int(facial_area[1])),
|
(int(facial_area[0]), int(facial_area[1])),
|
||||||
(int(facial_area[2]), int(facial_area[3])),
|
(int(facial_area[2]), int(facial_area[3])),
|
||||||
@ -114,27 +174,28 @@ def extract_faces(img, threshold=0.75, model = None, allow_upscaling = True):
|
|||||||
image = image.resize(size = input_shape, resample = Image.LANCZOS)
|
image = image.resize(size = input_shape, resample = Image.LANCZOS)
|
||||||
resized = np.asarray(image)
|
resized = np.asarray(image)
|
||||||
|
|
||||||
|
redirect_on()
|
||||||
identity['vector'] = DeepFace.represent(
|
identity['vector'] = DeepFace.represent(
|
||||||
img_path = resized,
|
img_path = resized,
|
||||||
model_name = 'ArcFace',
|
model_name = model_name,
|
||||||
model = model, # pre-built
|
model = model, # pre-built
|
||||||
detector_backend = 'retinaface',
|
detector_backend = detector_backend,
|
||||||
enforce_detection = False)
|
enforce_detection = False)
|
||||||
|
redirect_off()
|
||||||
|
|
||||||
|
redirect_on()
|
||||||
identity["face"] = {
|
identity["face"] = {
|
||||||
'top': facial_area[1] / img.shape[0],
|
'top': facial_area[1] / img.shape[0],
|
||||||
'left': facial_area[0] / img.shape[1],
|
'left': facial_area[0] / img.shape[1],
|
||||||
'bottom': facial_area[3] / img.shape[0],
|
'bottom': facial_area[3] / img.shape[0],
|
||||||
'right': facial_area[2] / img.shape[1]
|
'right': facial_area[2] / img.shape[1]
|
||||||
}
|
}
|
||||||
|
redirect_off()
|
||||||
|
|
||||||
identity['image'] = Image.fromarray(resized)
|
identity['image'] = Image.fromarray(resized)
|
||||||
|
|
||||||
return faces
|
return faces
|
||||||
|
|
||||||
#face verification
|
|
||||||
#img_path = sys.argv[1]
|
|
||||||
|
|
||||||
def create_connection(db_file):
|
def create_connection(db_file):
|
||||||
""" create a database connection to the SQLite database
|
""" create a database connection to the SQLite database
|
||||||
specified by db_file
|
specified by db_file
|
||||||
@ -217,25 +278,23 @@ with conn:
|
|||||||
''')
|
''')
|
||||||
rows = res.fetchall()
|
rows = res.fetchall()
|
||||||
count = len(rows)
|
count = len(rows)
|
||||||
i=1
|
for i, row in enumerate(rows):
|
||||||
for row in rows:
|
|
||||||
photoId, photoFaces, albumPath, photoFilename = row
|
photoId, photoFaces, albumPath, photoFilename = row
|
||||||
img_path = f'{base}{albumPath}{photoFilename}'
|
img_path = f'{base}{albumPath}{photoFilename}'
|
||||||
print(f'Processing {i}/{count}: {img_path}')
|
print(f'Processing {i+1}/{count}: {img_path}')
|
||||||
i+=1
|
|
||||||
img = Image.open(img_path)
|
img = Image.open(img_path)
|
||||||
|
img = ImageOps.exif_transpose(img) # auto-rotate if needed
|
||||||
img = img.convert()
|
img = img.convert()
|
||||||
img = np.asarray(img)
|
img = np.asarray(img)
|
||||||
faces = extract_faces(img)
|
faces = extract_faces(img)
|
||||||
if faces is None:
|
if faces is None:
|
||||||
|
print(f'Image no faces: {img_path}')
|
||||||
update_face_count(conn, photoId, 0)
|
update_face_count(conn, photoId, 0)
|
||||||
continue
|
continue
|
||||||
j=1
|
for j, key in enumerate(faces):
|
||||||
for key in faces:
|
|
||||||
face = faces[key]
|
face = faces[key]
|
||||||
image = face['image']
|
image = face['image']
|
||||||
print(f'Writing face {j}/{len(faces)}')
|
print(f'Writing face {j+1}/{len(faces)}')
|
||||||
j+=1
|
|
||||||
|
|
||||||
#face['analysis'] = DeepFace.analyze(img_path = img, actions = ['age', 'gender', 'race', 'emotion'], enforce_detection = False)
|
#face['analysis'] = DeepFace.analyze(img_path = img, actions = ['age', 'gender', 'race', 'emotion'], enforce_detection = False)
|
||||||
#face['analysis'] = DeepFace.analyze(img, actions = ['emotion'])
|
#face['analysis'] = DeepFace.analyze(img, actions = ['emotion'])
|
||||||
@ -245,8 +304,7 @@ with conn:
|
|||||||
face['version'] = 1 # version 1 doesn't add much...
|
face['version'] = 1 # version 1 doesn't add much...
|
||||||
|
|
||||||
data = {k: face[k] for k in set(list(face.keys())) - set(['image', 'facial_area', 'landmarks'])}
|
data = {k: face[k] for k in set(list(face.keys())) - set(['image', 'facial_area', 'landmarks'])}
|
||||||
json_str = json.dumps(data, ensure_ascii=False, indent=2, cls=NpEncoder)
|
json_str = json.dumps(data, ensure_ascii=False, cls=NpEncoder)
|
||||||
|
|
||||||
faceDescriptorId = create_face_descriptor(conn, face)
|
faceDescriptorId = create_face_descriptor(conn, face)
|
||||||
|
|
||||||
faceId = create_face(conn, {
|
faceId = create_face(conn, {
|
||||||
@ -260,7 +318,7 @@ with conn:
|
|||||||
'descriptorId': faceDescriptorId,
|
'descriptorId': faceDescriptorId,
|
||||||
})
|
})
|
||||||
|
|
||||||
path = f'faces/{faceId % 10}'
|
path = f'faces/{"{:02d}".format(faceId % 10)}'
|
||||||
try:
|
try:
|
||||||
os.mkdir(path)
|
os.mkdir(path)
|
||||||
except FileExistsError:
|
except FileExistsError:
|
||||||
@ -269,13 +327,12 @@ with conn:
|
|||||||
with open(f'{path}/{faceId}.json', 'w', encoding = 'utf-8') as f:
|
with open(f'{path}/{faceId}.json', 'w', encoding = 'utf-8') as f:
|
||||||
f.write(json_str)
|
f.write(json_str)
|
||||||
|
|
||||||
|
compressed_str = zlib_uuencode(json_str.encode())
|
||||||
|
|
||||||
# Encode this data into the JPG as Exif
|
# Encode this data into the JPG as Exif
|
||||||
exif_ifd = {piexif.ExifIFD.UserComment: json_str.encode()}
|
exif_ifd = {piexif.ExifIFD.UserComment: compressed_str}
|
||||||
exif_dict = {"0th": {}, "Exif": exif_ifd, "1st": {},
|
exif_dict = {"0th": {}, "Exif": exif_ifd, "1st": {},
|
||||||
"thumbnail": None, "GPS": {}}
|
"thumbnail": None, "GPS": {}}
|
||||||
image.save(f'{path}/{faceId}.jpg', exif = piexif.dump(exif_dict))
|
image.save(f'{path}/{faceId}.jpg', exif = piexif.dump(exif_dict))
|
||||||
|
|
||||||
#df = DeepFace.find(img, db_path = '/db')
|
|
||||||
#print(df.head())
|
|
||||||
|
|
||||||
update_face_count(conn, photoId, len(faces))
|
update_face_count(conn, photoId, len(faces))
|
||||||
|
25
server/headers.py
Normal file
25
server/headers.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import sys
|
||||||
|
import zlib
|
||||||
|
import json
|
||||||
|
import piexif
|
||||||
|
from PIL import Image
|
||||||
|
import uu
|
||||||
|
from io import BytesIO
|
||||||
|
|
||||||
|
def zlib_uudecode(databytes):
|
||||||
|
''' uudecode databytes and decompress the result with zlib '''
|
||||||
|
inbuff = BytesIO(databytes)
|
||||||
|
outbuff = BytesIO()
|
||||||
|
uu.decode(inbuff, outbuff)
|
||||||
|
return zlib.decompress(outbuff.getvalue())
|
||||||
|
|
||||||
|
faceId = int(sys.argv[1])
|
||||||
|
path = f'faces/{"{:02d}".format(faceId % 10)}'
|
||||||
|
|
||||||
|
img = Image.open(f'{path}/{faceId}.jpg')
|
||||||
|
exif_dict = piexif.load(img.info["exif"])
|
||||||
|
compressed_str = exif_dict["Exif"][piexif.ExifIFD.UserComment]
|
||||||
|
|
||||||
|
str = zlib_uudecode(compressed_str)
|
||||||
|
json = json.loads(str)
|
||||||
|
print(json)
|
Loading…
x
Reference in New Issue
Block a user