ketr.photos/server/scanner.js
James Ketrenos 6f51d5dc4d Adding face-recognition backend
Signed-off-by: James Ketrenos <james_gitlab@ketrenos.com>
2020-01-03 15:31:02 -08:00

773 lines
26 KiB
JavaScript
Executable File

/**
* scanner
*
* Face recognition:
* 1. For each photo, extract all faces. Store face rectangles.
* face_id unique
* photo_id foreign key
* top left bottom right
* identity_id
* distance (0 == truth; manually assigned identity)
* 2. For each face_id, create:
* normalized_file
* original_file
* 128 float
*/
"use strict";
/* meta directories are not scanned for photos */
const metaDirectories = [ "thumbs", "raw", "face-data", ".git", "corrupt" ];
const Promise = require("bluebird"),
fs = require("fs"),
config = require("config"),
moment = require("moment"),
crypto = require("crypto"),
{ stat, mkdir, exists } = require("./lib/util");
let photoDB = null;
const picturesPath = config.get("picturesPath").replace(/\/$/, "") + "/";
let processQueue = [], triedClean = [], lastScan = new Date("1800-01-01");
//const rawExtension = /\.(nef|orf)$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef", "orf" ];
const rawExtension = /\.nef$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef" ];
function removeNewerFile(path, fileA, fileB) {
fs.stat(path + fileA, function(err, statsA) {
if (err) {
return;
}
fs.stat(path + fileB, function(err, statsB) {
if (err) {
return;
}
if (statsA.mtime > statsB.mtime) {
setStatus("Removing file by moving to 'corrupt':" + fileA);
moveCorrupt(path, fileA);
} else {
setStatus("Removing file by moving to 'corrupt':" + fileB);
moveCorrupt(path, fileB);
}
});
});
}
let processRunning = false;
const { spawn } = require('child_process');
const sharp = require("sharp"), exif = require("exif-reader");
function convertRawToJpg(path, file) {
setStatus("Converting " + path + file);
path = picturesPath + path;
return new Promise(function(resolve, reject) {
return exists(path + file.replace(rawExtension, ".jpg")).then(function(exist) {
if (exist) {
setStatus("Skipping already converted file: " + file);
return;
}
const ufraw = spawn("ufraw-batch", [
"--silent",
"--wb=camera",
"--rotate=camera",
"--out-type=jpg",
"--compression=90",
"--exif",
"--overwrite",
"--output", path + file.replace(rawExtension, ".jpg"),
path + file
]);
const stderr = [];
ufraw.stderr.on('data', function(data) {
stderr.push(data);
});
return new Promise(function(ufraw, resolve, reject) {
ufraw.on('exit', function(stderr, code, signal) {
if (signal || code != 0) {
let error = "UFRAW for " + path + file + " returned an error: " + code + "\n" + signal + "\n" + stderr.join("\n") + "\n";
setStatus(error, "error");
return moveCorrupt(path, file).then(function() {
setStatus("ufraw failed", "warn");
return reject(error);
}).catch(function(error) {
setStatus("moveCorrupt failed", "warn");
return reject(error);
});
}
return mkdir(path + "raw").then(function() {
fs.rename(path + file, path + "raw/" + file, function(err) {
if (err) {
setStatus("Unable to move RAW file: " + path + file, "error");
return reject(err);
}
return resolve();
});
}).catch(function(error) {
setStatus("mkdir failed", "warn");
return reject(error);
});
}.bind(this, ufraw));
}.bind(this, stderr));
});
});
}
function moveCorrupt(path, file) {
if (path.indexOf(picturesPath) != 0) {
path = picturesPath + path;
}
setStatus("Moving corrupt file '" + file + "' to " + path + "corrupt", "warn");
return mkdir(path + "corrupt").then(function() {
return new Promise(function(resolve, reject) {
fs.rename(path + file, path + "corrupt/" + file, function(err) {
if (err) {
setStatus("Unable to move corrupt file: " + path + file, "error");
return reject(err);
}
return resolve();
});
});
});
}
function processBlock(items) {
if (items) {
processQueue = processQueue.concat(items);
}
if (processRunning) {
/* Invoke once per second to check if there are items to process */
setTimeout(processBlock, 1000);
return;
}
let processing = processQueue.splice(0), needsProcessing = [], duplicates = [];
processRunning = true;
/* Sort to newest files to be processed first */
processing.sort(function(a, b) {
return b.stats.mtime - a.stats.mtime;
});
let toProcess = processing.length, lastMessage = moment();
setStatus("Items to be processed: " + toProcess);
return Promise.mapSeries(processing, function(asset) {
return computeHash(picturesPath + asset.album.path + asset.filename).then(function(hash) {
asset.hash = hash;
return asset;
}).then(function(asset) {
return photoDB.sequelize.query("SELECT photohashes.*,photos.filename,albums.path FROM photohashes " +
"LEFT JOIN photos ON (photos.id=photohashes.photoId) " +
"LEFT JOIN albums ON (albums.id=photos.albumId) " +
"WHERE hash=:hash OR photoId=:id", {
replacements: asset,
type: photoDB.sequelize.QueryTypes.SELECT
}).then(function(results) {
let query;
if (results.length == 0) {
query = "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)";
} else if (results[0].hash != asset.hash) {
query = "UPDATE photohashes SET hash=:hash WHERE photoId=:id";
} else if (results[0].photoId != asset.id) {
setStatus("Duplicate asset: " +
"'" + asset.album.path + asset.filename + "' is a copy of " +
"'" + results[0].path + results[0].filename + "'");
if (asset.duplicate != results[0].photoId) {
asset.duplicate = results[0].photoId;
duplicates.push(asset);
}
return null;
}
/* Even if the hash doesn't need to be updated, the entry needs to be scanned */
// console.log("process needed because of " + query);
needsProcessing.push(asset);
if (!query) {
return asset;
}
return photoDB.sequelize.query(query, {
replacements: asset,
}).then(function() {
return asset;
});
});
}).then(function(asset) {
if (!asset) { /* The processed entry is a DUPLICATE. Skip it. */
return;
}
var path = asset.album.path,
file = asset.filename,
created = asset.stats.mtime,
albumId = asset.album.id;
let tmp = Promise.resolve(file);
/* If this is a Nikon RAW file, convert it to JPG and move to /raw dir */
if (rawExtension.exec(file)) {
tmp = exists(picturesPath + path + file.replace(rawExtension, ".jpg")).then(function(exist) {
if (exist) {
return file.replace(rawExtension, ".jpg"); /* We converted from NEF/ORF => JPG */
}
return mkdir(picturesPath + path + "raw").then(function() {
return convertRawToJpg(path, file);
}).then(function() {
return file.replace(rawExtension, ".jpg"); /* We converted from NEF/ORF => JPG */
});
});
}
return tmp.then(function(file) {
var src = picturesPath + path + file,
image = sharp(src);
return image.limitInputPixels(1073741824).metadata().then(function(metadata) {
if (metadata.exif) {
metadata.exif = exif(metadata.exif);
delete metadata.exif.thumbnail;
delete metadata.exif.image;
for (var key in metadata.exif.exif) {
if (Buffer.isBuffer(metadata.exif.exif[key])) {
metadata.exif.exif[key] = "Buffer[" + metadata.exif.exif[key].length + "]";
}
}
}
asset.width = metadata.width;
asset.height = metadata.height;
asset.added = moment().format();
if (metadata.exif && metadata.exif.exif && metadata.exif.exif.DateTimeOriginal && !isNaN(metadata.exif.exif.DateTimeOriginal.valueOf())) {
asset.taken = moment(metadata.exif.exif.DateTimeOriginal).format();
asset.modified = moment(metadata.exif.exif.DateTimeOriginal).format();
if (asset.taken == "Invalid date" || asset.taken.replace(/T.*/, "") == "1899-11-30") {
setStatus("Invalid EXIF date information for " + asset.album.path + asset.filename);
asset.taken = asset.modified = moment(created).format();
}
} else {
/* Attempt to infer the datestamp from the filename */
let date = moment(created).format();
let match = file.match(/WhatsApp Image (20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]) at (.*).(jpeg|jpg)/);
if (match) {
date = moment((match[1]+" "+match[2]), "YYYY-MM-DD h.mm.ss a").format();
if (date == "Invalid date") {
date = moment(created).format();
}
} else {
match = file.match(/(20[0-9][0-9]-?[0-9][0-9]-?[0-9][0-9])[_\-]?([0-9]{6})?/);
if (match) {
if (match[2]) { /* Stamp had time in it */
date = moment((match[1]+""+match[2]).replace(/-/g, ""), "YYYYMMDDHHmmss").format();
} else {
date = moment(match[1].replace(/-/g, ""), "YYYYMMDD").format();
}
if (date == "Invalid date") {
date = moment(created).format();
}
} else {
date = moment(created).format();
}
}
asset.taken = asset.modified = date;
}
let dst = picturesPath + path + "thumbs/" + file;
return exists(dst).then(function(exist) {
if (exist) {
return;
}
return image.resize(256, 256).withMetadata().toFile(dst).catch(function(error) {
setStatus("Error resizing image: " + dst + "\n" + error, "error");
throw error;
});
}).then(function() {
let dst = picturesPath + path + "thumbs/scaled/" + file;
return exists(dst).then(function(exist) {
if (exist) {
return;
}
return image.resize(Math.min(1024, metadata.width)).withMetadata().toFile(dst).catch(function(error) {
setStatus("Error resizing image: " + dst + "\n" + error, "error");
throw error;
});
});
}).then(function() {
return photoDB.sequelize.query("UPDATE photos SET " +
"added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " +
"WHERE id=:id", {
replacements: asset,
});
});
}).catch(function(error) {
setStatus("Error reading image " + src + ":\n" + error, "error");
return moveCorrupt(path, file).then(function() {
/* If the original file was not a NEF/ORF, we are done... */
if (!rawExtension.exec(asset.filename)) {
return;
}
/* ... otherwise, attempt to re-convert the NEF/ORF->JPG and then resize again */
for (var i = 0; i < triedClean.length; i++) {
if (triedClean[i] == path + file) {
/* Move the NEF/ORF to /corrupt as well so it isn't re-checked again and again... */
// return moveCorrupt(path, asset.filename);
setStatus("Already attempted to convert NEF/ORF to JPG: " + path + file, "error");
return;
}
}
setStatus("Adding " + path + file + " back onto processing queue.", "error");
triedClean.push(path + file);
processBlock([ path, file, created, albumId ]);
});
});
}).catch(function() {
setStatus("Continuing file processing.", "warn");
});
}).then(function() {
toProcess--;
if (moment().add(-5, 'seconds') > lastMessage) {
setStatus("Items to be processed: " + toProcess);
lastMessage = moment();
}
});
}).catch(function(error) {
setStatus("Error processing file. Continuing.", "error");
throw error;
}).then(function() {
setStatus("Completed processing queue. Marking " + duplicates.length + " duplicates.");
return photoDB.sequelize.transaction(function(transaction) {
return Promise.mapSeries(duplicates, function(asset) {
return photoDB.sequelize.query("UPDATE photos " +
"SET duplicate=:duplicate,modified=CURRENT_TIMESTAMP,scanned=CURRENT_TIMESTAMP WHERE id=:id", {
replacements: asset,
transaction: transaction
});
});
});
}).then(function() {
setStatus("Looking for removed assets");
return photoDB.sequelize.query("SELECT photos.scanned,photos.id,photos.filename,albums.path FROM photos " +
"LEFT JOIN albums ON (albums.id=photos.albumId) " +
"WHERE photos.deleted=0 AND (DATETIME(photos.scanned)<DATETIME(:lastScan) OR photos.scanned IS NULL)", {
replacements: {
lastScan: lastScan
},
type: photoDB.sequelize.QueryTypes.SELECT
}).then(function(results) {
let deleted = [];
setStatus("Checking " + results.length + " assets to see if they are on disk.");
return Promise.map(results, function(asset) {
return exists(asset.path + asset.filename).then(function(exist) {
if (!exist) {
setStatus(asset.path + asset.filename + " no longer exists on disk. Marking as deleted.");
deleted.push(asset.id);
}
});
}).then(function() {
return photoDB.sequelize.query("UPDATE photos SET deleted=1,scanned=CURRENT_TIMESTAMP WHERE id IN (:deleted)", {
replacements: {
deleted: deleted
}
}).then(function() {
return photoDB.sequelize.query("DELETE FROM photohashes WHERE photoId IN (:deleted)", {
replacements: {
deleted: deleted
}
});
}).then(function() {
setStatus(deleted.length + " assets deleted.");
});
});
});
}).then(function() {
processRunning = false;
});
}
function scanDir(parent, path) {
let re = new RegExp("\.((" + extensions.join(")|(") + "))$", "i"),
album = {
path: path.slice(picturesPath.length), /* path already ends in '/' */
name: path.replace(/\/$/, "").replace(/.*\//, "").replace(/_/g, " "),
parent: parent,
allAssetCount: 0,
allAlbumCount: 0
}, albums = [ album ], assets = [];
return new Promise(function(resolve, reject) {
fs.readdir(path, function(error, files) {
if (error) {
setStatus("Could not readdir: " + path, "warn");
return resolve([]);
}
/* Remove meta-data directories from being processed */
files = files.filter((file) => {
for (var i = 0; i < files.length; i++) {
/* If this file has an original NEF/ORF on the system, don't add the JPG to the DB */
if (rawExtension.exec(files[i]) && file == files[i].replace(rawExtension, ".jpg")) {
return false;
}
/* If there is a different CASE (eg. JPG vs jpg) don't add it, and remove the 'lower case'
* version from disk. */
if (file != files[i] && file.toUpperCase() == files[i]) {
removeNewerFile(path, file, files[i]);
setStatus("Duplicate file in " + path + ": ", file, files[i]);
return false;
}
}
return metaDirectories.indexOf(file) == -1;
});
return resolve(files);
});
}).then(function(files) {
return Promise.map(files, function(file) {
let filepath = path + file;
return stat(filepath).then(function(stats) {
if (stats.isDirectory()) {
filepath += "/";
return scanDir(album, filepath).spread(function(_albums, _assets) {
album.allAssetCount += _assets.length;
album.allAlbumCount += _albums.length + 1;
albums = albums.concat(_albums);
assets = assets.concat(_assets);
}).catch(function(error) {
setStatus("Could not scanDir " + filepath + ": " + error, "error");
});
}
/* Check file extensions */
if (!re.exec(file)) {
return;
}
album.hasAssets = true;
assets.push({
filename: file.replace(rawExtension, ".jpg"), /* We will be converting from NEF/ORF => JPG */
name: file.replace(/.[^.]*$/, ""),
stats: {
mtime: stats.mtime,
ctime: stats.ctime
},
size: stats.size,
album: album
});
});
});
}).then(function() {
return Promise.map(albums, function(album) {
if (album.hasAssets) {
return mkdir(album.path + "thumbs/scaled");
}
});
}).then(function() {
return [ albums, assets ];
});
}
function findOrCreateDBAlbum(transaction, album) {
let query = "SELECT id FROM albums WHERE path=:path AND ";
if (!album.parent) {
query += "parentId IS NULL";
album.parentId = null;
} else {
if (!album.parent.id) {
let error = "Albums in array in non ancestral order!";
setStatus(error, "error");
throw error;
}
album.parentId = album.parent.id;
query += "parentId=:parentId";
}
return photoDB.sequelize.query(query, {
replacements: album,
type: photoDB.sequelize.QueryTypes.SELECT
}).then(function(results) {
if (results.length == 0) {
if (!album.parent) {
setStatus("Creating top level album: " + picturesPath, "warn" );
}
return photoDB.sequelize.query("INSERT INTO albums (path,parentId,name) VALUES(:path,:parentId,:name)", {
replacements: album,
transaction: transaction
}).spread(function(results, metadata) {
return metadata.lastID;
});
} else {
return results[0].id;
}
}).then(function(id) {
album.id = id;
return id;
});
}
function findOrUpdateDBAsset(transaction, asset) {
if (!asset.album || !asset.album.id) {
let error = "Asset being processed without an album";
setStatus(error, "warn");
throw error;
}
asset.albumId = asset.album.id;
return photoDB.sequelize.query(
"SELECT id,DATETIME(scanned) AS scanned,size,DATETIME(modified) AS modified " +
"FROM photos " +
"WHERE albumId=:albumId AND filename=:filename", {
replacements: asset,
type: photoDB.sequelize.QueryTypes.SELECT
}).then(function(results) {
if (results.length == 0) {
return photoDB.sequelize.query("INSERT INTO photos " +
"(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
replacements: asset,
transaction: transaction
}).spread(function(results, metadata) {
asset.id = metadata.lastID;
});
}
asset.id = results[0].id;
asset.scanned = new Date(results[0].scanned);
asset.modified = new Date(results[0].modified);
/* If the size on disk changed, update the size entry in the DB. This shouldn't happen in
* production unless someone modifies the file, then re-stamps the modified time */
if (asset.size != results[0].size) {
setStatus("File was modified with time-restamp (HASH regeneration will be queued): " + asset.filename);
delete asset.scanned;
delete asset.modified;
}
}).then(function() {
return asset;
});
}
function computeHash(filepath) {
return new Promise(function(resolve, reject) {
let input = fs.createReadStream(filepath),
hash = crypto.createHash("sha256");
if (!input) {
console.warn("Unable to open " + filepath);
return reject();
}
input.on("error", function(error) {
console.warn("Error reading " + filepath);
reject(error);
});
input.on("readable", function() {
const data = input.read();
if (data) {
hash.update(data);
} else {
input.close();
resolve(hash.digest("hex"));
hash = null;
input = null;
}
});
});
}
let scanningStatus = [];
function setStatus(status, level) {
if (status == "idle") {
scanningStatus = [];
return;
}
level = level || "info";
scanningStatus.push({
level: level,
time: moment().format(),
log: status
});
switch (level) {
case "error":
console.error(status);
break;
case "warn":
console.warn(status);
break;
default:
console.log(status);
}
}
function doScan() {
/* 1. Scan for all assets which will be managed by the system. readdir
* 2. Check if entry in DB. Check mod-time in DB vs. stats from #1
* - For albums
* - For assets
* 3. If not in DB, or mod-time changed, queue for HASH CHECK
*
* HASH CHECK
* 1. Compute HASH
* 2. Check for HASH in photohash -- skip?
* 3. Check for and create thumbs/FILE thumbs/scaled/FILE
* 4. If necessary, create JPG from RAW
* 5. Update last-scanned date in DB for entry
* 6. Look up all DB entries with last-scanned date < NOW -- purge from DB (they were
* removed on disk)? Also purge from the HASH table.
*/
let initialized = Date.now();
let now = Date.now();
let needsProcessing = [];
if (scanningStatus.length != 0) {
return Promise.resolve(scanningStatus);
}
return scanDir(null, picturesPath).spread(function(albums, assets) {
setStatus("Found " + assets.length + " assets in " + albums.length + " albums after " +
((Date.now() - now) / 1000) + "s");
/* One at a time, in series, as the album[] array has parents first, then descendants.
* Operating in parallel could result in a child being searched for prior to the parent */
now = Date.now();
let toProcess = albums.length, lastMessage = moment();
return photoDB.sequelize.transaction(function(transaction) {
return Promise.mapSeries(albums, function(album) {
return findOrCreateDBAlbum(transaction, album).then(function() {
toProcess--;
if (moment().add(-5, 'seconds') > lastMessage) {
setStatus("Albums to be created in DB: " + toProcess);
lastMessage = moment();
}
});
});
}).then(function() {
setStatus("Processed " + albums.length + " album DB entries in " +
((Date.now() - now) / 1000) + "s");
now = Date.now();
setStatus(assets.length + " assets remaining to be verified/updated. ETA N/A");
let processed = 0, start = Date.now(), last = 0, updateScanned = [], newEntries = 0;
return photoDB.sequelize.transaction(function(transaction) {
return Promise.map(assets, function(asset) {
return Promise.resolve(asset).then(function(asset) {
/* If both mtime and ctime of the asset are older than the lastScan, skip it
*
* Can only do this after a full scan has occurred */
if (lastScan != null && asset.stats.mtime < lastScan && asset.stats.ctime < lastScan) {
return asset;
}
return findOrUpdateDBAsset(transaction, asset).then(function(asset) {
if (!asset.scanned) {
newEntries++;
}
if (!asset.scanned || asset.scanned < asset.stats.mtime || !asset.modified) {
// if (!asset.scanned) { console.log("no scan date on asset"); }
// if (asset.scanned < asset.stats.mtime) { console.log("scan date older than mtime"); }
// if (!asset.modified) { console.log("no mtime."); }
needsProcessing.push(asset);
} else {
updateScanned.push(asset.id);
}
return asset;
}).then(function(asset) {
return asset;
});
}).then(function(asset) {
processed++;
let elapsed = Date.now() - start;
if (elapsed < 5000) {
return asset;
}
let remaining = assets.length - processed, eta = Math.ceil((elapsed / 1000) * remaining / (processed - last));
setStatus(remaining + " assets remaining be verified/updated " +
"(" + newEntries + " new entries, " + needsProcessing.length + " need processing," + (processed - newEntries) + " up-to-date so far). ETA " + eta + "s");
last = processed;
start = Date.now();
});
}, {
concurrency: 10
});
}).then(function() {
if (updateScanned.length) {
return photoDB.sequelize.query("UPDATE photos SET scanned=CURRENT_TIMESTAMP WHERE id IN (:ids)", {
replacements: {
ids: updateScanned
}
}).then(function() {
setStatus("Updated scan date of " + updateScanned.length + " assets");
updateScanned = [];
});
}
}).then(function() {
setStatus(newEntries + " assets are new. " + (needsProcessing.length - newEntries) + " assets have been modified.\n" +
needsProcessing.length + " assets need HASH computed. " + (assets.length - needsProcessing.length) + " need no update.");;
processBlock(needsProcessing);
needsProcessing = [];
}).then(function() {
setStatus("Scanned " + assets.length + " asset DB entries in " +
((Date.now() - now) / 1000) + "s");
assets = [];
});
});
}).then(function() {
setStatus("Total time to initialize DB and all scans: " + ((Date.now() - initialized) / 1000) + "s");
return photoDB.sequelize.query("SELECT max(scanned) AS scanned FROM photos", {
type: photoDB.sequelize.QueryTypes.SELECT
}).then(function(results) {
if (results[0].scanned == null) {
lastScan = new Date("1800-01-01");
} else {
lastScan = new Date(results[0].scanned);
}
setStatus("Updating any asset newer than " + moment(lastScan).format());
});
}).then(function() {
setStatus("idle");
return "scan complete";
}).catch(function(error) {
setStatus(error);
throw error;
});
}
module.exports = {
init: function(db) {
photoDB = db;
},
scan: doScan
};