mirror of
https://github.com/immich-app/immich.git
synced 2025-01-21 00:52:43 -05:00
eb89208abb
* feat(cli): use a queue for duplicate and upload Using a queue to process the files makes the file duplicate detection and asset upload more stable and tolerant of network errors. If an error occurs, the whole command will not stop; the task will be retried (3 times) before logging the error and moving to the next step. The new queue abstraction is using [fastq](https://www.npmjs.com/package/fastq) internally. * chore(cli): queue.push return promise which resolve with task * test(cli): add spec for uploadFiles and checkForDuplicates
131 lines
3.8 KiB
TypeScript
131 lines
3.8 KiB
TypeScript
import * as fastq from 'fastq';
|
|
import { uniqueId } from 'lodash-es';
|
|
|
|
export type Task<T, R> = {
|
|
readonly id: string;
|
|
status: 'idle' | 'processing' | 'succeeded' | 'failed';
|
|
data: T;
|
|
error: unknown | undefined;
|
|
count: number;
|
|
// TODO: Could be useful to adding progress property.
|
|
// TODO: Could be useful to adding start_at/end_at/duration properties.
|
|
result: undefined | R;
|
|
};
|
|
|
|
export type QueueOptions = {
|
|
verbose?: boolean;
|
|
concurrency?: number;
|
|
retry?: number;
|
|
// TODO: Could be useful to adding timeout property for retry.
|
|
};
|
|
|
|
export type ComputedQueueOptions = Required<QueueOptions>;
|
|
|
|
export const defaultQueueOptions = {
|
|
concurrency: 1,
|
|
retry: 0,
|
|
verbose: false,
|
|
};
|
|
|
|
/**
|
|
* An in-memory queue that processes tasks in parallel with a given concurrency.
|
|
* @see {@link https://www.npmjs.com/package/fastq}
|
|
* @template T - The type of the worker task data.
|
|
* @template R - The type of the worker output data.
|
|
*/
|
|
export class Queue<T, R> {
|
|
private readonly queue: fastq.queueAsPromised<string, Task<T, R>>;
|
|
private readonly store = new Map<string, Task<T, R>>();
|
|
readonly options: ComputedQueueOptions;
|
|
readonly worker: (data: T) => Promise<R>;
|
|
|
|
/**
|
|
* Create a new queue.
|
|
* @param worker - The worker function that processes the task.
|
|
* @param options - The queue options.
|
|
*/
|
|
constructor(worker: (data: T) => Promise<R>, options?: QueueOptions) {
|
|
this.options = { ...defaultQueueOptions, ...options };
|
|
this.worker = worker;
|
|
this.store = new Map<string, Task<T, R>>();
|
|
this.queue = this.buildQueue();
|
|
}
|
|
|
|
get tasks(): Task<T, R>[] {
|
|
const tasks: Task<T, R>[] = [];
|
|
for (const task of this.store.values()) {
|
|
tasks.push(task);
|
|
}
|
|
return tasks;
|
|
}
|
|
|
|
getTask(id: string): Task<T, R> {
|
|
const task = this.store.get(id);
|
|
if (!task) {
|
|
throw new Error(`Task with id ${id} not found`);
|
|
}
|
|
return task;
|
|
}
|
|
|
|
/**
|
|
* Wait for the queue to be empty.
|
|
* @returns Promise<void> - The returned Promise will be resolved when all tasks in the queue have been processed by a worker.
|
|
* This promise could be ignored as it will not lead to a `unhandledRejection`.
|
|
*/
|
|
async drained(): Promise<void> {
|
|
await this.queue.drain();
|
|
}
|
|
|
|
/**
|
|
* Add a task at the end of the queue.
|
|
* @see {@link https://www.npmjs.com/package/fastq}
|
|
* @param data
|
|
* @returns Promise<void> - A Promise that will be fulfilled (rejected) when the task is completed successfully (unsuccessfully).
|
|
* This promise could be ignored as it will not lead to a `unhandledRejection`.
|
|
*/
|
|
async push(data: T): Promise<Task<T, R>> {
|
|
const id = uniqueId();
|
|
const task: Task<T, R> = { id, status: 'idle', error: undefined, count: 0, data, result: undefined };
|
|
this.store.set(id, task);
|
|
return this.queue.push(id);
|
|
}
|
|
|
|
// TODO: Support more function delegation to fastq.
|
|
|
|
private buildQueue(): fastq.queueAsPromised<string, Task<T, R>> {
|
|
return fastq.promise((id: string) => {
|
|
const task = this.getTask(id);
|
|
return this.work(task);
|
|
}, this.options.concurrency);
|
|
}
|
|
|
|
private async work(task: Task<T, R>): Promise<Task<T, R>> {
|
|
task.count += 1;
|
|
task.error = undefined;
|
|
task.status = 'processing';
|
|
if (this.options.verbose) {
|
|
console.log('[task] processing:', task);
|
|
}
|
|
try {
|
|
task.result = await this.worker(task.data);
|
|
task.status = 'succeeded';
|
|
if (this.options.verbose) {
|
|
console.log('[task] succeeded:', task);
|
|
}
|
|
return task;
|
|
} catch (error) {
|
|
task.error = error;
|
|
task.status = 'failed';
|
|
if (this.options.verbose) {
|
|
console.log('[task] failed:', task);
|
|
}
|
|
if (this.options.retry > 0 && task.count < this.options.retry) {
|
|
if (this.options.verbose) {
|
|
console.log('[task] retry:', task);
|
|
}
|
|
return this.work(task);
|
|
}
|
|
return task;
|
|
}
|
|
}
|
|
}
|