diff --git a/e2e/test-assets b/e2e/test-assets index 99544a2004..c4a0575c3e 160000 --- a/e2e/test-assets +++ b/e2e/test-assets @@ -1 +1 @@ -Subproject commit 99544a200412d553103cc7b8f1a28f339c7cffd9 +Subproject commit c4a0575c3e89a755b951ae6d91e7307cd34c606f diff --git a/server/src/interfaces/job.interface.ts b/server/src/interfaces/job.interface.ts index c55b2aefe0..424691f087 100644 --- a/server/src/interfaces/job.interface.ts +++ b/server/src/interfaces/job.interface.ts @@ -84,7 +84,7 @@ export enum JobName { // library management LIBRARY_QUEUE_SYNC_FILES = 'library-queue-sync-files', LIBRARY_QUEUE_SYNC_ASSETS = 'library-queue-sync-assets', - LIBRARY_SYNC_FILE = 'library-sync-file', + LIBRARY_SYNC_FILES = 'library-sync-files', LIBRARY_SYNC_ASSETS = 'library-sync-assets', LIBRARY_DELETE = 'library-delete', LIBRARY_QUEUE_SYNC_ALL = 'library-queue-sync-all', @@ -135,7 +135,7 @@ export interface IDelayedJob extends IBaseJob { export interface IEntityJob extends IBaseJob { id: string; - source?: 'upload' | 'sidecar-write' | 'copy'; + source?: 'upload' | 'library-import' | 'sidecar-write' | 'copy'; notify?: boolean; } @@ -146,7 +146,7 @@ export interface IAssetDeleteJob extends IEntityJob { export interface ILibraryFileJob { libraryId: string; ownerId: string; - assetPath: string; + assetPaths: string[]; } export interface ILibraryBulkIdsJob { @@ -290,7 +290,7 @@ export type JobItem = | { name: JobName.ASSET_DELETION_CHECK; data?: IBaseJob } // Library Management - | { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob } + | { name: JobName.LIBRARY_SYNC_FILES; data: ILibraryFileJob } | { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob } | { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob } | { name: JobName.LIBRARY_SYNC_ASSETS; data: ILibraryBulkIdsJob } diff --git a/server/src/services/library.service.spec.ts b/server/src/services/library.service.spec.ts index 13507f6475..6441634d43 100644 --- a/server/src/services/library.service.spec.ts +++ b/server/src/services/library.service.spec.ts @@ -179,7 +179,7 @@ describe(LibraryService.name, () => { expect(jobMock.queueAll).toHaveBeenCalledWith([ { - name: JobName.LIBRARY_SYNC_FILE, + name: JobName.LIBRARY_SYNC_FILES, data: { id: libraryStub.externalLibrary1.id, ownerId: libraryStub.externalLibrary1.owner.id, @@ -960,7 +960,7 @@ describe(LibraryService.name, () => { expect(jobMock.queueAll).toHaveBeenCalledWith([ { - name: JobName.LIBRARY_SYNC_FILE, + name: JobName.LIBRARY_SYNC_FILES, data: { id: libraryStub.externalLibraryWithImportPaths1.id, assetPath: '/foo/photo.jpg', @@ -985,7 +985,7 @@ describe(LibraryService.name, () => { expect(jobMock.queueAll).toHaveBeenCalledWith([ { - name: JobName.LIBRARY_SYNC_FILE, + name: JobName.LIBRARY_SYNC_FILES, data: { id: libraryStub.externalLibraryWithImportPaths1.id, assetPath: '/foo/photo.jpg', diff --git a/server/src/services/library.service.ts b/server/src/services/library.service.ts index 2c4d17ab84..7bbf02540f 100644 --- a/server/src/services/library.service.ts +++ b/server/src/services/library.service.ts @@ -17,6 +17,7 @@ import { import { AssetEntity } from 'src/entities/asset.entity'; import { LibraryEntity } from 'src/entities/library.entity'; import { AssetStatus, AssetType, ImmichWorker } from 'src/enum'; +import { AssetCreate } from 'src/interfaces/asset.interface'; import { DatabaseLock } from 'src/interfaces/database.interface'; import { ArgOf } from 'src/interfaces/event.interface'; import { JobName, JobOf, JOBS_LIBRARY_PAGINATION_SIZE, JobStatus, QueueName } from 'src/interfaces/job.interface'; @@ -102,7 +103,10 @@ export class LibraryService extends BaseService { const handler = async (event: string, path: string) => { if (matcher(path)) { this.logger.debug(`File ${event} event received for ${path} in library ${library.id}}`); - await this.syncFiles(library, [path]); + await this.jobRepository.queue({ + name: JobName.LIBRARY_SYNC_FILES, + data: { libraryId: library.id, ownerId: library.ownerId, assetPaths: [path] }, + }); } else { this.logger.verbose(`Ignoring file ${event} event for ${path} in library ${library.id}`); } @@ -208,17 +212,23 @@ export class LibraryService extends BaseService { return mapLibrary(library); } - private async syncFiles({ id, ownerId }: LibraryEntity, assetPaths: string[]) { - await this.jobRepository.queueAll( - assetPaths.map((assetPath) => ({ - name: JobName.LIBRARY_SYNC_FILE, - data: { - libraryId: id, - assetPath, - ownerId, - }, - })), - ); + @OnJob({ name: JobName.LIBRARY_SYNC_FILES, queue: QueueName.LIBRARY }) + async handleSyncFiles(job: JobOf): Promise { + const assetImports = job.assetPaths.map((assetPath) => this.processEntity(assetPath, job.ownerId, job.libraryId)); + + const assetIds: string[] = []; + const batchSize = 1000; // Adjust the batch size as needed + for (let i = 0; i < assetImports.length; i += batchSize) { + const batch = assetImports.slice(i, i + batchSize); + const batchIds = await this.assetRepository.createAll(batch).then((assets) => assets.map((asset) => asset.id)); + assetIds.push(...batchIds); + } + + this.logger.log(`Imported ${assetIds.length} asset(s) for library ${job.libraryId}`); + + await this.queuePostSyncJobs(assetIds); + + return JobStatus.SUCCESS; } private async validateImportPath(importPath: string): Promise { @@ -332,60 +342,34 @@ export class LibraryService extends BaseService { return JobStatus.SUCCESS; } - @OnJob({ name: JobName.LIBRARY_SYNC_FILE, queue: QueueName.LIBRARY }) - async handleSyncFile(job: JobOf): Promise { - /* For performance reasons, we don't check if the asset is already imported. - This is instead handled by a previous step in the scan process. - In the edge case of an asset being imported between that check - and this function call, the database constraint will prevent duplicates. - */ - - const assetPath = path.normalize(job.assetPath); - - // TODO: we can replace this get call with an exists call - /* let asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath); - if (asset) { - return await this.handleSyncAssets({ libraryId: job.libraryId, assetIds: [asset.id] }); - } */ - - this.logger.log(`Importing new asset ${assetPath} into library ${job.libraryId}`); - - // TODO: device asset id is deprecated, remove it - const deviceAssetId = `${basename(assetPath)}`.replaceAll(/\s+/g, ''); - - const pathHash = this.cryptoRepository.hashSha1(`path:${assetPath}`); - - const assetType = mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE; + private processEntity(filePath: string, ownerId: string, libraryId: string): AssetCreate { + const assetPath = path.normalize(filePath); const now = new Date(); - const asset = await this.assetRepository.create({ - ownerId: job.ownerId, - libraryId: job.libraryId, - checksum: pathHash, + return { + ownerId: ownerId, + libraryId: libraryId, + checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`), originalPath: assetPath, - deviceAssetId, + + // TODO: device asset id is deprecated, remove it + deviceAssetId: `${basename(assetPath)}`.replaceAll(/\s+/g, ''), deviceId: 'Library Import', fileCreatedAt: now, fileModifiedAt: now, localDateTime: now, - type: assetType, + type: mimeTypes.isVideo(assetPath) ? AssetType.VIDEO : AssetType.IMAGE, originalFileName: parse(assetPath).base, isExternal: true, - }); - - this.logger.debug(`Queueing metadata extraction for: ${asset.originalPath}`); - - await this.queuePostSyncJobs([asset.id]); - - return JobStatus.SUCCESS; + }; } async queuePostSyncJobs(assetIds: string[]) { await this.jobRepository.queueAll( assetIds.map((assetId) => ({ name: JobName.METADATA_EXTRACTION, - data: { id: assetId, source: 'upload' }, + data: { id: assetId, source: 'library-import' }, })), ); } @@ -586,7 +570,12 @@ export class LibraryService extends BaseService { const newPaths = await this.assetRepository.getNewPaths(library.id, pathBatch); if (newPaths.length > 0) { importCount += newPaths.length; - await this.syncFiles(library, newPaths); + + await this.jobRepository.queue({ + name: JobName.LIBRARY_SYNC_FILES, + data: { libraryId: library.id, ownerId: library.ownerId, assetPaths: newPaths }, + }); + if (newPaths.length < pathBatch.length) { this.logger.debug( `Current crawl batch: ${newPaths.length} of ${pathBatch.length} file(s) are new, queued import for library ${library.id}...`, diff --git a/server/src/services/metadata.service.ts b/server/src/services/metadata.service.ts index 79a7d519d6..14dae28da0 100644 --- a/server/src/services/metadata.service.ts +++ b/server/src/services/metadata.service.ts @@ -148,13 +148,17 @@ export class MetadataService extends BaseService { } @OnJob({ name: JobName.METADATA_EXTRACTION, queue: QueueName.METADATA_EXTRACTION }) - async handleMetadataExtraction({ id }: JobOf): Promise { + async handleMetadataExtraction({ id, source }: JobOf): Promise { const { metadata, reverseGeocoding } = await this.getConfig({ withCache: true }); const [asset] = await this.assetRepository.getByIds([id], { faces: { person: false } }); if (!asset) { return JobStatus.FAILED; } + if (source === 'library-import') { + await this.processSidecar(id, false); + } + const stats = await this.storageRepository.stat(asset.originalPath); const exifTags = await this.getExifTags(asset);