Improved parallel execution speeds

Signed-off-by: James Ketrenos <james_git@ketrenos.com>
This commit is contained in:
James Ketr 2023-01-12 15:18:26 -08:00
parent 3e6bb96ab8
commit 0346b29d91
2 changed files with 91 additions and 31 deletions

View File

@ -18,3 +18,4 @@ services:
- ${PWD}/config/local.json:/website/config/local.json
- /opt/ketrface/models:/root/.deepface
# - ${PWD}:/website
- ${PWD}/server:/website/server

View File

@ -138,8 +138,9 @@ const determineImageDate = (asset, metadata) => {
const created = asset.stats.mtime,
filename = asset.filename;
/* Attempt to find CREATED / MODIFIED date based on meta-data or
* FILENAME patterns */
* FILENAME patterns */
if (metadata.exif
&& metadata.exif.exif
&& metadata.exif.exif.DateTimeOriginal
@ -203,7 +204,8 @@ const processImageAsset = async (asset) => {
let src = picturesPath + path + filename,
image = sharp(src);
const metadata = await image/*.limitInputPixels(1073741824)*/
const metadata = await image
.limitInputPixels(false)
.metadata()
.catch(error => console.error(error) );
@ -282,9 +284,13 @@ const processBlock = async (items) => {
let toProcess = processing.length, lastMessage = moment();
setStatus("Items to be processed: " + toProcess);
setStatus(`Items to be processed: ${toProcess}`);
await Promise.mapSeries(processing, async (asset) => {
const updateHash = [],
insertHash = [],
updateAsset = [];
await Promise.map(processing, async (asset) => {
toProcess--;
if (moment().add(-5, 'seconds') > lastMessage) {
setStatus("Items to be processed: " + toProcess);
@ -318,9 +324,9 @@ const processBlock = async (items) => {
let query;
if (results.length == 0) {
query = "INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)";
insertHash.push(asset);
} else if (results[0].hash != asset.hash) {
query = "UPDATE photohashes SET hash=:hash WHERE photoId=:id";
updateHash.push(asset);
} else if (results[0].photoId != asset.id) {
setStatus("Duplicate asset: " +
"'" + asset.album.path + asset.filename + "' is a copy of " +
@ -345,7 +351,7 @@ const processBlock = async (items) => {
needsProcessing.push(asset);
try {
await processImageAsset(asset)
await processImageAsset(asset); /* no DB writes */
} catch (error) {
const path = asset.album.path,
filename = asset.filename;
@ -356,14 +362,42 @@ const processBlock = async (items) => {
return;
}
/* Update the DB with the image information */
await photoDB.sequelize.query("UPDATE photos SET " +
"added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " +
"WHERE id=:id", {
replacements: asset,
});
updateAsset.push(asset);
}, {
concurrency: require('os').cpus().length
});
try {
await photoDB.sequelize.transaction(async (t) => {
await Promise.mapSeries(updateHash, async (item) => {
await photoDB.sequelize.query(
"UPDATE photohashes SET hash=:hash WHERE photoId=:id", {
replacements: item,
transaction: t
});
});
await Promise.mapSeries(insertHash, async (item) => {
await photoDB.sequelize.query(
"INSERT INTO photohashes (hash,photoId) VALUES(:hash,:id)", {
replacements: item,
transaction: t
});
});
/* Update the DB with the image information */
await Promise.mapSeries(updateAsset, async (item) => {
await photoDB.sequelize.query("UPDATE photos SET " +
"added=:added,modified=:modified,taken=:taken,width=:width,height=:height,size=:size,scanned=CURRENT_TIMESTAMP " +
"WHERE id=:id", {
replacements: item,
transaction: t
});
});
});
} catch (error) {
console.error(error);
process.exit(-1);
}
/* Process the DUPLICATES */
setStatus(
`Completed processing queue. Marking ${duplicates.length} duplicates.`);
@ -551,7 +585,19 @@ const findOrCreateDBAlbum = async (t, album) => {
return album.id;
}
const findOrUpdateDBAsset = async (asset) => {
const createDBAsset = async (asset, transaction) => {
return photoDB.sequelize.query(
"INSERT INTO photos " +
"(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
replacements: asset,
transaction
}).then(array => {
asset.id = array[1].lastID;
return asset;
});
}
const findDBAsset = async (asset) => {
if (!asset.album || !asset.album.id) {
let error = "Asset being processed without an album";
setStatus(error, "warn");
@ -569,14 +615,7 @@ const findOrUpdateDBAsset = async (asset) => {
});
if (results.length == 0) {
return photoDB.sequelize.query(
"INSERT INTO photos " +
"(albumId,filename,name,size) VALUES(:albumId,:filename,:name,:size)", {
replacements: asset
}).then(array => {
asset.id = array[1].lastID;
return asset;
});
return undefined;
}
asset.id = results[0].id;
@ -587,7 +626,9 @@ const findOrUpdateDBAsset = async (asset) => {
* shouldn't happen in production unless someone modifies the file, then
* re-stamps the modified time */
if (asset.size != results[0].size) {
setStatus("File was modified with time-restamp (HASH regeneration will be queued): " + asset.filename);
setStatus(
`File was modified with time-restamp (HASH regeneration will be ` +
`queued): ${asset.filename}`);
delete asset.scanned;
delete asset.modified;
}
@ -691,6 +732,7 @@ const doScan = async () => {
lastMessage = moment();
await photoDB.sequelize.transaction(async (t) => {
await Promise.mapSeries(albums, async (album) => {
await findOrCreateDBAlbum(t, album);
toProcess--;
@ -716,14 +758,15 @@ const doScan = async () => {
`${assets.length} assets remaining to be verified/updated. ETA N/A`);
let updateScanned = [],
newEntries = 0;
newEntries = 0,
needsCreation = [];
try {
let processed = 0,
start = Date.now(),
last = 0;
await Promise.mapSeries(assets, async (asset) => {
await Promise.map(assets, async (asset) => {
/* If both mtime and ctime of the asset are older than the
* lastScan, skip it
* Can only do this after a full scan has occurred */
@ -733,7 +776,11 @@ const doScan = async () => {
return;
}
asset = await findOrUpdateDBAsset(asset);
const res = await findDBAsset(asset);
if (!res) {
needsCreation.push(asset);
}
if (!asset.scanned) {
newEntries++;
}
@ -759,15 +806,27 @@ const doScan = async () => {
let remaining = assets.length - processed,
eta = Math.ceil((elapsed / 1000) * remaining / (processed - last));
setStatus(
`${remaining} assets remaining to be verified/updated (${newEntries} ` +
							`${remaining} assets remaining to be verified/updated (${newEntries} ` +
`new entries, ${needsProcessing.length} need processing, ` +
`${(processed - newEntries)} up-to-date so far). ETA ${eta}s`
);
last = processed;
start = Date.now();
/*, {
concurrency: 10
*/});
} , {
concurrency: 10
});
} catch (error) {
console.error(error);
process.exit(-1);
}
try {
console.log(`Creating ${needsCreation.length} asset entries.`);
await photoDB.sequelize.transaction(async (t) => {
await Promise.mapSeries(needsCreation, async (asset) => {
await createDBAsset(asset, t);
});
});
} catch (error) {
console.error(error);
process.exit(-1);
@ -787,7 +846,7 @@ const doScan = async () => {
setStatus(
`${newEntries} assets are new. ` +
`${needsProcessing.length - newEntries} assets have been ` +
`modified.\n${needsProcessing.length} assets need HASH computed. ` +
`modified.\n${needsProcessing.length} assets need to be processed. ` +
`${assets.length - needsProcessing.length} need no update.`);
processBlock(needsProcessing);