/** * scanner * * Face recognition: * 1. For each photo, extract all faces. Store face rectangles. * face_id unique * photo_id foreign key * top left bottom right * identity_id * distance (0 == truth; manually assigned identity) * 2. For each face_id, create: * normalized_file * original_file * 128 float */ "use strict"; /* meta directories are not scanned for photos */ const metaDirectories = [ "thumbs", "raw", "face-data", ".git", "corrupt" ]; const Promise = require("bluebird"), fs = require("fs"), config = require("config"), moment = require("moment"), crypto = require("crypto"), { stat, mkdir, exists } = require("./lib/util"); let photoDB = null; const picturesPath = config.get("picturesPath").replace(/\/$/, "") + "/"; let processQueue = [], triedClean = [], lastScan = new Date("1800-01-01"); //const rawExtension = /\.(nef|orf)$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef", "orf" ]; const rawExtension = /\.nef$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef" ]; function removeNewerFile(path, fileA, fileB) { fs.stat(path + fileA, function(err, statsA) { if (err) { return; } fs.stat(path + fileB, function(err, statsB) { if (err) { return; } if (statsA.mtime > statsB.mtime) { setStatus("Removing file by moving to 'corrupt':" + fileA); moveCorrupt(path, fileA); } else { setStatus("Removing file by moving to 'corrupt':" + fileB); moveCorrupt(path, fileB); } }); }); } let processRunning = false; const { spawn } = require('child_process'); const sharp = require("sharp"), exif = require("exif-reader"); function convertRawToJpg(path, raw, file) { setStatus(`Converting ${path}${raw} to ${file}.`); path = picturesPath + path; return new Promise(function(resolve, reject) { return exists(path + file.replace(rawExtension, ".jpg")).then(function(exist) { if (exist) { setStatus("Skipping already converted file: " + file); return; } const darktable = spawn("darktable-cli", [ path + raw, path + file ]); const stderr = []; darktable.stderr.on('data', function(data) { stderr.push(data); }); return new Promise((resolve, reject) => { darktable.on('exit', (code, signal) => { if (signal || code != 0) { let error = "darktable for " + path + file + " returned an error: " + code + "\n" + signal + "\n" + stderr.join("\n") + "\n"; setStatus(error, "error"); return moveCorrupt(path, file).then(function() { setStatus("darktable failed", "warn"); return reject(error); }).catch(function(error) { setStatus("moveCorrupt failed", "warn"); return reject(error); }); } return mkdir(path + "raw").then(function() { fs.rename(path + raw, path + "raw/" + raw, function(err) { if (err) { setStatus("Unable to move RAW file: " + path + raw, "error"); return reject(err); } return resolve(); }); }).catch(function(error) { setStatus("mkdir failed", "warn"); return reject(error); }); }); }); }); }); } function moveCorrupt(path, file) { if (path.indexOf(picturesPath) != 0) { path = picturesPath + path; } setStatus("Moving corrupt file '" + file + "' to " + path + "corrupt", "warn"); return mkdir(path + "corrupt").then(function() { return new Promise(function(resolve, reject) { fs.rename(path + file, path + "corrupt/" + file, function(err) { if (err) { setStatus("Unable to move corrupt file: " + path + file, "error"); return reject(err); } return resolve(); }); }); }); } const determineImageDate = (asset, metadata) => { const created = asset.stats.mtime, filename = asset.filename; /* Attempt to find CREATED / MODIFIED date based on meta-data or * FILENAME patterns */ if (metadata.exif && metadata.exif.exif && metadata.exif.exif.DateTimeOriginal && !isNaN(metadata.exif.exif.DateTimeOriginal.valueOf())) { asset.taken = moment(metadata.exif.exif.DateTimeOriginal).format(); asset.modified = moment(metadata.exif.exif.DateTimeOriginal).format(); if (asset.taken == "Invalid date" || asset.taken.replace(/T.*/, "") == "1899-11-30") { setStatus( `Invalid EXIF date information for ` + `${asset.album.path + asset.filename}`); return moment(created).format(); } return undefined; } /* Attempt to infer the datestamp from the filename */ let date = moment(created).format(); let match = filename.match( /WhatsApp Image (20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]) at (.*).(jpeg|jpg)/); if (match) { date = moment((match[1] + " " + match[2]), "YYYY-MM-DD h.mm.ss a") .format(); if (date == "Invalid date") { date = moment(created).format(); } return date; } match = filename.match( /(20[0-9][0-9]-?[0-9][0-9]-?[0-9][0-9])[_\-]?([0-9]{6})?/); if (!match) { return moment(created).format(); } if (match[2]) { /* Stamp had time in it */ date = moment(`${match[1]}${match[2]}` .replace(/-/g, ""), "YYYYMMDDHHmmss") .format(); } else { date = moment(`${match[1]}` .replace(/-/g, ""), "YYYYMMDD") .format(); } if (date == "Invalid date") { date = moment(created).format(); } return date; } const processImageAsset = async (asset) => { let path = asset.album.path, filename = asset.filename; let src = picturesPath + path + filename, image = sharp(src, { limitInputPixels: false }); const metadata = await image .metadata() .catch(error => console.error(error) ); if (metadata.exif) { try { metadata.exif = exif(metadata.exif); delete metadata.exif.thumbnail; delete metadata.exif.image; for (var key in metadata.exif.exif) { if (Buffer.isBuffer(metadata.exif.exif[key])) { metadata.exif.exif[key] = "Buffer[" + metadata.exif.exif[key].length + "]"; } } } catch (error) { console.error(error); delete metadata.exif } } asset.width = metadata.width; asset.height = metadata.height; asset.added = moment().format(); const updateDate = determineImageDate(asset, metadata); if (updateDate) { asset.taken = asset.modified = updateDate; } let dst = picturesPath + path + "thumbs/" + filename; let onDisk = await exists(dst); if (!onDisk) { await image.resize(256, 256) .withMetadata() .toFile(dst) .catch((error) => { setStatus(`Error resizing image: ${dst}\n${error}`, "error"); throw error; }); } dst = picturesPath + path + "thumbs/scaled/" + filename; if (!onDisk) { await image.resize(Math.min(1024, metadata.width)) .withMetadata() .toFile(dst) .catch((error) => { setStatus(`Error resizing image: ${dst}\n${error}`, "error"); throw error; }); } }; const processBlock = async (items) => { if (items) { processQueue = processQueue.concat(items); } if (processRunning) { /* Invoke once per second to check if there are items to process */ setTimeout(processBlock, 1000); return; } let processing = processQueue.splice(0), needsProcessing = [], duplicates = []; processRunning = true; /* Sort to newest files to be processed first */ processing.sort(function(a, b) { return b.stats.mtime - a.stats.mtime; }); let toProcess = processing.length; setStatus(`Items to be processed: ${toProcess}`); const updateHash = [], insertHash = [], updateAsset = []; let last = 0, lastTime = Date.now(), start = Date.now(); await Promise.map(processing, async (asset) => { toProcess--; const elapsed = Date.now() - lastTime; if (elapsed > 5000) { const processed = processing.length - toProcess, rate = Math.ceil(100 * (processed - last) / elapsed) / 100, eta = Math.ceil(toProcess * (((Date.now() - start) / 1000) / processed)); setStatus(`Processed ${processed}/${processing.length} ` + `(${rate} assets/second). ETA ${eta}s`); last = processed; lastTime = Date.now(); } /* Create JPG from RAW if there is a RAW file and no JPG */ if (asset.raw) { const path = asset.album.path; const onDisk = await exists(picturesPath + path + asset.filename) if (!onDisk) { await mkdir(picturesPath + path + "raw"); await convertRawToJpg(path, asset.raw, asset.filename); console.log("Done converting..."); } } /* Update PHOTOHASH table */ asset.hash = await computeHash( picturesPath + asset.album.path + asset.filename) /* Scan for existing DB entries -- this is a promise, which * means the closure for the *Hash lists could introduce a race * condition where the array didn't contain a match prior to calling * the DB, but during the DB transaction, a match came into being */ const results = await photoDB.sequelize.query( "SELECT photohashes.*,photos.filename,albums.path FROM photohashes " + "LEFT JOIN photos ON (photos.id=photohashes.photoId) " + "LEFT JOIN albums ON (albums.id=photos.albumId) " + "WHERE hash=:hash OR photoId=:id", { replacements: asset, type: photoDB.sequelize.QueryTypes.SELECT }); /* Writes to DB for new assets hasn't happened yet, so the DB * won't have new duplicates */ let duplicate; if (results.length == 0) { duplicate = insertHash.find(entry => entry.hash === asset.hash); if (duplicate) { console.log(`DUPLICATE in INSERT hash list (${insertHash.length}) ` + `for ${duplicate.filename}`); } else { insertHash.push(asset); } } else if (results[0].hash != asset.hash) { duplicate = updateHash.find(entry => entry.hash === asset.hash); if (duplicate) { console.log(`DUPLICATE in UPDATE hash list (${updateHash.length}) ` + `for ${duplicate.filename}`); } else { updateHash.push(asset); } } else if (results[0].photoId != asset.id) { duplicate = results[0]; console.log(`DUPLICATE in HASH db for ${duplicate.filename}`); } /* Output log information about duplicates */ if (duplicate) { /* Fixup structure based on whether obtained from *Hash array or * from DB */ if (!duplicate.path && duplicate.album) { duplicate.path = duplicate.album.path; } setStatus(`Duplicate asset: ` + `'${asset.album.path}${asset.filename}' is a copy of ` + `'${duplicate.path}${duplicate.filename}'`); if (asset.duplicate != duplicate.photoId) { asset.duplicate = duplicate.photoId; duplicates.push(asset); } return; /* Done processing this asset (DUPLICATE) */ } /* Additional processing is only skipped if the asset was a * DUPLICATE above (the empty "return;") */ needsProcessing.push(asset); try { await processImageAsset(asset); /* no DB writes */ } catch (error) { const path = asset.album.path, filename = asset.filename; setStatus(`Error reading image ` + `${picturesPath}${path}${filename}:\n` + `${error}`, "error"); await moveCorrupt(path, filename); return; } updateAsset.push(asset); }, { concurrency: require('os').cpus().length }); try { console.log(`Updating ${updateAsset.length} asset entries.`); await photoDB.sequelize.transaction(async (t) => { /* Update the DB with the image information */ await Promise.mapSeries(updateAsset, async (item) => { await photoDB.sequelize.query("UPDATE photos SET " + "added=:added,modified=:modified,taken=:taken,width=:width," + "height=:height,size=:size,scanned=CURRENT_TIMESTAMP " + "WHERE id=:id", { replacements: item, transaction: t }); }); }); } catch (error) { console.error(error); process.exit(-1); } /* Run UPDATE for hashes first in case a new photo is being added * that has the same hash as an OLD entry. This could happen * if an original file is COPIED and the original DB entry is * edited (UPDATE), and the original is moved elsewhere (INSERT) * * If the UPDATE does not occur first, the INSERT will fail. */ try { console.log(`Updating ${updateHash.length} asset HASH entries.`); await photoDB.sequelize.transaction(async (t) => { await Promise.mapSeries(updateHash, async (item) => { await photoDB.sequelize.query( "UPDATE photohashes SET hash=:hash WHERE photoId=:id", { replacements: item, transaction: t }); }); }); console.log(`Inserting ${insertHash.length} HASH entries.`); await photoDB.sequelize.transaction(async (t) => { await Promise.mapSeries(insertHash, async (item) => { await photoDB.sequelize.query( "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)", { replacements: item, transaction: t }); }); }); } catch (error) { console.error(error); process.exit(-1); } /* Process the DUPLICATES */ setStatus( `Completed processing queue. Marking ${duplicates.length} duplicates.`); await photoDB.sequelize.transaction(async (t) => { await Promise.mapSeries(duplicates, async (item) => { await photoDB.sequelize.query( "UPDATE photos " + "SET duplicate=:duplicate,modified=CURRENT_TIMESTAMP,scanned=CURRENT_TIMESTAMP WHERE id=:id", { replacements: item, transaction: t }); }); }) setStatus("Looking for removed assets"); let results = await photoDB.sequelize.query( "SELECT photos.scanned,photos.id,photos.filename,albums.path FROM photos " + "LEFT JOIN albums ON (albums.id=photos.albumId) " + "WHERE photos.deleted=0 AND (DATETIME(photos.scanned) { const onDisk = await exists(item.path + item.filename); if (!onDisk) { setStatus( `${item.path}${item.filename} no longer exists on disk. ` + `Marking as deleted.`); deleted.push(item.id); } }); if (deleted.length) { await photoDB.sequelize.transaction(async (t) => { await photoDB.sequelize.query( "UPDATE photos SET deleted=1,scanned=CURRENT_TIMESTAMP " + "WHERE id IN (:deleted)", { replacements: { deleted }, transaction: t } ); await photoDB.sequelize.query( "DELETE FROM photohashes WHERE photoId IN (:deleted)", { replacements: { deleted }, transaction: t } ); }); } setStatus(`${deleted.length} assets deleted.`); processRunning = false; } function scanDir(parent, path) { let re = new RegExp("\.((" + extensions.join(")|(") + "))$", "i"), album = { path: path.slice(picturesPath.length), /* path already ends in '/' */ name: path.replace(/\/$/, "").replace(/.*\//, "").replace(/_/g, " "), parent: parent, allAssetCount: 0, allAlbumCount: 0 }, albums = [ album ], assets = []; return new Promise(function(resolve, reject) { fs.readdir(path, function(error, files) { if (error) { setStatus("Could not readdir: " + path, "warn"); return resolve([]); } /* Remove meta-data directories from being processed */ files = files.filter((file) => { for (var i = 0; i < files.length; i++) { /* If this file has an original NEF/ORF on the system, don't add the JPG to the DB */ if (rawExtension.exec(files[i]) && file == files[i].replace(rawExtension, ".jpg")) { return false; } /* If there is a different CASE (eg. JPG vs jpg) don't add it, and remove the 'lower case' * version from disk. */ if (file != files[i] && file.toUpperCase() == files[i]) { removeNewerFile(path, file, files[i]); setStatus("Duplicate file in " + path + ": ", file, files[i]); return false; } } return metaDirectories.indexOf(file) == -1; }); return resolve(files); }); }).then(function(files) { return Promise.map(files, function(file) { let filepath = path + file; return stat(filepath).then(function(stats) { if (stats.isDirectory()) { filepath += "/"; return scanDir(album, filepath).then((res) => { const _albums = res.albums, _assets = res.assets; album.allAssetCount += _assets.length; album.allAlbumCount += _albums.length + 1; albums = albums.concat(_albums); assets = assets.concat(_assets); }).catch(function(error) { setStatus("Could not scanDir " + filepath + ": " + error, "error"); }); } /* Check file extensions */ if (!re.exec(file)) { return; } album.hasAssets = true; const asset = { filename: file.replace(rawExtension, ".jpg"), name: file.replace(/.[^.]*$/, ""), stats: { mtime: stats.mtime, ctime: stats.ctime }, size: stats.size, album: album } if (file != asset.filename) { asset.raw = file; } assets.push(asset); }); }); }).then(function() { return Promise.map(albums, function(album) { if (album.hasAssets) { return mkdir(album.path + "thumbs/scaled"); } }); }).then(function() { return { albums, assets }; }); } const findOrCreateDBAlbum = async (t, album) => { let query = "SELECT id FROM albums WHERE path=:path AND "; if (!album.parent) { query += "parentId IS NULL"; album.parentId = null; } else { if (!album.parent.id) { let error = "Albums in array in non ancestral order!"; setStatus(error, "error"); throw error; } album.parentId = album.parent.id; query += "parentId=:parentId"; } const results = await photoDB.sequelize.query(query, { replacements: album, type: photoDB.sequelize.QueryTypes.SELECT }); if (results.length == 0) { if (!album.parent) { setStatus("Creating top level album: " + picturesPath, "warn" ); } return photoDB.sequelize.query("INSERT INTO albums (path,parentId,name) VALUES(:path,:parentId,:name)", { replacements: album, transaction: t }).then(array => { album.id = array[1].lastID; }); } else { album.id = results[0].id; } return album.id; } const createDBAsset = async (asset, transaction) => { return photoDB.sequelize.query( "INSERT INTO photos " + "(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", { replacements: asset, transaction }).then(array => { asset.id = array[1].lastID; return asset; }); } const findDBAsset = async (asset) => { if (!asset.album || !asset.album.id) { let error = "Asset being processed without an album"; setStatus(error, "warn"); throw error; } asset.albumId = asset.album.id; const results = await photoDB.sequelize.query( "SELECT id,DATETIME(scanned) AS scanned,size,DATETIME(modified) AS modified " + "FROM photos " + "WHERE albumId=:albumId AND filename=:filename", { replacements: asset, type: photoDB.sequelize.QueryTypes.SELECT }); if (results.length == 0) { return undefined; } asset.id = results[0].id; asset.scanned = new Date(results[0].scanned); asset.modified = new Date(results[0].modified); /* If the size on disk changed, update the size entry in the DB. This * shouldn't happen in production unless someone modifies the file, then * re-stamps the modified time */ if (asset.size != results[0].size) { setStatus( `File was modified with time-restamp (HASH regeneration will be ` + `queued): ${asset.filename}`); delete asset.scanned; delete asset.modified; } return asset; } function computeHash(filepath) { return new Promise(function(resolve, reject) { let input = fs.createReadStream(filepath), hash = crypto.createHash("sha256"); if (!input) { console.warn("Unable to open " + filepath); return reject(); } input.on("error", function(error) { console.warn("Error reading " + filepath); reject(error); }); input.on("readable", function() { const data = input.read(); if (data) { hash.update(data); } else { input.close(); resolve(hash.digest("hex")); hash = null; input = null; } }); }); } let scanningStatus = []; function setStatus(status, level) { if (status == "idle") { scanningStatus = []; return; } level = level || "info"; scanningStatus.push({ level: level, time: moment().format(), log: status }); switch (level) { case "error": console.error(status); break; case "warn": console.warn(status); break; default: console.log(status); } } const doScan = async () => { /* 1. Scan for all assets which will be managed by the system. readdir * 2. Check if entry in DB. Check mod-time in DB vs. stats from #1 * - For albums * - For assets * 3. If not in DB, or mod-time changed, queue for HASH CHECK * * HASH CHECK * 1. Compute HASH * 2. Check for HASH in photohash -- skip? * 3. Check for and create thumbs/FILE thumbs/scaled/FILE * 4. If necessary, create JPG from RAW * 5. Update last-scanned date in DB for entry * 6. Look up all DB entries with last-scanned date < NOW -- purge from DB (they were * removed on disk)? Also purge from the HASH table. */ let initialized = Date.now(); let now = Date.now(); let needsProcessing = []; if (scanningStatus.length != 0) { return scanningStatus; } let { albums, assets } = await scanDir(null, picturesPath); setStatus( `Found ${assets.length} assets in ${albums.length} albums after ` + `${(Date.now() - now) / 1000}s`); /* One at a time, in series, as the album[] array has parents * first, then descendants. * Operating in parallel could result in a child being searched for * prior to the parent */ now = Date.now(); try { let toProcess = albums.length, lastMessage = moment(); await photoDB.sequelize.transaction(async (t) => { await Promise.mapSeries(albums, async (album) => { await findOrCreateDBAlbum(t, album); toProcess--; if (moment().add(-5, 'seconds') > lastMessage) { setStatus(`Albums to be created in DB: ${toProcess}`); lastMessage = moment(); } }); }); } catch (error) { console.error(error); process.exit(-1); } setStatus( `Processed ${albums.length} album DB entries in ` + `${(Date.now() - now) / 1000}s`); now = Date.now(); setStatus( `${assets.length} assets remaining to be verified/updated. ETA N/A`); let updateScanned = [], newEntries = 0, needsCreation = []; try { let processed = 0, start = Date.now(), last = 0; await Promise.map(assets, async (asset) => { /* If both mtime and ctime of the asset are older than the * lastScan, skip it * Can only do this after a full scan has occurred */ if (lastScan != null && asset.stats.mtime < lastScan && asset.stats.ctime < lastScan) { return; } const res = await findDBAsset(asset); if (!res) { needsCreation.push(asset); } if (!asset.scanned) { newEntries++; } if (!asset.scanned || asset.scanned < asset.stats.mtime || !asset.modified) { // if (!asset.scanned) { console.log("no scan date on asset"); } // if (asset.scanned < asset.stats.mtime) { console.log("scan date older than mtime"); } // if (!asset.modified) { console.log("no mtime."); } needsProcessing.push(asset); } else { updateScanned.push(asset.id); } processed++; let elapsed = Date.now() - start; if (elapsed < 5000) { return; } let remaining = assets.length - processed, eta = Math.ceil((elapsed / 1000) * remaining / (processed - last)); setStatus( `${remaining} assets remaining be verified/updated (${newEntries} ` + `new entries, ${needsProcessing.length} need processing, ` + `${(processed - newEntries)} up-to-date so far). ETA ${eta}s` ); last = processed; start = Date.now(); } , { concurrency: 10 }); } catch (error) { console.error(error); process.exit(-1); } try { console.log(`Creating ${needsCreation.length} asset entries.`); await photoDB.sequelize.transaction(async (t) => { await Promise.mapSeries(needsCreation, async (asset) => { await createDBAsset(asset, t); }); }); } catch (error) { console.error(error); process.exit(-1); } if (updateScanned.length) { await photoDB.sequelize.query( "UPDATE photos SET scanned=CURRENT_TIMESTAMP WHERE id IN (:ids)", { replacements: { ids: updateScanned } }); setStatus("Updated scan date of " + updateScanned.length + " assets"); updateScanned = []; } setStatus( `${newEntries} assets are new. ` + `${needsProcessing.length - newEntries} assets have been ` + `modified.\n${needsProcessing.length} assets need to be processed. ` + `${assets.length - needsProcessing.length} need no update.`); processBlock(needsProcessing); needsProcessing = []; setStatus( `Scanned ${assets.length} asset DB entries in ` + `${(Date.now() - now) / 1000}s`); assets = []; setStatus( `Total time to initialize DB and all scans: ` + `${(Date.now() - initialized) / 1000}s`); let results = await photoDB.sequelize.query( "SELECT max(scanned) AS scanned FROM photos", { type: photoDB.sequelize.QueryTypes.SELECT }); if (results[0].scanned == null) { lastScan = new Date("1800-01-01"); } else { lastScan = new Date(results[0].scanned); } setStatus(`Updating any asset newer than ${moment(lastScan).format()}`); setStatus("idle"); return "scan complete"; } module.exports = { init: function(db) { photoDB = db; }, scan: doScan };