From 0fdd56adbcd5b55ca2ab8caf3beafb5d83289620 Mon Sep 17 00:00:00 2001 From: Steve Larson <9larsons@gmail.com> Date: Tue, 10 Sep 2024 07:59:34 -0500 Subject: [PATCH] Added in memory queue to prevent duplicate entries This is much simpler than trying to write to the db every time that something is queued vs. started vs. other states. --- .../lib/JobManagerBackground.js | 47 +++++++++++++------ .../lib/JobsRepository.js | 7 ++- .../lib/jobs/test-job.js | 2 +- 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/ghost/job-manager-background/lib/JobManagerBackground.js b/ghost/job-manager-background/lib/JobManagerBackground.js index 165e58b158..3b26f3bff9 100644 --- a/ghost/job-manager-background/lib/JobManagerBackground.js +++ b/ghost/job-manager-background/lib/JobManagerBackground.js @@ -11,7 +11,8 @@ class JobManagerBackground { const workerpool = require('workerpool'); // don't use fastq because it runs on the main thread this.pool = workerpool.pool(__dirname + '/jobs/test-job.js', { workerType: 'thread', - workerTerminateTimeout: 10000 + workerTerminateTimeout: 10000, + maxWorkers: 3 }); if (JobModel) { @@ -26,7 +27,7 @@ class JobManagerBackground { this.reportStats(); if (this.#testMode) { - this.testDataContinuousFill(); + // this.testDataContinuousFill(); this.startQueueFiller(); } } @@ -41,9 +42,9 @@ class JobManagerBackground { } async startQueueFiller() { - console.log(`--queue filler starting--`); - const POLL_INTERVAL = 1000; // 1 second + const POLL_INTERVAL = 5000; // 5 seconds let isPolling = false; + const queuedJobs = new Set(); // In-memory set to track queued jobs const poll = async () => { if (isPolling) { @@ -54,30 +55,49 @@ class JobManagerBackground { isPolling = true; try { const stats = await this.getStats(); - if (stats.pendingTasks <= 100) { - const entriesToAdd = Math.min(50, 101 - stats.pendingTasks); + if (stats.pendingTasks <= 50) { + const entriesToAdd = Math.min(50, 51 - stats.pendingTasks); console.log(`--adding ${entriesToAdd} entries to queue--`); // get entries from repository const jobs = await this.jobsRepository.getQueuedJobs(); - // console.log(`--jobs--`, jobs); jobs.forEach((job) => { const jobName = job.get('name'); - // console.log(`--job--`, jobName); + if (queuedJobs.has(jobName)) { + console.log(`--skipping already queued job: ${jobName}--`); + return; + } + console.log(`--adding job to queue--`, jobName); const jobMetadata = JSON.parse(job.get('metadata')); - // console.log(`--jobMetadata--`, jobMetadata); const jobData = jobMetadata.data; const jobPath = jobMetadata.job; + queuedJobs.add(jobName); + // Queue the job directly without manipulating jobsRepository this.pool.exec(jobPath, [jobData]) - .then((result) => { + .then(async (result) => { console.log(`Job ${jobName} completed with result: ${result}`); - // Here, the job worker would update the jobsRepository + queuedJobs.delete(jobName); + // TODO: We may want to update scheduled jobs, tbd + const jobFromTable = await this.jobsRepository.read(jobName); + console.log(`--jobFromTable--`, jobFromTable?.get('name'), jobName); + await this.jobsRepository.delete(job.id); }) - .catch((error) => { + .catch(async (error) => { console.error(`Job ${jobName} failed:`, error); - // Here, the job worker would update the jobsRepository + queuedJobs.delete(jobName); + // TODO: We may want to do something regarding errors or retries + // retries could be stored in metadata + await this.jobsRepository.update(job.id, { + status: 'error', + finished_at: new Date(), + metadata: { + error: error, + retries: jobMetadata.retries + 1, + ...jobMetadata + } + }); }); }); } else { @@ -102,7 +122,6 @@ class JobManagerBackground { // this should add a job to the jobsRepository // and maybe the queue? or we let that get polled const model = await this.jobsRepository.addQueuedJob(job, metadata); - return model; } diff --git a/ghost/job-manager-background/lib/JobsRepository.js b/ghost/job-manager-background/lib/JobsRepository.js index 605e43b048..27abf1e53b 100644 --- a/ghost/job-manager-background/lib/JobsRepository.js +++ b/ghost/job-manager-background/lib/JobsRepository.js @@ -48,7 +48,12 @@ class JobsRepository { } async delete(id) { - await this._JobModel.destroy({id}); + console.log(`attempting to delete job ${id}`); + try { + await this._JobModel.destroy({id}); + } catch (error) { + console.error(`Error deleting job ${id}:`, error); + } } } diff --git a/ghost/job-manager-background/lib/jobs/test-job.js b/ghost/job-manager-background/lib/jobs/test-job.js index e569d15d8a..ca9bee5c91 100644 --- a/ghost/job-manager-background/lib/jobs/test-job.js +++ b/ghost/job-manager-background/lib/jobs/test-job.js @@ -5,7 +5,7 @@ async function add({a, b}) { if (!a || !b) { throw new Error('Invalid input'); } - await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 4000)); // Random delay between 1-5 seconds + await new Promise(resolve => setTimeout(resolve, Math.random() * 1000)); // Random delay return a + b; }