From 0346b29d918ccc4e4e1f0d6d1575b4ad9a148195 Mon Sep 17 00:00:00 2001 From: James Ketrenos Date: Thu, 12 Jan 2023 15:18:26 -0800 Subject: [PATCH] Improved parallel execution speeds Signed-off-by: James Ketrenos --- docker-compose.yml | 1 + server/scanner.js | 121 +++++++++++++++++++++++++++++++++------------ 2 files changed, 91 insertions(+), 31 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index df12cd2..9d7af9b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -18,3 +18,4 @@ services: - ${PWD}/config/local.json:/website/config/local.json - /opt/ketrface/models:/root/.deepface # - ${PWD}:/website + - ${PWD}/server:/website/server diff --git a/server/scanner.js b/server/scanner.js index dc43cd8..cb99018 100755 --- a/server/scanner.js +++ b/server/scanner.js @@ -138,8 +138,9 @@ const determineImageDate = (asset, metadata) => { const created = asset.stats.mtime, filename = asset.filename; + /* Attempt to find CREATED / MODIFIED date based on meta-data or - * FILENAME patterns */ + * FILENAME patterns */ if (metadata.exif && metadata.exif.exif && metadata.exif.exif.DateTimeOriginal @@ -203,7 +204,8 @@ const processImageAsset = async (asset) => { let src = picturesPath + path + filename, image = sharp(src); - const metadata = await image/*.limitInputPixels(1073741824)*/ + const metadata = await image + .limitInputPixels(false) .metadata() .catch(error => console.error(error) ); @@ -282,9 +284,13 @@ const processBlock = async (items) => { let toProcess = processing.length, lastMessage = moment(); - setStatus("Items to be processed: " + toProcess); + setStatus(`Items to be processed: ${toProcess}`); - await Promise.mapSeries(processing, async (asset) => { + const updateHash = [], + insertHash = [], + updateAsset = []; + + await Promise.map(processing, async (asset) => { toProcess--; if (moment().add(-5, 'seconds') > lastMessage) { setStatus("Items to be processed: " + toProcess); @@ -318,9 +324,9 @@ const processBlock = async (items) => { let query; if (results.length == 0) { - query = "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)"; + insertHash.push(asset); } else if (results[0].hash != asset.hash) { - query = "UPDATE photohashes SET hash=:hash WHERE photoId=:id"; + updateHash.push(asset); } else if (results[0].photoId != asset.id) { setStatus("Duplicate asset: " + "'" + asset.album.path + asset.filename + "' is a copy of " + @@ -345,7 +351,7 @@ const processBlock = async (items) => { needsProcessing.push(asset); try { - await processImageAsset(asset) + await processImageAsset(asset); /* no DB writes */ } catch (error) { const path = asset.album.path, filename = asset.filename; @@ -356,14 +362,42 @@ const processBlock = async (items) => { return; } - /* Update the DB with the image information */ - await photoDB.sequelize.query("UPDATE photos SET " + - "added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " + - "WHERE id=:id", { - replacements: asset, - }); + updateAsset.push(asset); + }, { + concurrency: require('os').cpus().length }); + try { + await photoDB.sequelize.transaction(async (t) => { + await Promise.mapSeries(updateHash, async (item) => { + await photoDB.sequelize.query( + "UPDATE photohashes SET hash=:hash WHERE photoId=:id", { + replacements: item, + transaction: t + }); + }); + await Promise.mapSeries(insertHash, async (item) => { + await photoDB.sequelize.query( + "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)", { + replacements: item, + transaction: t + }); + }); + /* 
Update the DB with the image information */
+      await Promise.mapSeries(updateAsset, async (item) => {
+        await photoDB.sequelize.query("UPDATE photos SET " +
+          "added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " +
+          "WHERE id=:id", {
+          replacements: item,
+          transaction: t
+        });
+      });
+    });
+  } catch (error) {
+    console.error(error);
+    process.exit(-1);
+  }
+
   /* Process the DUPLICATES */
   setStatus(
     `Completed processing queue. Marking ${duplicates.length} duplicates.`);
@@ -551,7 +585,19 @@ const findOrCreateDBAlbum = async (t, album) => {
   return album.id;
 }
 
-const findOrUpdateDBAsset = async (asset) => {
+const createDBAsset = async (asset, transaction) => {
+  return photoDB.sequelize.query(
+    "INSERT INTO photos " +
+    "(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
+    replacements: asset,
+    transaction
+  }).then(array => {
+    asset.id = array[1].lastID;
+    return asset;
+  });
+}
+
+const findDBAsset = async (asset) => {
   if (!asset.album || !asset.album.id) {
     let error = "Asset being processed without an album";
     setStatus(error, "warn");
@@ -569,14 +615,7 @@ const findOrUpdateDBAsset = async (asset) => {
   });
 
   if (results.length == 0) {
-    return photoDB.sequelize.query(
-      "INSERT INTO photos " +
-      "(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
-      replacements: asset
-    }).then(array => {
-      asset.id = array[1].lastID;
-      return asset;
-    });
+    return undefined;
   }
 
   asset.id = results[0].id;
@@ -587,7 +626,9 @@ const findOrUpdateDBAsset = async (asset) => {
   * shouldn't happen in production unless someone modifies the file, then
   * re-stamps the modified time */
   if (asset.size != results[0].size) {
-    setStatus("File was modified with time-restamp (HASH regeneration will be queued): " + asset.filename);
+    setStatus(
+      `File was modified with time-restamp (HASH regeneration will be ` +
+      `queued): ${asset.filename}`);
     delete asset.scanned;
     delete asset.modified;
   }
@@ -691,6 +732,7 @@ const doScan = async () => {
   lastMessage = moment();
   await photoDB.sequelize.transaction(async (t) => {
     await Promise.mapSeries(albums, async (album) => {
+
       await findOrCreateDBAlbum(t, album);
 
       toProcess--;
@@ -716,14 +758,15 @@ const doScan = async () => {
     `${assets.length} assets remaining to be verified/updated. ETA N/A`);
 
   let updateScanned = [],
-    newEntries = 0;
+    newEntries = 0,
+    needsCreation = [];
 
   try {
     let processed = 0,
       start = Date.now(),
       last = 0;
 
-    await Promise.mapSeries(assets, async (asset) => {
+    await Promise.map(assets, async (asset) => {
       /* If both mtime and ctime of the asset are older than the
        * lastScan, skip it
        * Can only do this after a full scan has occurred */
@@ -733,7 +776,11 @@ const doScan = async () => {
        return;
      }

-      asset = await findOrUpdateDBAsset(asset);
+      const res = await findDBAsset(asset);
+      if (!res) {
+        needsCreation.push(asset);
+      }
+
       if (!asset.scanned) {
         newEntries++;
       }
@@ -759,15 +806,27 @@ const doScan = async () => {
        let remaining = assets.length - processed,
          eta = Math.ceil((elapsed / 1000) * remaining / (processed - last));
        setStatus(
          `${remaining} assets remaining to be verified/updated (${newEntries} ` +
          `new entries, ${needsProcessing.length} need processing, ` +
          `${(processed - newEntries)} up-to-date so far). ETA ${eta}s`
        );
        last = processed;
        start = Date.now();
-        /*, {
-      concurrency: 10
-      */});
+    }, {
+      concurrency: 10
+    });
+  } catch (error) {
+    console.error(error);
+    process.exit(-1);
+  }
+
+  try {
+    console.log(`Creating ${needsCreation.length} asset entries.`);
+    await photoDB.sequelize.transaction(async (t) => {
+      await Promise.mapSeries(needsCreation, async (asset) => {
+        await createDBAsset(asset, t);
+      });
+    });
   } catch (error) {
     console.error(error);
     process.exit(-1);
   }
@@ -787,7 +846,7 @@ const doScan = async () => {
   setStatus(
     `${newEntries} assets are new. ` +
     `${needsProcessing.length - newEntries} assets have been ` +
-    `modified.\n${needsProcessing.length} assets need HASH computed. ` +
+    `modified.\n${needsProcessing.length} assets need to be processed. ` +
     `${assets.length - needsProcessing.length} need no update.`);
 
   processBlock(needsProcessing);
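
The shape of the change in both processBlock() and doScan() is the same: run the expensive per-image work (sharp metadata, hashing, file stats) through bluebird's Promise.map with a concurrency bound instead of Promise.mapSeries, collect the resulting SQL writes in plain arrays while the workers run, and flush those arrays afterwards inside a single sequelize transaction. A minimal sketch of that pattern, assuming bluebird and a Sequelize/SQLite setup; processAll, workItems, processOne and the storage path are illustrative names, not taken from scanner.js (only the photohashes query is quoted from the patch):

// Sketch of the pattern above: parallel compute phase, serial write phase.
// Assumes `npm install bluebird sequelize sqlite3`; identifiers are illustrative.
const os = require('os');
const Promise = require('bluebird');
const { Sequelize } = require('sequelize');

const sequelize = new Sequelize({ dialect: 'sqlite', storage: 'photos.db' });

async function processAll(workItems, processOne) {
  const pendingUpdates = [];   // writes deferred by the workers

  // Phase 1: CPU/IO-bound work with bounded parallelism; no DB writes here.
  await Promise.map(workItems, async (item) => {
    const result = await processOne(item);          // e.g. sharp() + hashing
    pendingUpdates.push({ id: item.id, hash: result.hash });
  }, {
    concurrency: os.cpus().length
  });

  // Phase 2: flush every deferred write, one at a time, in one transaction.
  await sequelize.transaction(async (t) => {
    await Promise.mapSeries(pendingUpdates, (row) =>
      sequelize.query(
        'UPDATE photohashes SET hash=:hash WHERE photoId=:id',
        { replacements: row, transaction: t }
      )
    );
  });
}

Keeping the writes out of the parallel phase matters for SQLite in particular: it permits only one writer at a time, so UPDATEs interleaved from N workers would serialize anyway while adding lock contention and per-statement transaction overhead. Batching them into one transaction after the parallel phase pays that cost once.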