mirror of
https://github.com/immich-app/immich.git
synced 2024-12-31 00:43:56 -05:00
refactor: config init event for first config load (#13930)
This commit is contained in:
parent
c383e115af
commit
d456d35510
18 changed files with 160 additions and 146 deletions
|
@ -78,11 +78,11 @@ class BaseModule implements OnModuleInit, OnModuleDestroy {
|
|||
}
|
||||
|
||||
this.eventRepository.setup({ services });
|
||||
await this.eventRepository.emit('app.bootstrap', this.worker);
|
||||
await this.eventRepository.emit('app.bootstrap');
|
||||
}
|
||||
|
||||
async onModuleDestroy() {
|
||||
await this.eventRepository.emit('app.shutdown', this.worker);
|
||||
await this.eventRepository.emit('app.shutdown');
|
||||
await teardownTelemetry();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,22 +2,21 @@ import { ClassConstructor } from 'class-transformer';
|
|||
import { SystemConfig } from 'src/config';
|
||||
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
|
||||
import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.dto';
|
||||
import { ImmichWorker } from 'src/enum';
|
||||
import { JobItem, QueueName } from 'src/interfaces/job.interface';
|
||||
|
||||
export const IEventRepository = 'IEventRepository';
|
||||
|
||||
type EventMap = {
|
||||
// app events
|
||||
'app.bootstrap': [ImmichWorker];
|
||||
'app.shutdown': [ImmichWorker];
|
||||
'app.bootstrap': [];
|
||||
'app.shutdown': [];
|
||||
|
||||
'config.init': [{ newConfig: SystemConfig }];
|
||||
// config events
|
||||
'config.update': [
|
||||
{
|
||||
newConfig: SystemConfig;
|
||||
/** When the server starts, `oldConfig` is `undefined` */
|
||||
oldConfig?: SystemConfig;
|
||||
oldConfig: SystemConfig;
|
||||
},
|
||||
];
|
||||
'config.validate': [{ newConfig: SystemConfig; oldConfig: SystemConfig }];
|
||||
|
@ -89,6 +88,13 @@ export type EventItem<T extends EmitEvent> = {
|
|||
server: boolean;
|
||||
};
|
||||
|
||||
export enum BootstrapEventPriority {
|
||||
// Database service should be initialized before anything else, most other services need database access
|
||||
DatabaseService = -200,
|
||||
// Initialise config after other bootstrap services, stop other services from using config on bootstrap
|
||||
SystemConfig = 100,
|
||||
}
|
||||
|
||||
export interface IEventRepository {
|
||||
setup(options: { services: ClassConstructor<unknown>[] }): void;
|
||||
emit<T extends keyof EventMap>(event: T, ...args: ArgsOf<T>): Promise<void>;
|
||||
|
|
|
@ -2,6 +2,7 @@ import { PassThrough } from 'node:stream';
|
|||
import { defaults, SystemConfig } from 'src/config';
|
||||
import { StorageCore } from 'src/cores/storage.core';
|
||||
import { ImmichWorker, StorageFolder } from 'src/enum';
|
||||
import { IConfigRepository } from 'src/interfaces/config.interface';
|
||||
import { IDatabaseRepository } from 'src/interfaces/database.interface';
|
||||
import { IJobRepository, JobStatus } from 'src/interfaces/job.interface';
|
||||
import { IProcessRepository } from 'src/interfaces/process.interface';
|
||||
|
@ -16,13 +17,14 @@ describe(BackupService.name, () => {
|
|||
let sut: BackupService;
|
||||
|
||||
let databaseMock: Mocked<IDatabaseRepository>;
|
||||
let configMock: Mocked<IConfigRepository>;
|
||||
let jobMock: Mocked<IJobRepository>;
|
||||
let processMock: Mocked<IProcessRepository>;
|
||||
let storageMock: Mocked<IStorageRepository>;
|
||||
let systemMock: Mocked<ISystemMetadataRepository>;
|
||||
|
||||
beforeEach(() => {
|
||||
({ sut, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService));
|
||||
({ sut, configMock, databaseMock, jobMock, processMock, storageMock, systemMock } = newTestService(BackupService));
|
||||
});
|
||||
|
||||
it('should work', () => {
|
||||
|
@ -32,25 +34,23 @@ describe(BackupService.name, () => {
|
|||
describe('onBootstrapEvent', () => {
|
||||
it('should init cron job and handle config changes', async () => {
|
||||
databaseMock.tryLock.mockResolvedValue(true);
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.backupEnabled);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
|
||||
|
||||
expect(jobMock.addCronJob).toHaveBeenCalled();
|
||||
expect(systemMock.get).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not initialize backup database cron job when lock is taken', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.backupEnabled);
|
||||
databaseMock.tryLock.mockResolvedValue(false);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
|
||||
|
||||
expect(jobMock.addCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not initialise backup database job when running on microservices', async () => {
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
|
||||
|
||||
expect(jobMock.addCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
@ -58,9 +58,8 @@ describe(BackupService.name, () => {
|
|||
|
||||
describe('onConfigUpdateEvent', () => {
|
||||
beforeEach(async () => {
|
||||
systemMock.get.mockResolvedValue(defaults);
|
||||
databaseMock.tryLock.mockResolvedValue(true);
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: defaults });
|
||||
});
|
||||
|
||||
it('should update cron job if backup is enabled', () => {
|
||||
|
@ -80,14 +79,9 @@ describe(BackupService.name, () => {
|
|||
expect(jobMock.updateCronJob).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should do nothing if oldConfig is not provided', () => {
|
||||
sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig });
|
||||
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should do nothing if instance does not have the backup database lock', async () => {
|
||||
databaseMock.tryLock.mockResolvedValue(false);
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: defaults });
|
||||
sut.onConfigUpdate({ newConfig: systemConfigStub.backupEnabled as SystemConfig, oldConfig: defaults });
|
||||
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
|
|
@ -14,14 +14,15 @@ import { validateCronExpression } from 'src/validation';
|
|||
export class BackupService extends BaseService {
|
||||
private backupLock = false;
|
||||
|
||||
@OnEvent({ name: 'app.bootstrap' })
|
||||
async onBootstrap(workerType: ImmichWorker) {
|
||||
if (workerType !== ImmichWorker.API) {
|
||||
@OnEvent({ name: 'config.init' })
|
||||
async onConfigInit({
|
||||
newConfig: {
|
||||
backup: { database },
|
||||
},
|
||||
}: ArgOf<'config.init'>) {
|
||||
if (this.worker !== ImmichWorker.API) {
|
||||
return;
|
||||
}
|
||||
const {
|
||||
backup: { database },
|
||||
} = await this.getConfig({ withCache: true });
|
||||
|
||||
this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);
|
||||
|
||||
|
@ -36,8 +37,8 @@ export class BackupService extends BaseService {
|
|||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig: { backup }, oldConfig }: ArgOf<'config.update'>) {
|
||||
if (!oldConfig || !this.backupLock) {
|
||||
onConfigUpdate({ newConfig: { backup } }: ArgOf<'config.update'>) {
|
||||
if (!this.backupLock) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import {
|
|||
VectorExtension,
|
||||
VectorIndex,
|
||||
} from 'src/interfaces/database.interface';
|
||||
import { BootstrapEventPriority } from 'src/interfaces/event.interface';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
type CreateFailedArgs = { name: string; extension: string; otherName: string };
|
||||
|
@ -64,7 +65,7 @@ const RETRY_DURATION = Duration.fromObject({ seconds: 5 });
|
|||
export class DatabaseService extends BaseService {
|
||||
private reconnection?: NodeJS.Timeout;
|
||||
|
||||
@OnEvent({ name: 'app.bootstrap', priority: -200 })
|
||||
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.DatabaseService })
|
||||
async onBootstrap() {
|
||||
const version = await this.databaseRepository.getPostgresVersion();
|
||||
const current = semver.coerce(version);
|
||||
|
|
|
@ -31,7 +31,7 @@ describe(JobService.name, () => {
|
|||
|
||||
describe('onConfigUpdate', () => {
|
||||
it('should update concurrency', () => {
|
||||
sut.onConfigUpdate({ oldConfig: defaults, newConfig: defaults });
|
||||
sut.onConfigInitOrUpdate({ newConfig: defaults });
|
||||
|
||||
expect(jobMock.setConcurrency).toHaveBeenCalledTimes(15);
|
||||
expect(jobMock.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FACIAL_RECOGNITION, 1);
|
||||
|
|
|
@ -38,8 +38,9 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
|
|||
|
||||
@Injectable()
|
||||
export class JobService extends BaseService {
|
||||
@OnEvent({ name: 'config.init' })
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig: config }: ArgOf<'config.update'>) {
|
||||
onConfigInitOrUpdate({ newConfig: config }: ArgOf<'config.init'>) {
|
||||
if (this.worker !== ImmichWorker.MICROSERVICES) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import { mapLibrary } from 'src/dtos/library.dto';
|
|||
import { UserEntity } from 'src/entities/user.entity';
|
||||
import { AssetType, ImmichWorker } from 'src/enum';
|
||||
import { IAssetRepository } from 'src/interfaces/asset.interface';
|
||||
import { IConfigRepository } from 'src/interfaces/config.interface';
|
||||
import { IDatabaseRepository } from 'src/interfaces/database.interface';
|
||||
import {
|
||||
IJobRepository,
|
||||
|
@ -16,7 +17,6 @@ import {
|
|||
} from 'src/interfaces/job.interface';
|
||||
import { ILibraryRepository } from 'src/interfaces/library.interface';
|
||||
import { IStorageRepository } from 'src/interfaces/storage.interface';
|
||||
import { ISystemMetadataRepository } from 'src/interfaces/system-metadata.interface';
|
||||
import { LibraryService } from 'src/services/library.service';
|
||||
import { assetStub } from 'test/fixtures/asset.stub';
|
||||
import { authStub } from 'test/fixtures/auth.stub';
|
||||
|
@ -35,30 +35,28 @@ describe(LibraryService.name, () => {
|
|||
let sut: LibraryService;
|
||||
|
||||
let assetMock: Mocked<IAssetRepository>;
|
||||
let configMock: Mocked<IConfigRepository>;
|
||||
let databaseMock: Mocked<IDatabaseRepository>;
|
||||
let jobMock: Mocked<IJobRepository>;
|
||||
let libraryMock: Mocked<ILibraryRepository>;
|
||||
let storageMock: Mocked<IStorageRepository>;
|
||||
let systemMock: Mocked<ISystemMetadataRepository>;
|
||||
|
||||
beforeEach(() => {
|
||||
({ sut, assetMock, databaseMock, jobMock, libraryMock, storageMock, systemMock } = newTestService(LibraryService));
|
||||
({ sut, assetMock, configMock, databaseMock, jobMock, libraryMock, storageMock } = newTestService(LibraryService));
|
||||
|
||||
databaseMock.tryLock.mockResolvedValue(true);
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
|
||||
});
|
||||
|
||||
it('should work', () => {
|
||||
expect(sut).toBeDefined();
|
||||
});
|
||||
|
||||
describe('onBootstrapEvent', () => {
|
||||
describe('onConfigInit', () => {
|
||||
it('should init cron job and handle config changes', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryScan);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: defaults });
|
||||
|
||||
expect(jobMock.addCronJob).toHaveBeenCalled();
|
||||
expect(systemMock.get).toHaveBeenCalled();
|
||||
|
||||
await sut.onConfigUpdate({
|
||||
oldConfig: defaults,
|
||||
|
@ -82,7 +80,6 @@ describe(LibraryService.name, () => {
|
|||
libraryStub.externalLibraryWithImportPaths2,
|
||||
]);
|
||||
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
libraryMock.get.mockImplementation((id) =>
|
||||
Promise.resolve(
|
||||
[libraryStub.externalLibraryWithImportPaths1, libraryStub.externalLibraryWithImportPaths2].find(
|
||||
|
@ -91,7 +88,7 @@ describe(LibraryService.name, () => {
|
|||
),
|
||||
);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
|
||||
expect(storageMock.watch.mock.calls).toEqual(
|
||||
expect.arrayContaining([
|
||||
|
@ -102,33 +99,30 @@ describe(LibraryService.name, () => {
|
|||
});
|
||||
|
||||
it('should not initialize watcher when watching is disabled', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchDisabled as SystemConfig });
|
||||
|
||||
expect(storageMock.watch).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not initialize watcher when lock is taken', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
databaseMock.tryLock.mockResolvedValue(false);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
|
||||
expect(storageMock.watch).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not initialize library scan cron job when lock is taken', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
databaseMock.tryLock.mockResolvedValue(false);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
|
||||
expect(jobMock.addCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should not initialize watcher or library scan job when running on api', async () => {
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryScan as SystemConfig });
|
||||
|
||||
expect(jobMock.addCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
@ -136,19 +130,13 @@ describe(LibraryService.name, () => {
|
|||
|
||||
describe('onConfigUpdateEvent', () => {
|
||||
beforeEach(async () => {
|
||||
systemMock.get.mockResolvedValue(defaults);
|
||||
databaseMock.tryLock.mockResolvedValue(true);
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
});
|
||||
|
||||
it('should do nothing if oldConfig is not provided', async () => {
|
||||
await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig });
|
||||
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
|
||||
await sut.onConfigInit({ newConfig: defaults });
|
||||
});
|
||||
|
||||
it('should do nothing if instance does not have the watch lock', async () => {
|
||||
databaseMock.tryLock.mockResolvedValue(false);
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: defaults });
|
||||
await sut.onConfigUpdate({ newConfig: systemConfigStub.libraryScan as SystemConfig, oldConfig: defaults });
|
||||
expect(jobMock.updateCronJob).not.toHaveBeenCalled();
|
||||
});
|
||||
|
@ -156,9 +144,7 @@ describe(LibraryService.name, () => {
|
|||
it('should update cron job and enable watching', async () => {
|
||||
libraryMock.getAll.mockResolvedValue([]);
|
||||
await sut.onConfigUpdate({
|
||||
newConfig: {
|
||||
library: { ...systemConfigStub.libraryScan.library, ...systemConfigStub.libraryWatchEnabled.library },
|
||||
} as SystemConfig,
|
||||
newConfig: systemConfigStub.libraryScanAndWatch as SystemConfig,
|
||||
oldConfig: defaults,
|
||||
});
|
||||
|
||||
|
@ -172,15 +158,11 @@ describe(LibraryService.name, () => {
|
|||
it('should update cron job and disable watching', async () => {
|
||||
libraryMock.getAll.mockResolvedValue([]);
|
||||
await sut.onConfigUpdate({
|
||||
newConfig: {
|
||||
library: { ...systemConfigStub.libraryScan.library, ...systemConfigStub.libraryWatchEnabled.library },
|
||||
} as SystemConfig,
|
||||
newConfig: systemConfigStub.libraryScanAndWatch as SystemConfig,
|
||||
oldConfig: defaults,
|
||||
});
|
||||
await sut.onConfigUpdate({
|
||||
newConfig: {
|
||||
library: { ...systemConfigStub.libraryScan.library, ...systemConfigStub.libraryWatchDisabled.library },
|
||||
} as SystemConfig,
|
||||
newConfig: systemConfigStub.libraryScan as SystemConfig,
|
||||
oldConfig: defaults,
|
||||
});
|
||||
|
||||
|
@ -703,12 +685,10 @@ describe(LibraryService.name, () => {
|
|||
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
|
||||
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
|
||||
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
|
||||
const mockClose = vitest.fn();
|
||||
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
await sut.delete(libraryStub.externalLibraryWithImportPaths1.id);
|
||||
|
||||
expect(mockClose).toHaveBeenCalled();
|
||||
|
@ -837,12 +817,11 @@ describe(LibraryService.name, () => {
|
|||
});
|
||||
|
||||
it('should create watched with import paths', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
libraryMock.create.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
|
||||
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
|
||||
libraryMock.getAll.mockResolvedValue([]);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
await sut.create({
|
||||
ownerId: authStub.admin.user.id,
|
||||
importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths,
|
||||
|
@ -902,10 +881,9 @@ describe(LibraryService.name, () => {
|
|||
|
||||
describe('update', () => {
|
||||
beforeEach(async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
libraryMock.getAll.mockResolvedValue([]);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
});
|
||||
|
||||
it('should throw an error if an import path is invalid', async () => {
|
||||
|
@ -944,9 +922,7 @@ describe(LibraryService.name, () => {
|
|||
|
||||
describe('watching disabled', () => {
|
||||
beforeEach(async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchDisabled as SystemConfig });
|
||||
});
|
||||
|
||||
it('should not watch library', async () => {
|
||||
|
@ -960,9 +936,8 @@ describe(LibraryService.name, () => {
|
|||
|
||||
describe('watching enabled', () => {
|
||||
beforeEach(async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
libraryMock.getAll.mockResolvedValue([]);
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
});
|
||||
|
||||
it('should watch library', async () => {
|
||||
|
@ -1114,7 +1089,6 @@ describe(LibraryService.name, () => {
|
|||
libraryStub.externalLibraryWithImportPaths2,
|
||||
]);
|
||||
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
|
||||
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
|
||||
|
||||
libraryMock.get.mockImplementation((id) =>
|
||||
|
@ -1128,7 +1102,7 @@ describe(LibraryService.name, () => {
|
|||
const mockClose = vitest.fn();
|
||||
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.libraryWatchEnabled as SystemConfig });
|
||||
await sut.onShutdown();
|
||||
|
||||
expect(mockClose).toHaveBeenCalledTimes(2);
|
||||
|
|
|
@ -32,16 +32,16 @@ export class LibraryService extends BaseService {
|
|||
private lock = false;
|
||||
private watchers: Record<string, () => Promise<void>> = {};
|
||||
|
||||
@OnEvent({ name: 'app.bootstrap' })
|
||||
async onBootstrap(workerType: ImmichWorker) {
|
||||
if (workerType !== ImmichWorker.MICROSERVICES) {
|
||||
@OnEvent({ name: 'config.init' })
|
||||
async onConfigInit({
|
||||
newConfig: {
|
||||
library: { watch, scan },
|
||||
},
|
||||
}: ArgOf<'config.init'>) {
|
||||
if (this.worker !== ImmichWorker.MICROSERVICES) {
|
||||
return;
|
||||
}
|
||||
|
||||
const config = await this.getConfig({ withCache: false });
|
||||
|
||||
const { watch, scan } = config.library;
|
||||
|
||||
// This ensures that library watching only occurs in one microservice
|
||||
this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library);
|
||||
|
||||
|
@ -62,8 +62,8 @@ export class LibraryService extends BaseService {
|
|||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
async onConfigUpdate({ newConfig: { library }, oldConfig }: ArgOf<'config.update'>) {
|
||||
if (!oldConfig || !this.lock) {
|
||||
async onConfigUpdate({ newConfig: { library } }: ArgOf<'config.update'>) {
|
||||
if (!this.lock) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import { ExifEntity } from 'src/entities/exif.entity';
|
|||
import { AssetType, ImmichWorker, SourceType } from 'src/enum';
|
||||
import { IAlbumRepository } from 'src/interfaces/album.interface';
|
||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
|
||||
import { IConfigRepository } from 'src/interfaces/config.interface';
|
||||
import { ICryptoRepository } from 'src/interfaces/crypto.interface';
|
||||
import { IEventRepository } from 'src/interfaces/event.interface';
|
||||
import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface';
|
||||
|
@ -32,6 +33,7 @@ describe(MetadataService.name, () => {
|
|||
|
||||
let albumMock: Mocked<IAlbumRepository>;
|
||||
let assetMock: Mocked<IAssetRepository>;
|
||||
let configMock: Mocked<IConfigRepository>;
|
||||
let cryptoMock: Mocked<ICryptoRepository>;
|
||||
let eventMock: Mocked<IEventRepository>;
|
||||
let jobMock: Mocked<IJobRepository>;
|
||||
|
@ -55,6 +57,7 @@ describe(MetadataService.name, () => {
|
|||
sut,
|
||||
albumMock,
|
||||
assetMock,
|
||||
configMock,
|
||||
cryptoMock,
|
||||
eventMock,
|
||||
jobMock,
|
||||
|
@ -70,6 +73,8 @@ describe(MetadataService.name, () => {
|
|||
|
||||
mockReadTags();
|
||||
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
|
||||
|
||||
delete process.env.TZ;
|
||||
});
|
||||
|
||||
|
@ -83,17 +88,16 @@ describe(MetadataService.name, () => {
|
|||
|
||||
describe('onBootstrapEvent', () => {
|
||||
it('should pause and resume queue during init', async () => {
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onBootstrap();
|
||||
|
||||
expect(jobMock.pause).toHaveBeenCalledTimes(1);
|
||||
expect(mapMock.init).toHaveBeenCalledTimes(1);
|
||||
expect(jobMock.resume).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should return if reverse geocoding is disabled', async () => {
|
||||
systemMock.get.mockResolvedValue({ reverseGeocoding: { enabled: false } });
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
it('should return if running on api', async () => {
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.API);
|
||||
await sut.onBootstrap();
|
||||
|
||||
expect(jobMock.pause).not.toHaveBeenCalled();
|
||||
expect(mapMock.init).not.toHaveBeenCalled();
|
||||
|
|
|
@ -80,12 +80,12 @@ const validateRange = (value: number | undefined, min: number, max: number): Non
|
|||
@Injectable()
|
||||
export class MetadataService extends BaseService {
|
||||
@OnEvent({ name: 'app.bootstrap' })
|
||||
async onBootstrap(app: ArgOf<'app.bootstrap'>) {
|
||||
if (app !== ImmichWorker.MICROSERVICES) {
|
||||
async onBootstrap() {
|
||||
if (this.worker !== ImmichWorker.MICROSERVICES) {
|
||||
return;
|
||||
}
|
||||
const config = await this.getConfig({ withCache: false });
|
||||
await this.init(config);
|
||||
this.logger.log('Bootstrapping metadata service');
|
||||
await this.init();
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'app.shutdown' })
|
||||
|
@ -93,17 +93,8 @@ export class MetadataService extends BaseService {
|
|||
await this.metadataRepository.teardown();
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update' })
|
||||
async onConfigUpdate({ newConfig }: ArgOf<'config.update'>) {
|
||||
await this.init(newConfig);
|
||||
}
|
||||
|
||||
private async init({ reverseGeocoding }: SystemConfig) {
|
||||
const { enabled } = reverseGeocoding;
|
||||
|
||||
if (!enabled) {
|
||||
return;
|
||||
}
|
||||
private async init() {
|
||||
this.logger.log('Initializing metadata service');
|
||||
|
||||
try {
|
||||
await this.jobRepository.pause(QueueName.METADATA_EXTRACTION);
|
||||
|
|
|
@ -77,7 +77,7 @@ describe(NotificationService.name, () => {
|
|||
|
||||
describe('onConfigUpdate', () => {
|
||||
it('should emit client and server events', () => {
|
||||
const update = { newConfig: defaults };
|
||||
const update = { oldConfig: defaults, newConfig: defaults };
|
||||
expect(sut.onConfigUpdate(update)).toBeUndefined();
|
||||
expect(eventMock.clientBroadcast).toHaveBeenCalledWith('on_config_update');
|
||||
expect(eventMock.serverSend).toHaveBeenCalledWith('config.update', update);
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { SystemConfig } from 'src/config';
|
||||
import { ImmichWorker } from 'src/enum';
|
||||
import { IAssetRepository, WithoutProperty } from 'src/interfaces/asset.interface';
|
||||
import { IConfigRepository } from 'src/interfaces/config.interface';
|
||||
import { IDatabaseRepository } from 'src/interfaces/database.interface';
|
||||
import { IJobRepository, JobName, JobStatus } from 'src/interfaces/job.interface';
|
||||
import { IMachineLearningRepository } from 'src/interfaces/machine-learning.interface';
|
||||
|
@ -22,12 +23,14 @@ describe(SmartInfoService.name, () => {
|
|||
let machineLearningMock: Mocked<IMachineLearningRepository>;
|
||||
let searchMock: Mocked<ISearchRepository>;
|
||||
let systemMock: Mocked<ISystemMetadataRepository>;
|
||||
let configMock: Mocked<IConfigRepository>;
|
||||
|
||||
beforeEach(() => {
|
||||
({ sut, assetMock, databaseMock, jobMock, machineLearningMock, searchMock, systemMock } =
|
||||
({ sut, assetMock, databaseMock, jobMock, machineLearningMock, searchMock, systemMock, configMock } =
|
||||
newTestService(SmartInfoService));
|
||||
|
||||
assetMock.getByIds.mockResolvedValue([assetStub.image]);
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.MICROSERVICES);
|
||||
});
|
||||
|
||||
it('should work', () => {
|
||||
|
@ -63,11 +66,11 @@ describe(SmartInfoService.name, () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('onBootstrapEvent', () => {
|
||||
describe('onConfigInit', () => {
|
||||
it('should return if not microservices', async () => {
|
||||
await sut.onBootstrap(ImmichWorker.API);
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.API);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });
|
||||
|
||||
expect(systemMock.get).not.toHaveBeenCalled();
|
||||
expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
|
||||
|
@ -78,11 +81,8 @@ describe(SmartInfoService.name, () => {
|
|||
});
|
||||
|
||||
it('should return if machine learning is disabled', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.machineLearningDisabled);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningDisabled as SystemConfig });
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
|
||||
expect(systemMock.get).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
|
||||
|
@ -95,9 +95,8 @@ describe(SmartInfoService.name, () => {
|
|||
it('should return if model and DB dimension size are equal', async () => {
|
||||
searchMock.getDimensionSize.mockResolvedValue(512);
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });
|
||||
|
||||
expect(systemMock.get).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.getDimensionSize).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
|
||||
|
@ -111,9 +110,8 @@ describe(SmartInfoService.name, () => {
|
|||
searchMock.getDimensionSize.mockResolvedValue(768);
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });
|
||||
|
||||
expect(systemMock.get).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.getDimensionSize).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.setDimensionSize).toHaveBeenCalledWith(512);
|
||||
expect(jobMock.getQueueStatus).toHaveBeenCalledTimes(1);
|
||||
|
@ -126,9 +124,8 @@ describe(SmartInfoService.name, () => {
|
|||
searchMock.getDimensionSize.mockResolvedValue(768);
|
||||
jobMock.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: true });
|
||||
|
||||
await sut.onBootstrap(ImmichWorker.MICROSERVICES);
|
||||
await sut.onConfigInit({ newConfig: systemConfigStub.machineLearningEnabled as SystemConfig });
|
||||
|
||||
expect(systemMock.get).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.getDimensionSize).toHaveBeenCalledTimes(1);
|
||||
expect(searchMock.setDimensionSize).toHaveBeenCalledWith(512);
|
||||
expect(jobMock.getQueueStatus).toHaveBeenCalledTimes(1);
|
||||
|
@ -139,6 +136,22 @@ describe(SmartInfoService.name, () => {
|
|||
});
|
||||
|
||||
describe('onConfigUpdateEvent', () => {
|
||||
it('should return if not microservices', async () => {
|
||||
configMock.getWorker.mockReturnValue(ImmichWorker.API);
|
||||
await sut.onConfigUpdate({
|
||||
newConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
|
||||
oldConfig: systemConfigStub.machineLearningEnabled as SystemConfig,
|
||||
});
|
||||
|
||||
expect(searchMock.getDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.setDimensionSize).not.toHaveBeenCalled();
|
||||
expect(searchMock.deleteAllSearchEmbeddings).not.toHaveBeenCalled();
|
||||
expect(jobMock.getQueueStatus).not.toHaveBeenCalled();
|
||||
expect(jobMock.pause).not.toHaveBeenCalled();
|
||||
expect(jobMock.waitForQueueCompletion).not.toHaveBeenCalled();
|
||||
expect(jobMock.resume).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should return if machine learning is disabled', async () => {
|
||||
systemMock.get.mockResolvedValue(systemConfigStub.machineLearningDisabled);
|
||||
|
||||
|
|
|
@ -13,17 +13,12 @@ import { usePagination } from 'src/utils/pagination';
|
|||
|
||||
@Injectable()
|
||||
export class SmartInfoService extends BaseService {
|
||||
@OnEvent({ name: 'app.bootstrap' })
|
||||
async onBootstrap(app: ArgOf<'app.bootstrap'>) {
|
||||
if (app !== ImmichWorker.MICROSERVICES) {
|
||||
return;
|
||||
}
|
||||
|
||||
const config = await this.getConfig({ withCache: false });
|
||||
await this.init(config);
|
||||
@OnEvent({ name: 'config.init' })
|
||||
async onConfigInit({ newConfig }: ArgOf<'config.init'>) {
|
||||
await this.init(newConfig);
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update' })
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
async onConfigUpdate({ oldConfig, newConfig }: ArgOf<'config.update'>) {
|
||||
await this.init(newConfig, oldConfig);
|
||||
}
|
||||
|
@ -40,7 +35,7 @@ export class SmartInfoService extends BaseService {
|
|||
}
|
||||
|
||||
private async init(newConfig: SystemConfig, oldConfig?: SystemConfig) {
|
||||
if (!isSmartSearchEnabled(newConfig.machineLearning)) {
|
||||
if (this.worker !== ImmichWorker.MICROSERVICES || !isSmartSearchEnabled(newConfig.machineLearning)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ describe(StorageTemplateService.name, () => {
|
|||
|
||||
systemMock.get.mockResolvedValue({ storageTemplate: { enabled: true } });
|
||||
|
||||
sut.onConfigUpdate({ newConfig: defaults });
|
||||
sut.onConfigInitOrUpdate({ newConfig: defaults });
|
||||
});
|
||||
|
||||
describe('onConfigValidate', () => {
|
||||
|
@ -171,7 +171,7 @@ describe(StorageTemplateService.name, () => {
|
|||
const config = structuredClone(defaults);
|
||||
config.storageTemplate.template = '{{y}}/{{#if album}}{{album}}{{else}}other/{{MM}}{{/if}}/{{filename}}';
|
||||
|
||||
sut.onConfigUpdate({ oldConfig: defaults, newConfig: config });
|
||||
sut.onConfigInitOrUpdate({ newConfig: config });
|
||||
|
||||
userMock.get.mockResolvedValue(user);
|
||||
assetMock.getByIds.mockResolvedValueOnce([asset]);
|
||||
|
@ -192,7 +192,7 @@ describe(StorageTemplateService.name, () => {
|
|||
const user = userStub.user1;
|
||||
const config = structuredClone(defaults);
|
||||
config.storageTemplate.template = '{{y}}/{{#if album}}{{album}}{{else}}other//{{MM}}{{/if}}/{{filename}}';
|
||||
sut.onConfigUpdate({ oldConfig: defaults, newConfig: config });
|
||||
sut.onConfigInitOrUpdate({ newConfig: config });
|
||||
|
||||
userMock.get.mockResolvedValue(user);
|
||||
assetMock.getByIds.mockResolvedValueOnce([asset]);
|
||||
|
|
|
@ -74,8 +74,9 @@ export class StorageTemplateService extends BaseService {
|
|||
return this._template;
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.init' })
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig }: ArgOf<'config.update'>) {
|
||||
onConfigInitOrUpdate({ newConfig }: ArgOf<'config.init'>) {
|
||||
const template = newConfig.storageTemplate.template;
|
||||
if (!this._template || template !== this.template.raw) {
|
||||
this.logger.debug(`Compiling new storage template: ${template}`);
|
||||
|
|
|
@ -4,17 +4,17 @@ import _ from 'lodash';
|
|||
import { defaults } from 'src/config';
|
||||
import { OnEvent } from 'src/decorators';
|
||||
import { SystemConfigDto, mapConfig } from 'src/dtos/system-config.dto';
|
||||
import { ArgOf } from 'src/interfaces/event.interface';
|
||||
import { ArgOf, BootstrapEventPriority } from 'src/interfaces/event.interface';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
import { clearConfigCache } from 'src/utils/config';
|
||||
import { toPlainObject } from 'src/utils/object';
|
||||
|
||||
@Injectable()
|
||||
export class SystemConfigService extends BaseService {
|
||||
@OnEvent({ name: 'app.bootstrap', priority: -100 })
|
||||
@OnEvent({ name: 'app.bootstrap', priority: BootstrapEventPriority.SystemConfig })
|
||||
async onBootstrap() {
|
||||
const config = await this.getConfig({ withCache: false });
|
||||
await this.eventRepository.emit('config.update', { newConfig: config });
|
||||
await this.eventRepository.emit('config.init', { newConfig: config });
|
||||
}
|
||||
|
||||
async getSystemConfig(): Promise<SystemConfigDto> {
|
||||
|
@ -26,14 +26,18 @@ export class SystemConfigService extends BaseService {
|
|||
return mapConfig(defaults);
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig: { logging } }: ArgOf<'config.update'>) {
|
||||
@OnEvent({ name: 'config.init' })
|
||||
onConfigInit({ newConfig: { logging } }: ArgOf<'config.init'>) {
|
||||
const { logLevel: envLevel } = this.configRepository.getEnv();
|
||||
const configLevel = logging.enabled ? logging.level : false;
|
||||
const level = envLevel ?? configLevel;
|
||||
this.logger.setLogLevel(level);
|
||||
this.logger.log(`LogLevel=${level} ${envLevel ? '(set via IMMICH_LOG_LEVEL)' : '(set via system config)'}`);
|
||||
// TODO only do this if the event is a socket.io event
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'config.update', server: true })
|
||||
onConfigUpdate({ newConfig }: ArgOf<'config.update'>) {
|
||||
this.onConfigInit({ newConfig });
|
||||
clearConfigCache();
|
||||
}
|
||||
|
||||
|
|
29
server/test/fixtures/system-config.stub.ts
vendored
29
server/test/fixtures/system-config.stub.ts
vendored
|
@ -54,6 +54,9 @@ export const systemConfigStub = {
|
|||
},
|
||||
libraryWatchEnabled: {
|
||||
library: {
|
||||
scan: {
|
||||
enabled: false,
|
||||
},
|
||||
watch: {
|
||||
enabled: true,
|
||||
},
|
||||
|
@ -61,6 +64,9 @@ export const systemConfigStub = {
|
|||
},
|
||||
libraryWatchDisabled: {
|
||||
library: {
|
||||
scan: {
|
||||
enabled: false,
|
||||
},
|
||||
watch: {
|
||||
enabled: false,
|
||||
},
|
||||
|
@ -72,6 +78,20 @@ export const systemConfigStub = {
|
|||
enabled: true,
|
||||
cronExpression: '0 0 * * *',
|
||||
},
|
||||
watch: {
|
||||
enabled: false,
|
||||
},
|
||||
},
|
||||
},
|
||||
libraryScanAndWatch: {
|
||||
library: {
|
||||
scan: {
|
||||
enabled: true,
|
||||
cronExpression: '0 0 * * *',
|
||||
},
|
||||
watch: {
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
backupEnabled: {
|
||||
|
@ -88,4 +108,13 @@ export const systemConfigStub = {
|
|||
enabled: false,
|
||||
},
|
||||
},
|
||||
machineLearningEnabled: {
|
||||
machineLearning: {
|
||||
enabled: true,
|
||||
clip: {
|
||||
modelName: 'ViT-B-16__openai',
|
||||
enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
} satisfies Record<string, DeepPartial<SystemConfig>>;
|
||||
|
|
Loading…
Reference in a new issue