diff --git a/server/libs/domain/src/job/job.interface.ts b/server/libs/domain/src/job/job.interface.ts index bbc966eb9c..176b2942af 100644 --- a/server/libs/domain/src/job/job.interface.ts +++ b/server/libs/domain/src/job/job.interface.ts @@ -19,7 +19,7 @@ export interface IFaceThumbnailJob extends IAssetFaceJob { export interface IEntityJob extends IBaseJob { id: string; - source?: string; + source?: 'upload'; } export interface IBulkEntityJob extends IBaseJob { diff --git a/server/libs/domain/src/job/job.service.spec.ts b/server/libs/domain/src/job/job.service.spec.ts index 8b79e30d2c..303661edb6 100644 --- a/server/libs/domain/src/job/job.service.spec.ts +++ b/server/libs/domain/src/job/job.service.spec.ts @@ -1,5 +1,7 @@ +import { SystemConfig } from '@app/infra/entities'; import { BadRequestException } from '@nestjs/common'; import { + asyncTick, newAssetRepositoryMock, newCommunicationRepositoryMock, newJobRepositoryMock, @@ -7,8 +9,17 @@ import { } from '../../test'; import { IAssetRepository } from '../asset'; import { ICommunicationRepository } from '../communication'; -import { IJobRepository, JobCommand, JobHandler, JobName, JobService, QueueName } from '../job'; +import { IJobRepository, JobCommand, JobHandler, JobItem, JobName, JobService, QueueName } from '../job'; import { ISystemConfigRepository } from '../system-config'; +import { SystemConfigCore } from '../system-config/system-config.core'; + +const makeMockHandlers = (success: boolean) => { + const mock = jest.fn().mockResolvedValue(success); + return Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record< + JobName, + JobHandler + >; +}; describe(JobService.name, () => { let sut: JobService; @@ -192,16 +203,101 @@ describe(JobService.name, () => { describe('registerHandlers', () => { it('should register a handler for each queue', async () => { - const mock = jest.fn(); - const handlers = Object.values(JobName).reduce((map, jobName) => ({ ...map, [jobName]: mock }), {}) as Record< - JobName, - JobHandler - >; - - await sut.registerHandlers(handlers); - + await sut.registerHandlers(makeMockHandlers(true)); expect(configMock.load).toHaveBeenCalled(); expect(jobMock.addHandler).toHaveBeenCalledTimes(Object.keys(QueueName).length); }); + + it('should subscribe to config changes', async () => { + await sut.registerHandlers(makeMockHandlers(false)); + + const configCore = new SystemConfigCore(newSystemConfigRepositoryMock()); + configCore.config$.next({ + job: { + [QueueName.BACKGROUND_TASK]: { concurrency: 10 }, + [QueueName.CLIP_ENCODING]: { concurrency: 10 }, + [QueueName.METADATA_EXTRACTION]: { concurrency: 10 }, + [QueueName.OBJECT_TAGGING]: { concurrency: 10 }, + [QueueName.RECOGNIZE_FACES]: { concurrency: 10 }, + [QueueName.SEARCH]: { concurrency: 10 }, + [QueueName.SIDECAR]: { concurrency: 10 }, + [QueueName.STORAGE_TEMPLATE_MIGRATION]: { concurrency: 10 }, + [QueueName.THUMBNAIL_GENERATION]: { concurrency: 10 }, + [QueueName.VIDEO_CONVERSION]: { concurrency: 10 }, + }, + } as SystemConfig); + + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.BACKGROUND_TASK, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.CLIP_ENCODING, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.METADATA_EXTRACTION, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.OBJECT_TAGGING, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.RECOGNIZE_FACES, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.SIDECAR, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.STORAGE_TEMPLATE_MIGRATION, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.THUMBNAIL_GENERATION, 10); + expect(jobMock.setConcurrency).toHaveBeenCalledWith(QueueName.VIDEO_CONVERSION, 10); + }); + + const tests: Array<{ item: JobItem; jobs: JobName[] }> = [ + { + item: { name: JobName.SIDECAR_SYNC, data: { id: 'asset-1' } }, + jobs: [JobName.METADATA_EXTRACTION], + }, + { + item: { name: JobName.SIDECAR_DISCOVERY, data: { id: 'asset-1' } }, + jobs: [JobName.METADATA_EXTRACTION], + }, + { + item: { name: JobName.METADATA_EXTRACTION, data: { id: 'asset-1' } }, + jobs: [JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, JobName.SEARCH_INDEX_ASSET], + }, + { + item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1', source: 'upload' } }, + jobs: [JobName.GENERATE_JPEG_THUMBNAIL], + }, + { + item: { name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: { id: 'asset-1' } }, + jobs: [], + }, + { + item: { name: JobName.GENERATE_JPEG_THUMBNAIL, data: { id: 'asset-1' } }, + jobs: [JobName.GENERATE_WEBP_THUMBNAIL, JobName.CLASSIFY_IMAGE, JobName.ENCODE_CLIP, JobName.RECOGNIZE_FACES], + }, + { + item: { name: JobName.CLASSIFY_IMAGE, data: { id: 'asset-1' } }, + jobs: [JobName.SEARCH_INDEX_ASSET], + }, + { + item: { name: JobName.ENCODE_CLIP, data: { id: 'asset-1' } }, + jobs: [JobName.SEARCH_INDEX_ASSET], + }, + { + item: { name: JobName.RECOGNIZE_FACES, data: { id: 'asset-1' } }, + jobs: [JobName.SEARCH_INDEX_ASSET], + }, + ]; + + for (const { item, jobs } of tests) { + it(`should queue ${jobs.length} jobs when a ${item.name} job finishes successfully`, async () => { + assetMock.getByIds.mockResolvedValue([]); + + await sut.registerHandlers(makeMockHandlers(true)); + await jobMock.addHandler.mock.calls[0][2](item); + await asyncTick(3); + + expect(jobMock.queue).toHaveBeenCalledTimes(jobs.length); + for (const jobName of jobs) { + expect(jobMock.queue).toHaveBeenCalledWith({ name: jobName, data: expect.anything() }); + } + }); + + it(`should not queue any jobs when ${item.name} finishes with 'false'`, async () => { + await sut.registerHandlers(makeMockHandlers(false)); + await jobMock.addHandler.mock.calls[0][2](item); + await asyncTick(3); + + expect(jobMock.queue).not.toHaveBeenCalled(); + }); + } }); }); diff --git a/server/libs/domain/src/job/job.service.ts b/server/libs/domain/src/job/job.service.ts index c52904b9bb..1fedcde423 100644 --- a/server/libs/domain/src/job/job.service.ts +++ b/server/libs/domain/src/job/job.service.ts @@ -135,11 +135,11 @@ export class JobService { /** * Queue follow up jobs */ - async onDone(item: JobItem) { + private async onDone(item: JobItem) { switch (item.name) { case JobName.SIDECAR_SYNC: case JobName.SIDECAR_DISCOVERY: - await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: { id: item.data.id } }); + await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: item.data }); break; case JobName.METADATA_EXTRACTION: