diff --git a/server/apps/microservices/src/microservices.module.ts b/server/apps/microservices/src/microservices.module.ts index cb2e38d1b9..2c890d481e 100644 --- a/server/apps/microservices/src/microservices.module.ts +++ b/server/apps/microservices/src/microservices.module.ts @@ -6,6 +6,7 @@ import { SmartInfoEntity } from '@app/database/entities/smart-info.entity'; import { UserEntity } from '@app/database/entities/user.entity'; import { assetUploadedQueueName, + generateChecksumQueueName, metadataExtractionQueueName, thumbnailGeneratorQueueName, videoConversionQueueName, @@ -17,6 +18,7 @@ import { TypeOrmModule } from '@nestjs/typeorm'; import { CommunicationModule } from '../../immich/src/api-v1/communication/communication.module'; import { MicroservicesService } from './microservices.service'; import { AssetUploadedProcessor } from './processors/asset-uploaded.processor'; +import { GenerateChecksumProcessor } from './processors/generate-checksum.processor'; import { MetadataExtractionProcessor } from './processors/metadata-extraction.processor'; import { ThumbnailGeneratorProcessor } from './processors/thumbnail.processor'; import { VideoTranscodeProcessor } from './processors/video-transcode.processor'; @@ -45,30 +47,34 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor' removeOnComplete: true, removeOnFail: false, }, - }), - BullModule.registerQueue({ + }, { name: assetUploadedQueueName, defaultJobOptions: { attempts: 3, removeOnComplete: true, removeOnFail: false, }, - }), - BullModule.registerQueue({ + }, { name: metadataExtractionQueueName, defaultJobOptions: { attempts: 3, removeOnComplete: true, removeOnFail: false, }, - }), - BullModule.registerQueue({ + }, { name: videoConversionQueueName, defaultJobOptions: { attempts: 3, removeOnComplete: true, removeOnFail: false, }, + }, { + name: generateChecksumQueueName, + defaultJobOptions: { + attempts: 3, + removeOnComplete: true, + removeOnFail: false, + }, }), CommunicationModule, ], @@ -79,6 +85,7 @@ import { VideoTranscodeProcessor } from './processors/video-transcode.processor' ThumbnailGeneratorProcessor, MetadataExtractionProcessor, VideoTranscodeProcessor, + GenerateChecksumProcessor, ], exports: [], }) diff --git a/server/apps/microservices/src/microservices.service.ts b/server/apps/microservices/src/microservices.service.ts index e2a6ae0c22..39dd887e9d 100644 --- a/server/apps/microservices/src/microservices.service.ts +++ b/server/apps/microservices/src/microservices.service.ts @@ -1,8 +1,17 @@ -import { Injectable } from '@nestjs/common'; +import { generateChecksumQueueName } from '@app/job'; +import { InjectQueue } from '@nestjs/bull'; +import { Injectable, OnModuleInit } from '@nestjs/common'; +import { Queue } from 'bull'; +import { randomUUID } from 'node:crypto'; @Injectable() -export class MicroservicesService { - getHello(): string { - return 'Hello World 123!'; +export class MicroservicesService implements OnModuleInit { + constructor ( + @InjectQueue(generateChecksumQueueName) + private generateChecksumQueue: Queue, + ) {} + + async onModuleInit() { + await this.generateChecksumQueue.add({}, { jobId: randomUUID() },); } } diff --git a/server/apps/microservices/src/processors/generate-checksum.processor.ts b/server/apps/microservices/src/processors/generate-checksum.processor.ts new file mode 100644 index 0000000000..e7514de3c9 --- /dev/null +++ b/server/apps/microservices/src/processors/generate-checksum.processor.ts @@ -0,0 +1,69 @@ +import { AssetEntity } from '@app/database/entities/asset.entity'; +import { generateChecksumQueueName } from '@app/job'; +import { Process, Processor } from '@nestjs/bull'; +import { Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { createHash } from 'node:crypto'; +import fs from 'node:fs'; +import { IsNull, Repository } from 'typeorm'; + +// TODO: just temporary task to generate previous uploaded assets. +@Processor(generateChecksumQueueName) +export class GenerateChecksumProcessor { + constructor( + @InjectRepository(AssetEntity) + private assetRepository: Repository, + ) {} + + @Process() + async generateChecksum() { + let hasNext = true; + let pageSize = 200; + let offset = 0; + + while (hasNext) { + const assets = await this.assetRepository.find({ + where: { + checksum: IsNull() + }, + skip: offset, + take: pageSize, + }); + + if (!assets?.length) { + hasNext = false; // avoid using break + } else { + for (const asset of assets) { + try { + await this.generateAssetChecksum(asset); + } catch (err: any) { + Logger.error(`Error generate checksum ${err}`); + } + } + + if (assets.length < pageSize) { + hasNext = false; + } else { + offset += pageSize; + } + } + } + } + + private async generateAssetChecksum(asset: AssetEntity) { + if (!asset.originalPath) return; + if (!fs.existsSync(asset.originalPath)) return; + + const fileReadStream = fs.createReadStream(asset.originalPath); + const sha1Hash = createHash('sha1'); + const deferred = new Promise((resolve, reject) => { + sha1Hash.once('error', (err) => reject(err)); + sha1Hash.once('finish', () => resolve(sha1Hash.read())); + }); + + fileReadStream.pipe(sha1Hash); + const checksum = await deferred; + + await this.assetRepository.update(asset.id, { checksum }); + } +} diff --git a/server/libs/job/src/constants/queue-name.constant.ts b/server/libs/job/src/constants/queue-name.constant.ts index 504e9fca81..15b7d7a6cc 100644 --- a/server/libs/job/src/constants/queue-name.constant.ts +++ b/server/libs/job/src/constants/queue-name.constant.ts @@ -2,3 +2,4 @@ export const thumbnailGeneratorQueueName = 'thumbnail-generator-queue'; export const assetUploadedQueueName = 'asset-uploaded-queue'; export const metadataExtractionQueueName = 'metadata-extraction-queue'; export const videoConversionQueueName = 'video-conversion-queue'; +export const generateChecksumQueueName = 'generate-checksum-queue';