mirror of
https://github.com/immich-app/immich.git
synced 2025-04-15 03:11:28 -05:00
refactor: stream asset ids for library queue jobs (#16666)
This commit is contained in:
parent
5c82c485d7
commit
d45fa491ce
6 changed files with 65 additions and 66 deletions
|
@ -431,17 +431,6 @@ export class AssetRepository {
|
|||
return paginationHelper(items as any as AssetEntity[], pagination.take);
|
||||
}
|
||||
|
||||
async getAllInLibrary(pagination: PaginationOptions, libraryId: string): Paginated<AssetEntity> {
|
||||
const builder = this.db
|
||||
.selectFrom('assets')
|
||||
.select('id')
|
||||
.where('libraryId', '=', asUuid(libraryId))
|
||||
.limit(pagination.take + 1)
|
||||
.offset(pagination.skip ?? 0);
|
||||
const items = await builder.execute();
|
||||
return paginationHelper(items as any as AssetEntity[], pagination.take);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get assets by device's Id on the database
|
||||
* @param ownerId
|
||||
|
|
|
@ -144,4 +144,8 @@ export class LibraryRepository {
|
|||
total: Number(stats.photos) + Number(stats.videos),
|
||||
};
|
||||
}
|
||||
|
||||
streamAssetIds(libraryId: string) {
|
||||
return this.db.selectFrom('assets').select(['id']).where('libraryId', '=', libraryId).stream();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ import { libraryStub } from 'test/fixtures/library.stub';
|
|||
import { systemConfigStub } from 'test/fixtures/system-config.stub';
|
||||
import { userStub } from 'test/fixtures/user.stub';
|
||||
import { makeMockWatcher } from 'test/repositories/storage.repository.mock';
|
||||
import { newTestService, ServiceMocks } from 'test/utils';
|
||||
import { makeStream, newTestService, ServiceMocks } from 'test/utils';
|
||||
import { vitest } from 'vitest';
|
||||
|
||||
async function* mockWalk() {
|
||||
|
@ -287,10 +287,10 @@ describe(LibraryService.name, () => {
|
|||
it('should queue asset sync', async () => {
|
||||
mocks.library.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
|
||||
mocks.storage.walk.mockImplementation(async function* generator() {});
|
||||
mocks.asset.getAll.mockResolvedValue({ items: [assetStub.external], hasNextPage: false });
|
||||
mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.external]));
|
||||
mocks.asset.getLibraryAssetCount.mockResolvedValue(1);
|
||||
mocks.asset.detectOfflineExternalAssets.mockResolvedValue({ numUpdatedRows: BigInt(0) });
|
||||
mocks.asset.getAllInLibrary.mockResolvedValue({ items: [assetStub.external], hasNextPage: false });
|
||||
mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.external]));
|
||||
|
||||
const response = await sut.handleQueueSyncAssets({ id: libraryStub.externalLibraryWithImportPaths1.id });
|
||||
|
||||
|
@ -1039,7 +1039,7 @@ describe(LibraryService.name, () => {
|
|||
describe('handleDeleteLibrary', () => {
|
||||
it('should delete an empty library', async () => {
|
||||
mocks.library.get.mockResolvedValue(libraryStub.externalLibrary1);
|
||||
mocks.asset.getAll.mockResolvedValue({ items: [], hasNextPage: false });
|
||||
mocks.library.streamAssetIds.mockReturnValue(makeStream([]));
|
||||
|
||||
await expect(sut.handleDeleteLibrary({ id: libraryStub.externalLibrary1.id })).resolves.toBe(JobStatus.SUCCESS);
|
||||
expect(mocks.library.delete).toHaveBeenCalled();
|
||||
|
@ -1047,7 +1047,7 @@ describe(LibraryService.name, () => {
|
|||
|
||||
it('should delete all assets in a library', async () => {
|
||||
mocks.library.get.mockResolvedValue(libraryStub.externalLibrary1);
|
||||
mocks.asset.getAll.mockResolvedValue({ items: [assetStub.image1], hasNextPage: false });
|
||||
mocks.library.streamAssetIds.mockReturnValue(makeStream([assetStub.image1]));
|
||||
|
||||
mocks.asset.getById.mockResolvedValue(assetStub.image1);
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ import { BaseService } from 'src/services/base.service';
|
|||
import { JobOf } from 'src/types';
|
||||
import { mimeTypes } from 'src/utils/mime-types';
|
||||
import { handlePromiseError } from 'src/utils/misc';
|
||||
import { usePagination } from 'src/utils/pagination';
|
||||
|
||||
@Injectable()
|
||||
export class LibraryService extends BaseService {
|
||||
|
@ -341,34 +340,37 @@ export class LibraryService extends BaseService {
|
|||
|
||||
await this.assetRepository.updateByLibraryId(libraryId, { deletedAt: new Date() });
|
||||
|
||||
const assetPagination = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
|
||||
this.assetRepository.getAll(pagination, { libraryId, withDeleted: true }),
|
||||
);
|
||||
|
||||
let assetsFound = false;
|
||||
let chunk: string[] = [];
|
||||
|
||||
const queueChunk = async () => {
|
||||
if (chunk.length > 0) {
|
||||
assetsFound = true;
|
||||
this.logger.debug(`Queueing deletion of ${chunk.length} asset(s) in library ${libraryId}`);
|
||||
await this.jobRepository.queueAll(
|
||||
chunk.map((id) => ({ name: JobName.ASSET_DELETION, data: { id, deleteOnDisk: false } })),
|
||||
);
|
||||
chunk = [];
|
||||
}
|
||||
};
|
||||
|
||||
this.logger.debug(`Will delete all assets in library ${libraryId}`);
|
||||
for await (const assets of assetPagination) {
|
||||
if (assets.length > 0) {
|
||||
assetsFound = true;
|
||||
}
|
||||
const assets = this.libraryRepository.streamAssetIds(libraryId);
|
||||
for await (const asset of assets) {
|
||||
chunk.push(asset.id);
|
||||
|
||||
this.logger.debug(`Queueing deletion of ${assets.length} asset(s) in library ${libraryId}`);
|
||||
await this.jobRepository.queueAll(
|
||||
assets.map((asset) => ({
|
||||
name: JobName.ASSET_DELETION,
|
||||
data: {
|
||||
id: asset.id,
|
||||
deleteOnDisk: false,
|
||||
},
|
||||
})),
|
||||
);
|
||||
if (chunk.length >= 10_000) {
|
||||
await queueChunk();
|
||||
}
|
||||
}
|
||||
|
||||
await queueChunk();
|
||||
|
||||
if (!assetsFound) {
|
||||
this.logger.log(`Deleting library ${libraryId}`);
|
||||
await this.libraryRepository.delete(libraryId);
|
||||
}
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -676,7 +678,6 @@ export class LibraryService extends BaseService {
|
|||
}
|
||||
|
||||
const assetCount = await this.assetRepository.getLibraryAssetCount(job.id);
|
||||
|
||||
if (!assetCount) {
|
||||
this.logger.log(`Library ${library.id} is empty, no need to check assets`);
|
||||
return JobStatus.SUCCESS;
|
||||
|
@ -702,42 +703,47 @@ export class LibraryService extends BaseService {
|
|||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
||||
this.logger.log(`Scanning library ${library.id} for assets missing from disk...`);
|
||||
let chunk: string[] = [];
|
||||
let count = 0;
|
||||
|
||||
const existingAssets = usePagination(JOBS_LIBRARY_PAGINATION_SIZE, (pagination) =>
|
||||
this.assetRepository.getAllInLibrary(pagination, job.id),
|
||||
);
|
||||
const queueChunk = async () => {
|
||||
if (chunk.length > 0) {
|
||||
count += chunk.length;
|
||||
|
||||
let currentAssetCount = 0;
|
||||
for await (const assets of existingAssets) {
|
||||
if (assets.length === 0) {
|
||||
throw new BadRequestException(`Failed to get assets for library ${job.id}`);
|
||||
await this.jobRepository.queue({
|
||||
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||
data: {
|
||||
libraryId: library.id,
|
||||
importPaths: library.importPaths,
|
||||
exclusionPatterns: library.exclusionPatterns,
|
||||
assetIds: chunk.map((id) => id),
|
||||
progressCounter: count,
|
||||
totalAssets: assetCount,
|
||||
},
|
||||
});
|
||||
chunk = [];
|
||||
|
||||
const completePercentage = ((100 * count) / assetCount).toFixed(1);
|
||||
|
||||
this.logger.log(
|
||||
`Queued check of ${count} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
|
||||
currentAssetCount += assets.length;
|
||||
this.logger.log(`Scanning library ${library.id} for assets missing from disk...`);
|
||||
const existingAssets = this.libraryRepository.streamAssetIds(library.id);
|
||||
|
||||
await this.jobRepository.queue({
|
||||
name: JobName.LIBRARY_SYNC_ASSETS,
|
||||
data: {
|
||||
libraryId: library.id,
|
||||
importPaths: library.importPaths,
|
||||
exclusionPatterns: library.exclusionPatterns,
|
||||
assetIds: assets.map(({ id }) => id),
|
||||
progressCounter: currentAssetCount,
|
||||
totalAssets: assetCount,
|
||||
},
|
||||
});
|
||||
|
||||
const completePercentage = ((100 * currentAssetCount) / assetCount).toFixed(1);
|
||||
|
||||
this.logger.log(
|
||||
`Queued check of ${currentAssetCount} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`,
|
||||
);
|
||||
for await (const asset of existingAssets) {
|
||||
chunk.push(asset.id);
|
||||
if (chunk.length === 10_000) {
|
||||
await queueChunk();
|
||||
}
|
||||
}
|
||||
|
||||
if (currentAssetCount) {
|
||||
this.logger.log(`Finished queuing ${currentAssetCount} asset check(s) for library ${library.id}`);
|
||||
}
|
||||
await queueChunk();
|
||||
|
||||
this.logger.log(`Finished queuing ${count} asset check(s) for library ${library.id}`);
|
||||
|
||||
return JobStatus.SUCCESS;
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ export const newAssetRepositoryMock = (): Mocked<RepositoryInterface<AssetReposi
|
|||
getAll: vitest.fn().mockResolvedValue({ items: [], hasNextPage: false }),
|
||||
getAllByDeviceId: vitest.fn(),
|
||||
getLivePhotoCount: vitest.fn(),
|
||||
getAllInLibrary: vitest.fn(),
|
||||
getLibraryAssetCount: vitest.fn(),
|
||||
updateAll: vitest.fn(),
|
||||
updateDuplicates: vitest.fn(),
|
||||
|
|
|
@ -12,5 +12,6 @@ export const newLibraryRepositoryMock = (): Mocked<RepositoryInterface<LibraryRe
|
|||
getStatistics: vitest.fn(),
|
||||
getAllDeleted: vitest.fn(),
|
||||
getAll: vitest.fn(),
|
||||
streamAssetIds: vitest.fn(),
|
||||
};
|
||||
};
|
||||
|
|
Loading…
Add table
Reference in a new issue