"use strict";

/*
 * Photo library scanner.
 *
 * Walks the directory tree under config "picturesPath", mirrors albums and
 * photos into the database handed to init() (used as photoDB.sequelize),
 * stores a SHA-256 hash per photo in table photohashes to detect duplicates,
 * converts .nef files to .jpg with the external `ufraw-batch` tool, and
 * writes thumbs/<file> and thumbs/scaled/<file> next to each image.
 *
 * Exports: init(db) and scan() (doScan).
 */
const Promise = require("bluebird"), fs = require("fs"), config = require("config"), moment = require("moment"), crypto = require("crypto");

/* Database handle; set by init(). Every query goes through photoDB.sequelize. */
let photoDB = null;

/* Root of the photo tree: one trailing "/" is stripped, then re-appended, so
 * the value always ends in "/". */
const picturesPath = config.get("picturesPath").replace(/\/$/, "") + "/";

/*
 * processQueue: assets waiting for processBlock().
 * triedClean:   "album path + file name" keys of RAW-derived images that were
 *               already re-queued once after a read failure.
 * lastScan:     doScan() skips assets whose mtime AND ctime are both older
 *               than this; refreshed from max(photos.scanned) after a scan.
 */
let processQueue = [], triedClean = [], lastScan = new Date("1800-01-01");

/*
 * Files matching rawExtension are RAW and get converted to JPG before being
 * processed. .orf support is disabled; the earlier definition is kept below
 * for reference. `extensions` is not read in the visible part of this file —
 * presumably scanDir() uses it to decide which files to index.
 */
//const rawExtension = /\.(nef|orf)$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef", "orf" ];
const rawExtension = /\.nef$/i, extensions = [ "jpg", "jpeg", "png", "gif", "nef" ];

/*
 * Given two files in the same directory, moves the one with the newer mtime
 * (fileB on a tie) into `path + "corrupt/"` via moveCorrupt(). Does nothing
 * if either fs.stat() fails.
 *
 * path - directory prefix concatenated directly with the file names, so it
 *        must end in "/". fs.stat() uses it as given, so presumably an
 *        absolute path under picturesPath — confirm with callers.
 *
 * NOTE(review): returns nothing and ignores the moveCorrupt() promise, so a
 * failed move becomes an unhandled rejection. No caller appears in the
 * visible part of this file.
 */
function removeNewerFile(path, fileA, fileB) {
  fs.stat(path + fileA, function(err, statsA) {
    if (err) {
      return;
    }
    fs.stat(path + fileB, function(err, statsB) {
      if (err) {
        return;
      }
      if (statsA.mtime > statsB.mtime) {
        setStatus("Removing file by moving to 'corrupt':" + fileA);
        moveCorrupt(path, fileA);
      } else {
        setStatus("Removing file by moving to 'corrupt':" + fileB);
        moveCorrupt(path, fileB);
      }
    });
  });
}

/* True while processBlock() is working through a batch. It is set to true in
 * processBlock(); the reset to false is not in the visible code (the end of
 * processBlock() is missing from the reviewed copy). */
let processRunning = false;

const { spawn } = require('child_process');
const sharp = require("sharp"), exif = require("exif-reader");

/*
 * Promise wrapper around fs.stat().
 *
 * _path may be relative to picturesPath, or an absolute path that already
 * begins with picturesPath; the prefix is stripped and re-added so both forms
 * name the same file. Resolves to the fs.Stats; rejects with the fs error
 * (e.g. code "ENOENT").
 *
 * NOTE(review): the prefix test uses picturesPath *without* its trailing "/"
 * but then strips picturesPath.length characters (which counts the "/"), so a
 * sibling directory such as "<picturesPath minus slash>X/..." is mis-stripped.
 * A path outside picturesPath is not rejected; it is simply appended to it.
 */
const stat = function (_path) {
  if (_path.indexOf(picturesPath.replace(/\/$/, "")) == 0) {
    _path = _path.substring(picturesPath.length);
  }
  let path = picturesPath + _path;
  return new Promise(function (resolve, reject) {
    fs.stat(path, function (error, stats) {
      if (error) {
        return reject(error);
      }
      return resolve(stats);
    });
  });
}

/*
 * Creates every missing directory along _path (relative to picturesPath, or
 * absolute under it), one path component at a time in order — similar to
 * `mkdir -p`. Directories that already exist are left alone.
 *
 * Rejects if stat() fails with anything other than ENOENT, or if fs.mkdir()
 * fails.
 * NOTE(review): EEXIST is not tolerated, so two concurrent mkdir() calls that
 * both try to create the same directory can make one of them reject.
 */
const mkdir = function (_path) {
  if (_path.indexOf(picturesPath) == 0) {
    _path = _path.substring(picturesPath.length);
  }
  /* parts[0] (picturesPath) is only a placeholder: the first iteration ignores
   * `part` and starts from picturesPath without its trailing "/". */
  let parts = _path.split("/"), path;
  parts.unshift(picturesPath);
  return Promise.mapSeries(parts, function (part) {
    if (!path) {
      path = picturesPath.replace(/\/$/, "");
    } else {
      path += "/" + part;
    }
    return stat(path).catch(function (error) {
      if (error.code != "ENOENT") {
        throw error;
      }
      return new Promise(function (resolve, reject) {
        fs.mkdir(path, function (error) {
          if (error) {
            return reject(error);
          }
          return resolve();
        });
      });
    });
  });
}

/* Resolves true if `path` (relative to, or absolute under, picturesPath) can be
 * stat'ed, and false on ANY error — not only "not found". */
const exists = function(path) {
  return stat(path).then(function() { return true; }).catch(function() { return false; });
}

/*
 * Converts RAW file `file` in album directory `path` (relative to
 * picturesPath, ending in "/") to a sibling .jpg by running `ufraw-batch`
 * with --wb=camera, --rotate=camera, --out-type=jpg, --compression=90,
 * --exif and --overwrite.
 *
 * Intended behavior, as written:
 *  - if the .jpg already exists, log and do nothing;
 *  - on a non-zero exit code or a signal, log ufraw's stderr, move the RAW
 *    file to corrupt/ and reject;
 *  - on success, create raw/ and move the RAW file into it, then resolve.
 *
 * FIXME(review): as written this cannot work:
 *  1. The two .bind() calls are swapped. The 'exit' handler
 *     (stderr, code, signal) is bound with `ufraw`, while the inner executor
 *     (ufraw, resolve, reject) is bound with the `stderr` array. Inside the
 *     executor `ufraw` is therefore an Array, `ufraw.on` is not a function,
 *     and the inner promise rejects with a TypeError.
 *  2. The outer `new Promise(function(resolve, reject) {...})` never calls its
 *     own resolve/reject: an executor's return value is ignored, and the inner
 *     functions shadow both names. The returned promise never settles (not
 *     even on the "already converted" path), and rejections of the inner
 *     chain are unhandled.
 * processBlock() waits on this promise, so its batch stalls at the first RAW
 * file that still needs converting.
 */
function convertRawToJpg(path, file) {
  setStatus("Converting " + path + file);

  /* From here on `path` is absolute. */
  path = picturesPath + path;

  return new Promise(function(resolve, reject) {
    return exists(path + file.replace(rawExtension, ".jpg")).then(function(exist) {
      if (exist) {
        setStatus("Skipping already converted file: " + file);
        return;
      }

      const ufraw = spawn("ufraw-batch", [
        "--silent",
        "--wb=camera",
        "--rotate=camera",
        "--out-type=jpg",
        "--compression=90",
        "--exif",
        "--overwrite",
        "--output", path + file.replace(rawExtension, ".jpg"),
        path + file
      ]);

      /* Collect stderr chunks for the error message. */
      const stderr = [];
      ufraw.stderr.on('data', function(data) {
        stderr.push(data);
      });

      /* BUG: see FIXME 1 above — the bind() arguments on this executor and on
       * the 'exit' handler are swapped. */
      return new Promise(function(ufraw, resolve, reject) {
        ufraw.on('exit', function(stderr, code, signal) {
          if (signal || code != 0) {
            let error = "UFRAW for " + path + file + " returned an error: " + code + "\n" + signal + "\n" + stderr.join("\n") + "\n";
            setStatus(error, "error");
            /* Move the RAW file itself to corrupt/. */
            return moveCorrupt(path, file).then(function() {
              setStatus("ufraw failed", "warn");
              return reject(error);
            }).catch(function(error) {
              setStatus("moveCorrupt failed", "warn");
              return reject(error);
            });
          }

          /* Conversion succeeded: park the RAW original under raw/. */
          return mkdir(path + "raw").then(function() {
            fs.rename(path + file, path + "raw/" + file, function(err) {
              if (err) {
                setStatus("Unable to move RAW file: " + path + file, "error");
                return reject(err);
              }
              return resolve();
            });
          }).catch(function(error) {
            setStatus("mkdir failed", "warn");
            return reject(error);
          });
        }.bind(this, ufraw));
      }.bind(this, stderr));
    });
  });
}

/*
 * Moves `file` out of directory `path` into `path + "corrupt/"`, creating that
 * directory first. `path` may be relative to picturesPath or already start
 * with it, and must end in "/". Resolves once the rename completes; rejects if
 * mkdir or rename fails (a rename failure is also logged).
 */
function moveCorrupt(path, file) {
  if (path.indexOf(picturesPath) != 0) {
    path = picturesPath + path;
  }

  setStatus("Moving corrupt file '" + file + "' to " + path + "corrupt", "warn");

  return mkdir(path + "corrupt").then(function() {
    return new Promise(function(resolve, reject) {
      fs.rename(path + file, path + "corrupt/" + file, function(err) {
        if (err) {
          setStatus("Unable to move corrupt file: " + path + file, "error");
          return reject(err);
        }
        return resolve();
      });
    });
  });
}

/*
 * Adds `items` (asset objects) to processQueue and, unless a batch is already
 * running, processes everything queued as one batch.
 *
 * Returns undefined when it only scheduled a retry (a batch is running; it
 * polls again after 1s); otherwise returns the batch's promise.
 *
 * Assets are processed one at a time, newest mtime first. Fields read from
 * each asset: album.path, album.id, filename, id, stats.mtime, duplicate; the
 * final UPDATE also binds added/modified/taken/width/height/size.
 * For each asset:
 *   1. hash the file (SHA-256) into asset.hash;
 *   2. insert/update its photohashes row or, if another photo already has the
 *      same hash, collect it in `duplicates` and stop there;
 *   3. for a RAW (.nef) file, make sure a sibling .jpg exists (converting it
 *      if needed) and continue with the .jpg;
 *   4. read size and EXIF via sharp and derive taken/modified (EXIF
 *      DateTimeOriginal, else a date found in the file name, else mtime);
 *   5. write thumbs/<file> and thumbs/scaled/<file> if they do not exist;
 *   6. UPDATE the photos row (also setting scanned=CURRENT_TIMESTAMP).
 * If steps 4-6 fail, the image is moved to corrupt/ and, for RAW sources, the
 * code tries to re-queue it once. After the batch, duplicates are marked in
 * one transaction and the removed-asset check starts. The rest of this
 * function is missing from the reviewed copy; see the NOTE near its end.
 */
function processBlock(items) {
  if (items) {
    processQueue = processQueue.concat(items);
  }

  if (processRunning) {
    /* Invoke once per second to check if there are items to process.
     * Each call made while a batch runs starts its own polling timer. */
    setTimeout(processBlock, 1000);
    return;
  }

  /* Take everything queued so far as this batch.
   * NOTE(review): the local needsProcessing is filled below but never read in
   * the visible part of this function. */
  let processing = processQueue.splice(0), needsProcessing = [], duplicates = [];
  processRunning = true;

  /* Sort to newest files to be processed first */
  processing.sort(function(a, b) {
    return b.stats.mtime - a.stats.mtime;
  });

  let toProcess = processing.length, lastMessage = moment();

  setStatus("Items to be processed: " + toProcess);

  return Promise.mapSeries(processing, function(asset) {
    /* Step 1: content hash. */
    return computeHash(picturesPath + asset.album.path + asset.filename).then(function(hash) {
      asset.hash = hash;
      return asset;
    }).then(function(asset) {
      /* Step 2: find this photo's hash row and/or any photo that already has
       * the same hash. Only results[0] is examined. */
      return photoDB.sequelize.query("SELECT photohashes.*,photos.filename,albums.path FROM photohashes " +
        "LEFT JOIN photos ON (photos.id=photohashes.photoId) " +
        "LEFT JOIN albums ON (albums.id=photos.albumId) " +
        "WHERE hash=:hash OR photoId=:id", {
        replacements: asset,
        type: photoDB.sequelize.QueryTypes.SELECT
      }).then(function(results) {
        let query;
        if (results.length == 0) {
          /* No row for this photo and no photo with this hash. */
          query = "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)";
        } else if (results[0].hash != asset.hash) {
          /* A differing hash can only come from the photoId match: the file
           * content changed. */
          query = "UPDATE photohashes SET hash=:hash WHERE photoId=:id";
        } else if (results[0].photoId != asset.id) {
          /* Another photo already has this hash: record a duplicate and stop
           * processing this asset (null is passed down the chain).
           * FIXME(review): asset.duplicate is not assigned anywhere in the
           * visible code, yet the duplicates UPDATE after the batch binds
           * :duplicate from the asset. Presumably
           * `asset.duplicate = results[0].photoId;` was intended before the
           * push — confirm. */
          setStatus("Duplicate asset: " +
            "'" + asset.album.path + asset.filename + "' is a copy of " +
            "'" + results[0].path + results[0].filename + "'");
          if (asset.duplicate != results[0].photoId) {
            duplicates.push(asset);
          }
          return null;
        }

        /* Even if the hash doesn't need to be updated, the entry needs to be scanned */
        needsProcessing.push(asset);

        if (!query) {
          return asset;
        }

        return photoDB.sequelize.query(query, {
          replacements: asset,
        }).then(function() {
          return asset;
        });
      });
    }).then(function(asset) {
      if (!asset) {
        /* The processed entry is a DUPLICATE. Skip it. */
        return;
      }

      /* NOTE: `created` holds the file's mtime, not a creation time. */
      var path = asset.album.path,
        file = asset.filename,
        created = asset.stats.mtime,
        albumId = asset.album.id;

      /* Step 3: resolves to the name of the file to read (the .jpg for RAW). */
      let tmp = Promise.resolve(file);

      /* If this is a Nikon RAW file, convert it to JPG and move to /raw dir */
      if (rawExtension.exec(file)) {
        tmp = exists(picturesPath + path + file.replace(rawExtension, ".jpg")).then(function(exist) {
          if (exist) {
            return file.replace(rawExtension, ".jpg"); /* We converted from NEF/ORF => JPG */
          }

          /* NOTE(review): convertRawToJpg() currently never settles (see its
           * FIXME), so this branch stalls the batch. */
          return mkdir(picturesPath + path + "raw").then(function() {
            return convertRawToJpg(path, file);
          }).then(function() {
            return file.replace(rawExtension, ".jpg"); /* We converted from NEF/ORF => JPG */
          });
        });
      }

      return tmp.then(function(file) {
        var src = picturesPath + path + file,
          image = sharp(src);

        /* Step 4: metadata. Inputs of up to 2^30 pixels are allowed. */
        return image.limitInputPixels(1073741824).metadata().then(function(metadata) {
          if (metadata.exif) {
            /* Decode the EXIF buffer, drop the thumbnail/image sections and
             * replace any remaining Buffer values with "Buffer[<length>]". */
            metadata.exif = exif(metadata.exif);
            delete metadata.exif.thumbnail;
            delete metadata.exif.image;
            for (var key in metadata.exif.exif) {
              if (Buffer.isBuffer(metadata.exif.exif[key])) {
                metadata.exif.exif[key] = "Buffer[" + metadata.exif.exif[key].length + "]";
              }
            }
          }

          asset.width = metadata.width;
          asset.height = metadata.height;
          asset.added = moment().format();

          if (metadata.exif && metadata.exif.exif && metadata.exif.exif.DateTimeOriginal && !isNaN(metadata.exif.exif.DateTimeOriginal.valueOf())) {
            asset.taken = moment(metadata.exif.exif.DateTimeOriginal).format();
            asset.modified = moment(metadata.exif.exif.DateTimeOriginal).format();

            /* 1899-11-30 is treated as a bogus EXIF date (presumably what an
             * all-zero date decodes to); fall back to the file mtime. */
            if (asset.taken == "Invalid date" || asset.taken.replace(/T.*/, "") == "1899-11-30") {
              setStatus("Invalid EXIF date information for " + asset.album.path + asset.filename);
              asset.taken = asset.modified = moment(created).format();
            }
          } else {
            /* Attempt to infer the datestamp from the filename */
            let date = moment(created).format();
            /* e.g. "WhatsApp Image 2019-05-04 at 10.23.45 PM.jpeg" */
            let match = file.match(/WhatsApp Image (20[0-9][0-9]-[0-9][0-9]-[0-9][0-9]) at (.*).(jpeg|jpg)/);
            if (match) {
              date = moment((match[1]+" "+match[2]), "YYYY-MM-DD h.mm.ss a").format();
              if (date == "Invalid date") {
                date = moment(created).format();
              }
            } else {
              /* YYYY-MM-DD or YYYYMMDD, optionally followed by "_"/"-" and HHmmss. */
              match = file.match(/(20[0-9][0-9]-?[0-9][0-9]-?[0-9][0-9])[_\-]?([0-9]{6})?/);
              if (match) {
                if (match[2]) {
                  /* Stamp had time in it */
                  date = moment((match[1]+""+match[2]).replace(/-/g, ""), "YYYYMMDDHHmmss").format();
                } else {
                  date = moment(match[1].replace(/-/g, ""), "YYYYMMDD").format();
                }
                if (date == "Invalid date") {
                  date = moment(created).format();
                }
              } else {
                date = moment(created).format();
              }
            }
            asset.taken = asset.modified = date;
          }

          /* Step 5: thumbnails, each skipped if the output already exists.
           * thumbs/<file> is resized to 256x256. */
          let dst = picturesPath + path + "thumbs/" + file;
          return exists(dst).then(function(exist) {
            if (exist) {
              return;
            }

            return image.resize(256, 256).withMetadata().toFile(dst).catch(function(error) {
              setStatus("Error resizing image: " + dst + "\n" + error, "error");
              throw error;
            });
          }).then(function() {
            /* thumbs/scaled/<file>: width min(1024, original width). */
            let dst = picturesPath + path + "thumbs/scaled/" + file;
            return exists(dst).then(function(exist) {
              if (exist) {
                return;
              }

              return image.resize(Math.min(1024, metadata.width)).withMetadata().toFile(dst).catch(function(error) {
                setStatus("Error resizing image: " + dst + "\n" + error, "error");
                throw error;
              });
            });
          }).then(function() {
            /* Step 6: persist the metadata and mark the row scanned. */
            return photoDB.sequelize.query("UPDATE photos SET " +
              "added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " +
              "WHERE id=:id", {
              replacements: asset,
            });
          });
        }).catch(function(error) {
          /* Any failure in steps 4-6 lands here.
           * NOTE(review): that includes a failed DB UPDATE, which also causes
           * the image to be moved to corrupt/. */
          setStatus("Error reading image " + src + ":\n" + error, "error");
          return moveCorrupt(path, file).then(function() {
            /* If the original file was not a NEF/ORF, we are done... */
            if (!rawExtension.exec(asset.filename)) {
              return;
            }

            /* ... otherwise, attempt to re-convert the NEF/ORF->JPG and then resize again */
            for (var i = 0; i < triedClean.length; i++) {
              if (triedClean[i] == path + file) {
                /* Move the NEF/ORF to /corrupt as well so it isn't re-checked again and again... */
                // return moveCorrupt(path, asset.filename);
                setStatus("Already attempted to convert NEF/ORF to JPG: " + path + file, "error");
                return;
              }
            }

            setStatus("Adding " + path + file + " back onto processing queue.", "error");
            triedClean.push(path + file);
            /* FIXME(review): this enqueues four bare values, not an asset
             * object (concat() flattens them into processQueue). When they are
             * dequeued, the sort comparator reads `.stats.mtime` on a
             * string/Date/number and throws a TypeError. Because processRunning
             * is true here, this call itself only schedules a 1s poll. */
            processBlock([ path, file, created, albumId ]);
          });
        });
      }).catch(function() {
        /* Swallow the error so the batch moves on to the next asset; the
         * error itself is not logged here. */
        setStatus("Continuing file processing.", "warn");
      });
    }).then(function() {
      /* Progress report, at most every 5 seconds. */
      toProcess--;
      if (moment().add(-5, 'seconds') > lastMessage) {
        setStatus("Items to be processed: " + toProcess);
        lastMessage = moment();
      }
    });
  }).catch(function(error) {
    /* NOTE(review): despite the message, this rethrows. A failure outside the
     * per-asset catch (e.g. the photohashes queries) aborts the batch and
     * skips the duplicate/removed-asset steps below. */
    setStatus("Error processing file. Continuing.");
    throw error;
  }).then(function() {
    setStatus("Completed processing queue. Marking " + duplicates.length + " duplicates.");
    return photoDB.sequelize.transaction(function(transaction) {
      return Promise.mapSeries(duplicates, function(asset) {
        return photoDB.sequelize.query("UPDATE photos " +
          "SET duplicate=:duplicate,modified=CURRENT_TIMESTAMP,scanned=CURRENT_TIMESTAMP WHERE id=:id", {
          replacements: asset,
          transaction: transaction
        });
      });
    });
  }).then(function() {
    setStatus("Looking for removed assets");
    // NOTE(review): THE COPY OF THIS FILE THAT WAS REVIEWED IS DAMAGED HERE.
    // The text between `DATETIME(photos.scanned)` in the SQL string below and
    // the `JPG` comment terminator that follows it is missing. It looks as if
    // a span from a '<' to a '>' character was stripped. Lost with it are:
    //   - the rest of this removed-assets query;
    //   - the rest of processBlock(), presumably including where
    //     processRunning is reset to false;
    //   - the beginning of scanDir(), which doScan() calls and whose visible
    //     tail builds asset records and resolves to [albums, assets].
    // The line below is kept exactly as received. As it stands the file does
    // not parse; restore the missing code from version control.
    return photoDB.sequelize.query("SELECT photos.scanned,photos.id,photos.filename,albums.path FROM photos " +
      "LEFT JOIN albums ON (albums.id=photos.albumId) " +
      "WHERE photos.deleted=0 AND DATETIME(photos.scanned) JPG */ name: file.replace(/.[^.]*$/, ""), stats: { mtime: stats.mtime, ctime: stats.ctime }, size: stats.size, album: album }); }); }); }).then(function() { return Promise.map(albums, function(album) { if (album.hasAssets) { return mkdir(album.path + "thumbs/scaled"); } }); }).then(function() { return [ albums, assets ]; }); }

/*
 * Finds the albums row for `album` (matched on path and parent) or inserts
 * one. Stores the id on album.id and resolves to it. Also sets album.parentId
 * (null for the top-level album).
 *
 * Uses album.path, album.name and album.parent. The parent's id must already
 * be set, so albums must be processed parents-first; otherwise this throws
 * the string "Albums in array in non ancestral order!".
 *
 * NOTE(review): the SELECT is not run inside `transaction` (only the INSERT
 * is). Whether it can see rows inserted earlier in the same transaction
 * depends on the dialect/connection handling — confirm. `metadata.lastID` is
 * presumably the sqlite3 driver's last-insert id. Errors are thrown as
 * strings, not Error objects.
 */
function findOrCreateDBAlbum(transaction, album) {
  let query = "SELECT id FROM albums WHERE path=:path AND ";
  if (!album.parent) {
    query += "parentId IS NULL";
    album.parentId = null;
  } else {
    if (!album.parent.id) {
      let error = "Albums in array in non ancestral order!";
      setStatus(error, "error");
      throw error;
    }
    album.parentId = album.parent.id;
    query += "parentId=:parentId";
  }

  return photoDB.sequelize.query(query, {
    replacements: album,
    type: photoDB.sequelize.QueryTypes.SELECT
  }).then(function(results) {
    if (results.length == 0) {
      if (!album.parent) {
        setStatus("Creating top level album: " + picturesPath, "warn" );
      }
      return photoDB.sequelize.query("INSERT INTO albums (path,parentId,name) VALUES(:path,:parentId,:name)", {
        replacements: album,
        transaction: transaction
      }).spread(function(results, metadata) {
        return metadata.lastID;
      });
    } else {
      return results[0].id;
    }
  }).then(function(id) {
    album.id = id;
    return id;
  });
}

/*
 * Ensures `asset` has a photos row (matched on album id + filename), copies
 * its DB state onto the asset, and resolves to the same asset object.
 *
 *  - No row: INSERT (albumId, filename, name, size) and set asset.id.
 *    asset.scanned and asset.modified stay unset, which marks it as new.
 *  - Row found: set asset.id, and asset.scanned / asset.modified as Dates.
 *    If the stored size differs from asset.size, scanned/modified are deleted
 *    again so doScan() queues the file for re-hashing.
 *
 * Throws the string "Asset being processed without an album" if
 * asset.album.id is missing.
 *
 * NOTE(review):
 *  - A NULL scanned/modified column becomes `new Date(null)` (1970-01-01,
 *    which is truthy), so `!asset.modified` checks do not detect it.
 *  - The DATETIME() strings are parsed by `new Date`, i.e. as local time.
 *  - The SELECT is not run inside `transaction`.
 */
function findOrUpdateDBAsset(transaction, asset) {
  if (!asset.album || !asset.album.id) {
    let error = "Asset being processed without an album";
    setStatus(error, "warn");
    throw error;
  }

  asset.albumId = asset.album.id;
  return photoDB.sequelize.query(
    "SELECT id,DATETIME(scanned) AS scanned,size,DATETIME(modified) AS modified " +
    "FROM photos " +
    "WHERE albumId=:albumId AND filename=:filename", {
    replacements: asset,
    type: photoDB.sequelize.QueryTypes.SELECT
  }).then(function(results) {
    if (results.length == 0) {
      return photoDB.sequelize.query("INSERT INTO photos " +
        "(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
        replacements: asset,
        transaction: transaction
      }).spread(function(results, metadata) {
        asset.id = metadata.lastID;
      });
    }

    asset.id = results[0].id;
    asset.scanned = new Date(results[0].scanned);
    asset.modified = new Date(results[0].modified);

    /* If the size on disk changed, force re-processing. This shouldn't happen
     * in production unless someone modifies the file and then re-stamps the
     * modified time. */
    if (asset.size != results[0].size) {
      setStatus("File was modified with time-restamp (HASH regeneration will be queued): " + asset.filename);
      delete asset.scanned;
      delete asset.modified;
    }
  }).then(function() {
    return asset;
  });
}

/*
 * Resolves to the hex SHA-256 digest of the file at `filepath`, read as a
 * stream. Callers pass picturesPath + album path + filename.
 *
 * FIXME(review):
 *  - There is no 'error' listener on the stream. For a missing or unreadable
 *    file the stream emits 'error' with nobody listening, which Node raises as
 *    an uncaught exception, and this promise never settles.
 *  - The `!input` check can never fire: fs.createReadStream() always returns a
 *    stream object.
 */
function computeHash(filepath) {
  return new Promise(function(resolve, reject) {
    let input = fs.createReadStream(filepath),
      hash = crypto.createHash("sha256");

    if (!input) {
      return reject()
    }

    /* read() with no size returns everything buffered. The code treats a null
     * read() as end of stream and produces the digest at that point. */
    input.on("readable", function() {
      const data = input.read();
      if (data) {
        hash.update(data);
      } else {
        input.close();
        resolve(hash.digest("hex"));
        hash = null;
        input = null;
      }
    });
  });
}

/* Log of the current/last scan: entries of the form { level, time, log }.
 * While non-empty, scan() returns it instead of starting a scan. */
let scanningStatus = [];

/*
 * Appends a status entry to scanningStatus and echoes it to the console
 * (console.error for "error", console.warn for "warn", console.log otherwise).
 * `level` defaults to "info". The special status "idle" clears the log first
 * and is then recorded as its only entry.
 */
function setStatus(status, level) {
  if (status == "idle") {
    scanningStatus = [];
  }

  level = level || "info";
  scanningStatus.push({
    level: level,
    time: moment().format(),
    log: status
  });
  switch (level) {
  case "error":
    console.error(status);
    break;
  case "warn":
    console.warn(status);
    break;
  default:
    console.log(status);
  }
}

/*
 * Exported as scan(). Starts a full scan or, if scanningStatus already has
 * entries, resolves to that status log instead.
 *
 * A scan runs these steps:
 *  (1) scanDir() collects albums and assets.
 *  (2) Album rows are found or created one at a time, parents first, in one
 *      transaction.
 *  (3) Every asset whose mtime or ctime is not older than lastScan is
 *      matched against the photos table (10 at a time, in one transaction).
 *      New, size-changed, or scanned-before-mtime rows go to processBlock();
 *      the rest only get their `scanned` timestamp refreshed.
 *  (4) lastScan is set to max(photos.scanned).
 *  (5) Status "idle" is logged and the promise resolves to "scan complete".
 * Errors are logged and rethrown.
 *
 * NOTE(review):
 *  - setStatus("idle") leaves one entry in scanningStatus, and processBlock()
 *    keeps logging afterwards. The `length != 0` guard below is therefore
 *    true on every later call: after the first scan (or a failed one), scan()
 *    never scans again and just returns the ever-growing log. Confirm whether
 *    a reset was intended.
 *  - processBlock() is not awaited. "scan complete" is reported, and lastScan
 *    computed, while hashing/thumbnailing may still be running.
 *  - lastScan comes from `new Date(<DB datetime string>)`. A
 *    "YYYY-MM-DD HH:MM:SS" string is parsed as local time, while the UPDATEs
 *    here write CURRENT_TIMESTAMP, which is UTC on SQLite (which the
 *    DATETIME()/lastID usage suggests). If the server is not on UTC, the
 *    cut-off shifts by the zone offset — confirm.
 *  - Assets skipped because of lastScan are not added to updateScanned, so
 *    their `scanned` value is not refreshed.
 *  - The final catch logs the error at "info" level (no level is passed).
 */
function doScan() {
  /* Original design notes. processBlock() actually converts RAW to JPG
   * (HASH CHECK step 4) before creating thumbnails (step 3).
   *
   * 1. Scan for all assets which will be managed by the system. readdir
   * 2. Check if entry in DB. Check mod-time in DB vs. stats from #1
   *    - For albums
   *    - For assets
   * 3. If not in DB, or mod-time changed, queue for HASH CHECK
   *
   * HASH CHECK
   * 1. Compute HASH
   * 2. Check for HASH in photohash -- skip?
   * 3. Check for and create thumbs/FILE thumbs/scaled/FILE
   * 4. If necessary, create JPG from RAW
   * 5. Update last-scanned date in DB for entry
   * 6. Look up all DB entries with last-scanned date < NOW -- purge from DB (they were
   *    removed on disk)? Also purge from the HASH table.
   */
  let initialized = Date.now();
  let now = Date.now();
  let needsProcessing = [];

  /* A non-empty log is taken to mean "scan in progress": return the log. */
  if (scanningStatus.length != 0) {
    return Promise.resolve(scanningStatus);
  }

  return scanDir(null, picturesPath).spread(function(albums, assets) {
    setStatus("Found " + assets.length + " assets in " + albums.length + " albums after " + ((Date.now() - now) / 1000) + "s");

    /* One at a time, in series, as the album[] array has parents first, then descendants.
     * Operating in parallel could result in a child being searched for prior to the parent */
    now = Date.now();
    let toProcess = albums.length, lastMessage = moment();
    return photoDB.sequelize.transaction(function(transaction) {
      return Promise.mapSeries(albums, function(album) {
        return findOrCreateDBAlbum(transaction, album).then(function() {
          toProcess--;
          if (moment().add(-5, 'seconds') > lastMessage) {
            setStatus("Albums to be created in DB: " + toProcess);
            lastMessage = moment();
          }
        });
      });
    }).then(function() {
      setStatus("Processed " + albums.length + " album DB entries in " + ((Date.now() - now) / 1000) + "s");
      now = Date.now();

      setStatus(assets.length + " assets remaining to be verified/updated. ETA N/A");

      /* updateScanned: ids whose row is current and only needs a new scanned
       * timestamp. newEntries: assets with no scanned value. */
      let processed = 0, start = Date.now(), last = 0, updateScanned = [], newEntries = 0;
      return photoDB.sequelize.transaction(function(transaction) {
        return Promise.map(assets, function(asset) {
          return Promise.resolve(asset).then(function(asset) {
            /* If both mtime and ctime of the asset are older than the lastScan, skip it
             *
             * Can only do this after a full scan has occurred. lastScan starts
             * at 1800-01-01, so the first scan checks everything. */
            if (lastScan != null && asset.stats.mtime < lastScan && asset.stats.ctime < lastScan) {
              return asset;
            }

            return findOrUpdateDBAsset(transaction, asset).then(function(asset) {
              if (!asset.scanned) {
                newEntries++;
              }
              if (!asset.scanned || asset.scanned < asset.stats.mtime || !asset.modified) {
                needsProcessing.push(asset);
              } else {
                updateScanned.push(asset.id);
              }
              return asset;
            }).then(function(asset) {
              /* No-op pass-through. */
              return asset;
            });
          }).then(function(asset) {
            /* Progress/ETA report, at most every 5 seconds. */
            processed++;

            let elapsed = Date.now() - start;
            if (elapsed < 5000) {
              return asset;
            }

            let remaining = assets.length - processed,
              eta = Math.ceil((elapsed / 1000) * remaining / (processed - last));
            setStatus(remaining + " assets remaining be verified/updated " +
              "(" + newEntries + " new entries, " + (processed - newEntries) + " up-to-date so far). ETA " + eta + "s");
            last = processed;
            start = Date.now();
          });
        }, {
          concurrency: 10
        });
      }).then(function() {
        if (updateScanned.length) {
          return photoDB.sequelize.query("UPDATE photos SET scanned=CURRENT_TIMESTAMP WHERE id IN (:ids)", {
            replacements: { ids: updateScanned }
          }).then(function() {
            setStatus("Updated scan date of " + updateScanned.length + " assets");
            updateScanned = [];
          });
        }
      }).then(function() {
        setStatus(newEntries + " assets are new. " + (needsProcessing.length - newEntries) + " assets have been modified.\n" + needsProcessing.length + " assets need HASH computed. " + (assets.length - needsProcessing.length) + " need no update.");;

        /* Fire-and-forget: the returned promise is not awaited. */
        processBlock(needsProcessing);

        needsProcessing = [];
      }).then(function() {
        setStatus("Scanned " + assets.length + " asset DB entries in " + ((Date.now() - now) / 1000) + "s");
        assets = [];
      });
    });
  }).then(function() {
    setStatus("Total time to initialize DB and all scans: " + ((Date.now() - initialized) / 1000) + "s");
    return photoDB.sequelize.query("SELECT max(scanned) AS scanned FROM photos", {
      type: photoDB.sequelize.QueryTypes.SELECT
    }).then(function(results) {
      if (results[0].scanned == null) {
        lastScan = new Date("1800-01-01");
      } else {
        lastScan = new Date(results[0].scanned);
      }
      setStatus("Updating any asset newer than " + moment(lastScan).format());
    });
  }).then(function() {
    setStatus("idle");
    return "scan complete";
  }).catch(function(error) {
    setStatus(error);
    throw error;
  });
}

/*
 * init(db): must be called before scan(); `db` must expose `sequelize`.
 * scan():   see doScan().
 */
module.exports = {
  init: function(db) {
    photoDB = db;
  },
  scan: doScan
};