0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-03-11 02:23:09 -05:00

use global library watch lock

This commit is contained in:
Jonathan Jogenfors 2024-02-29 18:50:05 +01:00
parent c07ef59167
commit eda6572c87

View file

@ -70,8 +70,16 @@ export class LibraryService extends EventEmitter {
async init() { async init() {
const config = await this.configCore.getConfig(); const config = await this.configCore.getConfig();
const { watch, scan } = config.library; const { watch, scan } = config.library;
this.watchLibraries = watch.enabled; this.watchLibraries = false;
if (watch.enabled) {
this.databaseRepository.withTryLock(DatabaseLock.LibraryWatch, async () => {
// This ensures that library watching only occurs in one microservice
// TODO: we could make the lock be per-library instead of global
this.watchLibraries = true;
});
}
this.jobRepository.addCronJob( this.jobRepository.addCronJob(
'libraryScan', 'libraryScan',
scan.cronExpression, scan.cronExpression,
@ -106,61 +114,59 @@ export class LibraryService extends EventEmitter {
return false; return false;
} }
this.databaseRepository.withTryLock(DatabaseLock.LibraryWatch, async () => { await this.unwatch(id);
await this.unwatch(id);
this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`); this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`);
const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, { const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, {
nocase: true, nocase: true,
ignore: library.exclusionPatterns, ignore: library.exclusionPatterns,
});
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
usePolling: false,
ignoreInitial: true,
},
{
onReady: () => _resolve(),
onAdd: async (path) => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('add', path);
},
onChange: async (path) => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('change', path);
},
onUnlink: async (path) => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset && matcher(path)) {
await this.assetRepository.save({ id: asset.id, isOffline: true });
}
this.emit('unlink', path);
},
onError: (error) => {
// TODO: should we log, or throw an exception?
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
},
},
);
// Wait for the watcher to initialize before returning
await ready$;
}); });
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
usePolling: false,
ignoreInitial: true,
},
{
onReady: () => _resolve(),
onAdd: async (path) => {
this.logger.debug(`File add event received for ${path} in library ${library.id}}`);
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('add', path);
},
onChange: async (path) => {
this.logger.debug(`Detected file change for ${path} in library ${library.id}`);
if (matcher(path)) {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('change', path);
},
onUnlink: async (path) => {
this.logger.debug(`Detected deleted file at ${path} in library ${library.id}`);
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(library.id, path);
if (asset && matcher(path)) {
await this.assetRepository.save({ id: asset.id, isOffline: true });
}
this.emit('unlink', path);
},
onError: (error) => {
// TODO: should we log, or throw an exception?
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
},
},
);
// Wait for the watcher to initialize before returning
await ready$;
return true; return true;
} }