0
Fork 0
mirror of https://github.com/immich-app/immich.git synced 2025-01-07 00:50:23 -05:00

refactor(server): redis config (#13538)

* refactor(server): redis config

* refactor: cache parsed env data

* chore: add database and redis tests
This commit is contained in:
Jason Rasmussen 2024-10-17 10:50:54 -04:00 committed by GitHub
parent 79acbc1d7b
commit 3f663106e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 318 additions and 173 deletions

View file

@ -7,7 +7,7 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { ClsModule } from 'nestjs-cls';
import { OpenTelemetryModule } from 'nestjs-otel';
import { commands } from 'src/commands';
import { bullConfig, bullQueues, clsConfig, immichAppConfig } from 'src/config';
import { clsConfig, immichAppConfig } from 'src/config';
import { controllers } from 'src/controllers';
import { databaseConfig } from 'src/database.config';
import { entities } from 'src/entities';
@ -20,6 +20,7 @@ import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor';
import { GlobalExceptionFilter } from 'src/middleware/global-exception.filter';
import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
import { repositories } from 'src/repositories';
import { ConfigRepository } from 'src/repositories/config.repository';
import { services } from 'src/services';
import { DatabaseService } from 'src/services/database.service';
import { otelConfig } from 'src/utils/instrumentation';
@ -35,9 +36,12 @@ const middleware = [
{ provide: APP_GUARD, useClass: AuthGuard },
];
const configRepository = new ConfigRepository();
const { bull } = configRepository.getEnv();
const imports = [
BullModule.forRoot(bullConfig),
BullModule.registerQueue(...bullQueues),
BullModule.forRoot(bull.config),
BullModule.registerQueue(...bull.queues),
ClsModule.forRoot(clsConfig),
ConfigModule.forRoot(immichAppConfig),
OpenTelemetryModule.forRoot(otelConfig),

View file

@ -1,9 +1,6 @@
import { RegisterQueueOptions } from '@nestjs/bullmq';
import { ConfigModuleOptions } from '@nestjs/config';
import { CronExpression } from '@nestjs/schedule';
import { QueueOptions } from 'bullmq';
import { Request, Response } from 'express';
import { RedisOptions } from 'ioredis';
import Joi, { Root } from 'joi';
import { CLS_ID, ClsModuleOptions } from 'nestjs-cls';
import { ImmichHeader } from 'src/dtos/auth.dto';
@ -363,38 +360,6 @@ export const immichAppConfig: ConfigModuleOptions = {
}),
};
export function parseRedisConfig(): RedisOptions {
const redisUrl = process.env.REDIS_URL;
if (redisUrl && redisUrl.startsWith('ioredis://')) {
try {
const decodedString = Buffer.from(redisUrl.slice(10), 'base64').toString();
return JSON.parse(decodedString);
} catch (error) {
throw new Error(`Failed to decode redis options: ${error}`);
}
}
return {
host: process.env.REDIS_HOSTNAME || 'redis',
port: Number.parseInt(process.env.REDIS_PORT || '6379'),
db: Number.parseInt(process.env.REDIS_DBINDEX || '0'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
path: process.env.REDIS_SOCKET || undefined,
};
}
export const bullConfig: QueueOptions = {
prefix: 'immich_bull',
connection: parseRedisConfig(),
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
removeOnFail: false,
},
};
export const bullQueues: RegisterQueueOptions[] = Object.values(QueueName).map((name) => ({ name }));
export const clsConfig: ClsModuleOptions = {
middleware: {
mount: true,

View file

@ -1,3 +1,6 @@
import { RegisterQueueOptions } from '@nestjs/bullmq';
import { QueueOptions } from 'bullmq';
import { RedisOptions } from 'ioredis';
import { ImmichEnvironment, ImmichWorker, LogLevel } from 'src/enum';
import { VectorExtension } from 'src/interfaces/database.interface';
@ -57,6 +60,13 @@ export interface EnvData {
};
};
redis: RedisOptions;
bull: {
config: QueueOptions;
queues: RegisterQueueOptions[];
};
storage: {
ignoreMountCheckErrors: boolean;
};

View file

@ -3,7 +3,7 @@ import { IoAdapter } from '@nestjs/platform-socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { Redis } from 'ioredis';
import { ServerOptions } from 'socket.io';
import { parseRedisConfig } from 'src/config';
import { IConfigRepository } from 'src/interfaces/config.interface';
export class WebSocketAdapter extends IoAdapter {
constructor(private app: INestApplicationContext) {
@ -11,8 +11,9 @@ export class WebSocketAdapter extends IoAdapter {
}
createIOServer(port: number, options?: ServerOptions): any {
const { redis } = this.app.get<IConfigRepository>(IConfigRepository).getEnv();
const server = super.createIOServer(port, options);
const pubClient = new Redis(parseRedisConfig());
const pubClient = new Redis(redis);
const subClient = pubClient.duplicate();
server.adapter(createAdapter(pubClient, subClient));
return server;

View file

@ -1,76 +1,181 @@
import { ConfigRepository } from 'src/repositories/config.repository';
import { clearEnvCache, ConfigRepository } from 'src/repositories/config.repository';
const getEnv = () => new ConfigRepository().getEnv();
const getEnv = () => {
clearEnvCache();
return new ConfigRepository().getEnv();
};
const resetEnv = () => {
for (const env of [
'IMMICH_WORKERS_INCLUDE',
'IMMICH_WORKERS_EXCLUDE',
'DB_URL',
'DB_HOSTNAME',
'DB_PORT',
'DB_USERNAME',
'DB_PASSWORD',
'DB_DATABASE_NAME',
'DB_SKIP_MIGRATIONS',
'DB_VECTOR_EXTENSION',
'REDIS_HOSTNAME',
'REDIS_PORT',
'REDIS_DBINDEX',
'REDIS_USERNAME',
'REDIS_PASSWORD',
'REDIS_SOCKET',
'REDIS_URL',
'NO_COLOR',
]) {
delete process.env[env];
}
};
const sentinelConfig = {
sentinels: [
{
host: 'redis-sentinel-node-0',
port: 26_379,
},
{
host: 'redis-sentinel-node-1',
port: 26_379,
},
{
host: 'redis-sentinel-node-2',
port: 26_379,
},
],
name: 'redis-sentinel',
};
describe('getEnv', () => {
beforeEach(() => {
delete process.env.IMMICH_WORKERS_INCLUDE;
delete process.env.IMMICH_WORKERS_EXCLUDE;
delete process.env.NO_COLOR;
resetEnv();
});
it('should return default workers', () => {
const { workers } = getEnv();
expect(workers).toEqual(['api', 'microservices']);
describe('database', () => {
it('should use defaults', () => {
const { database } = getEnv();
expect(database).toEqual({
url: undefined,
host: 'database',
port: 5432,
name: 'immich',
username: 'postgres',
password: 'postgres',
skipMigrations: false,
vectorExtension: 'vectors',
});
});
it('should allow skipping migrations', () => {
process.env.DB_SKIP_MIGRATIONS = 'true';
const { database } = getEnv();
expect(database).toMatchObject({ skipMigrations: true });
});
});
it('should return included workers', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
describe('redis', () => {
it('should use defaults', () => {
const { redis } = getEnv();
expect(redis).toEqual({
host: 'redis',
port: 6379,
db: 0,
username: undefined,
password: undefined,
path: undefined,
});
});
it('should parse base64 encoded config, ignore other env', () => {
process.env.REDIS_URL = `ioredis://${Buffer.from(JSON.stringify(sentinelConfig)).toString('base64')}`;
process.env.REDIS_HOSTNAME = 'redis-host';
process.env.REDIS_USERNAME = 'redis-user';
process.env.REDIS_PASSWORD = 'redis-password';
const { redis } = getEnv();
expect(redis).toEqual(sentinelConfig);
});
it('should reject invalid json', () => {
process.env.REDIS_URL = `ioredis://${Buffer.from('{ "invalid json"').toString('base64')}`;
expect(() => getEnv()).toThrowError('Failed to decode redis options');
});
});
it('should excluded workers from defaults', () => {
process.env.IMMICH_WORKERS_EXCLUDE = 'api';
const { workers } = getEnv();
expect(workers).toEqual(['microservices']);
describe('noColor', () => {
beforeEach(() => {
delete process.env.NO_COLOR;
});
it('should default noColor to false', () => {
const { noColor } = getEnv();
expect(noColor).toBe(false);
});
it('should map NO_COLOR=1 to true', () => {
process.env.NO_COLOR = '1';
const { noColor } = getEnv();
expect(noColor).toBe(true);
});
it('should map NO_COLOR=true to true', () => {
process.env.NO_COLOR = 'true';
const { noColor } = getEnv();
expect(noColor).toBe(true);
});
});
it('should exclude workers from include list', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api,microservices,randomservice';
process.env.IMMICH_WORKERS_EXCLUDE = 'randomservice,microservices';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
});
describe('workers', () => {
it('should return default workers', () => {
const { workers } = getEnv();
expect(workers).toEqual(['api', 'microservices']);
});
it('should remove whitespace from included workers before parsing', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api, microservices';
const { workers } = getEnv();
expect(workers).toEqual(['api', 'microservices']);
});
it('should return included workers', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
});
it('should remove whitespace from excluded workers before parsing', () => {
process.env.IMMICH_WORKERS_EXCLUDE = 'api, microservices';
const { workers } = getEnv();
expect(workers).toEqual([]);
});
it('should excluded workers from defaults', () => {
process.env.IMMICH_WORKERS_EXCLUDE = 'api';
const { workers } = getEnv();
expect(workers).toEqual(['microservices']);
});
it('should remove whitespace from included and excluded workers before parsing', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api, microservices, randomservice,randomservice2';
process.env.IMMICH_WORKERS_EXCLUDE = 'randomservice,microservices, randomservice2';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
});
it('should exclude workers from include list', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api,microservices,randomservice';
process.env.IMMICH_WORKERS_EXCLUDE = 'randomservice,microservices';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
});
it('should throw error for invalid workers', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api,microservices,randomservice';
expect(getEnv).toThrowError('Invalid worker(s) found: api,microservices,randomservice');
});
it('should remove whitespace from included workers before parsing', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api, microservices';
const { workers } = getEnv();
expect(workers).toEqual(['api', 'microservices']);
});
it('should default noColor to false', () => {
const { noColor } = getEnv();
expect(noColor).toBe(false);
});
it('should remove whitespace from excluded workers before parsing', () => {
process.env.IMMICH_WORKERS_EXCLUDE = 'api, microservices';
const { workers } = getEnv();
expect(workers).toEqual([]);
});
it('should map NO_COLOR=1 to true', () => {
process.env.NO_COLOR = '1';
const { noColor } = getEnv();
expect(noColor).toBe(true);
});
it('should remove whitespace from included and excluded workers before parsing', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api, microservices, randomservice,randomservice2';
process.env.IMMICH_WORKERS_EXCLUDE = 'randomservice,microservices, randomservice2';
const { workers } = getEnv();
expect(workers).toEqual(['api']);
});
it('should map NO_COLOR=true to true', () => {
process.env.NO_COLOR = 'true';
const { noColor } = getEnv();
expect(noColor).toBe(true);
it('should throw error for invalid workers', () => {
process.env.IMMICH_WORKERS_INCLUDE = 'api,microservices,randomservice';
expect(getEnv).toThrowError('Invalid worker(s) found: api,microservices,randomservice');
});
});
});

View file

@ -4,6 +4,7 @@ import { citiesFile } from 'src/constants';
import { ImmichEnvironment, ImmichWorker, LogLevel } from 'src/enum';
import { EnvData, IConfigRepository } from 'src/interfaces/config.interface';
import { DatabaseExtension } from 'src/interfaces/database.interface';
import { QueueName } from 'src/interfaces/job.interface';
import { setDifference } from 'src/utils/set';
// TODO replace src/config validation with class-validator, here
@ -29,86 +30,131 @@ const asSet = (value: string | undefined, defaults: ImmichWorker[]) => {
return new Set(values.length === 0 ? defaults : (values as ImmichWorker[]));
};
const getEnv = (): EnvData => {
const included = asSet(process.env.IMMICH_WORKERS_INCLUDE, [ImmichWorker.API, ImmichWorker.MICROSERVICES]);
const excluded = asSet(process.env.IMMICH_WORKERS_EXCLUDE, []);
const workers = [...setDifference(included, excluded)];
for (const worker of workers) {
if (!WORKER_TYPES.has(worker)) {
throw new Error(`Invalid worker(s) found: ${workers.join(',')}`);
}
}
const environment = process.env.IMMICH_ENV as ImmichEnvironment;
const isProd = environment === ImmichEnvironment.PRODUCTION;
const buildFolder = process.env.IMMICH_BUILD_DATA || '/build';
const folders = {
geodata: join(buildFolder, 'geodata'),
web: join(buildFolder, 'www'),
};
let redisConfig = {
host: process.env.REDIS_HOSTNAME || 'redis',
port: Number.parseInt(process.env.REDIS_PORT || '') || 6379,
db: Number.parseInt(process.env.REDIS_DBINDEX || '') || 0,
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
path: process.env.REDIS_SOCKET || undefined,
};
const redisUrl = process.env.REDIS_URL;
if (redisUrl && redisUrl.startsWith('ioredis://')) {
try {
redisConfig = JSON.parse(Buffer.from(redisUrl.slice(10), 'base64').toString());
} catch (error) {
throw new Error(`Failed to decode redis options: ${error}`);
}
}
return {
host: process.env.IMMICH_HOST,
port: Number(process.env.IMMICH_PORT) || 2283,
environment,
configFile: process.env.IMMICH_CONFIG_FILE,
logLevel: process.env.IMMICH_LOG_LEVEL as LogLevel,
buildMetadata: {
build: process.env.IMMICH_BUILD,
buildUrl: process.env.IMMICH_BUILD_URL,
buildImage: process.env.IMMICH_BUILD_IMAGE,
buildImageUrl: process.env.IMMICH_BUILD_IMAGE_URL,
repository: process.env.IMMICH_REPOSITORY,
repositoryUrl: process.env.IMMICH_REPOSITORY_URL,
sourceRef: process.env.IMMICH_SOURCE_REF,
sourceCommit: process.env.IMMICH_SOURCE_COMMIT,
sourceUrl: process.env.IMMICH_SOURCE_URL,
thirdPartySourceUrl: process.env.IMMICH_THIRD_PARTY_SOURCE_URL,
thirdPartyBugFeatureUrl: process.env.IMMICH_THIRD_PARTY_BUG_FEATURE_URL,
thirdPartyDocumentationUrl: process.env.IMMICH_THIRD_PARTY_DOCUMENTATION_URL,
thirdPartySupportUrl: process.env.IMMICH_THIRD_PARTY_SUPPORT_URL,
},
bull: {
config: {
prefix: 'immich_bull',
connection: { ...redisConfig },
defaultJobOptions: {
attempts: 3,
removeOnComplete: true,
removeOnFail: false,
},
},
queues: Object.values(QueueName).map((name) => ({ name })),
},
database: {
url: process.env.DB_URL,
host: process.env.DB_HOSTNAME || 'database',
port: Number(process.env.DB_PORT) || 5432,
username: process.env.DB_USERNAME || 'postgres',
password: process.env.DB_PASSWORD || 'postgres',
name: process.env.DB_DATABASE_NAME || 'immich',
skipMigrations: process.env.DB_SKIP_MIGRATIONS === 'true',
vectorExtension:
process.env.DB_VECTOR_EXTENSION === 'pgvector' ? DatabaseExtension.VECTOR : DatabaseExtension.VECTORS,
},
licensePublicKey: isProd ? productionKeys : stagingKeys,
redis: redisConfig,
resourcePaths: {
lockFile: join(buildFolder, 'build-lock.json'),
geodata: {
dateFile: join(folders.geodata, 'geodata-date.txt'),
admin1: join(folders.geodata, 'admin1CodesASCII.txt'),
admin2: join(folders.geodata, 'admin2Codes.txt'),
cities500: join(folders.geodata, citiesFile),
naturalEarthCountriesPath: join(folders.geodata, 'ne_10m_admin_0_countries.geojson'),
},
web: {
root: folders.web,
indexHtml: join(folders.web, 'index.html'),
},
},
storage: {
ignoreMountCheckErrors: process.env.IMMICH_IGNORE_MOUNT_CHECK_ERRORS === 'true',
},
workers,
noColor: !!process.env.NO_COLOR,
};
};
let cached: EnvData | undefined;
@Injectable()
export class ConfigRepository implements IConfigRepository {
getEnv(): EnvData {
const included = asSet(process.env.IMMICH_WORKERS_INCLUDE, [ImmichWorker.API, ImmichWorker.MICROSERVICES]);
const excluded = asSet(process.env.IMMICH_WORKERS_EXCLUDE, []);
const workers = [...setDifference(included, excluded)];
for (const worker of workers) {
if (!WORKER_TYPES.has(worker)) {
throw new Error(`Invalid worker(s) found: ${workers.join(',')}`);
}
if (!cached) {
cached = getEnv();
}
const environment = process.env.IMMICH_ENV as ImmichEnvironment;
const isProd = environment === ImmichEnvironment.PRODUCTION;
const buildFolder = process.env.IMMICH_BUILD_DATA || '/build';
const folders = {
geodata: join(buildFolder, 'geodata'),
web: join(buildFolder, 'www'),
};
return {
host: process.env.IMMICH_HOST,
port: Number(process.env.IMMICH_PORT) || 2283,
environment,
configFile: process.env.IMMICH_CONFIG_FILE,
logLevel: process.env.IMMICH_LOG_LEVEL as LogLevel,
buildMetadata: {
build: process.env.IMMICH_BUILD,
buildUrl: process.env.IMMICH_BUILD_URL,
buildImage: process.env.IMMICH_BUILD_IMAGE,
buildImageUrl: process.env.IMMICH_BUILD_IMAGE_URL,
repository: process.env.IMMICH_REPOSITORY,
repositoryUrl: process.env.IMMICH_REPOSITORY_URL,
sourceRef: process.env.IMMICH_SOURCE_REF,
sourceCommit: process.env.IMMICH_SOURCE_COMMIT,
sourceUrl: process.env.IMMICH_SOURCE_URL,
thirdPartySourceUrl: process.env.IMMICH_THIRD_PARTY_SOURCE_URL,
thirdPartyBugFeatureUrl: process.env.IMMICH_THIRD_PARTY_BUG_FEATURE_URL,
thirdPartyDocumentationUrl: process.env.IMMICH_THIRD_PARTY_DOCUMENTATION_URL,
thirdPartySupportUrl: process.env.IMMICH_THIRD_PARTY_SUPPORT_URL,
},
database: {
url: process.env.DB_URL,
host: process.env.DB_HOSTNAME || 'database',
port: Number(process.env.DB_PORT) || 5432,
username: process.env.DB_USERNAME || 'postgres',
password: process.env.DB_PASSWORD || 'postgres',
name: process.env.DB_DATABASE_NAME || 'immich',
skipMigrations: process.env.DB_SKIP_MIGRATIONS === 'true',
vectorExtension:
process.env.DB_VECTOR_EXTENSION === 'pgvector' ? DatabaseExtension.VECTOR : DatabaseExtension.VECTORS,
},
licensePublicKey: isProd ? productionKeys : stagingKeys,
resourcePaths: {
lockFile: join(buildFolder, 'build-lock.json'),
geodata: {
dateFile: join(folders.geodata, 'geodata-date.txt'),
admin1: join(folders.geodata, 'admin1CodesASCII.txt'),
admin2: join(folders.geodata, 'admin2Codes.txt'),
cities500: join(folders.geodata, citiesFile),
naturalEarthCountriesPath: join(folders.geodata, 'ne_10m_admin_0_countries.geojson'),
},
web: {
root: folders.web,
indexHtml: join(folders.web, 'index.html'),
},
},
storage: {
ignoreMountCheckErrors: process.env.IMMICH_IGNORE_MOUNT_CHECK_ERRORS === 'true',
},
workers,
noColor: !!process.env.NO_COLOR,
};
return cached;
}
}
export const clearEnvCache = () => (cached = undefined);

View file

@ -5,7 +5,7 @@ import { SchedulerRegistry } from '@nestjs/schedule';
import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import { CronJob, CronTime } from 'cron';
import { setTimeout } from 'node:timers/promises';
import { bullConfig } from 'src/config';
import { IConfigRepository } from 'src/interfaces/config.interface';
import {
IJobRepository,
JobCounts,
@ -106,14 +106,16 @@ export class JobRepository implements IJobRepository {
constructor(
private moduleReference: ModuleRef,
private schedulerReqistry: SchedulerRegistry,
@Inject(IConfigRepository) private configRepository: IConfigRepository,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(JobRepository.name);
}
addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise<void>) {
const { bull } = this.configRepository.getEnv();
const workerHandler: Processor = async (job: Job) => handler(job as JobItem);
const workerOptions: WorkerOptions = { ...bullConfig, concurrency };
const workerOptions: WorkerOptions = { ...bull.config, concurrency };
this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions);
}

View file

@ -8,6 +8,12 @@ const envData: EnvData = {
environment: ImmichEnvironment.PRODUCTION,
buildMetadata: {},
bull: {
config: {
prefix: 'immich_bull',
},
queues: [{ name: 'queue-1' }],
},
database: {
host: 'database',
@ -25,6 +31,12 @@ const envData: EnvData = {
server: 'server-public-key',
},
redis: {
host: 'redis',
port: 6379,
db: 0,
},
resourcePaths: {
lockFile: 'build-lock.json',
geodata: {