0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-03-11 02:12:21 -05:00

Added in memory queue to prevent duplicate entries

This is much simpler than trying to write to the db every time that something is queued vs. started vs. other states.
This commit is contained in:
Steve Larson 2024-09-10 07:59:34 -05:00
parent f9229cb4e2
commit 0fdd56adbc
3 changed files with 40 additions and 16 deletions

View file

@ -11,7 +11,8 @@ class JobManagerBackground {
const workerpool = require('workerpool'); // don't use fastq because it runs on the main thread
this.pool = workerpool.pool(__dirname + '/jobs/test-job.js', {
workerType: 'thread',
workerTerminateTimeout: 10000
workerTerminateTimeout: 10000,
maxWorkers: 3
});
if (JobModel) {
@ -26,7 +27,7 @@ class JobManagerBackground {
this.reportStats();
if (this.#testMode) {
this.testDataContinuousFill();
// this.testDataContinuousFill();
this.startQueueFiller();
}
}
@ -41,9 +42,9 @@ class JobManagerBackground {
}
async startQueueFiller() {
console.log(`--queue filler starting--`);
const POLL_INTERVAL = 1000; // 1 second
const POLL_INTERVAL = 5000; // 5 seconds
let isPolling = false;
const queuedJobs = new Set(); // In-memory set to track queued jobs
const poll = async () => {
if (isPolling) {
@ -54,30 +55,49 @@ class JobManagerBackground {
isPolling = true;
try {
const stats = await this.getStats();
if (stats.pendingTasks <= 100) {
const entriesToAdd = Math.min(50, 101 - stats.pendingTasks);
if (stats.pendingTasks <= 50) {
const entriesToAdd = Math.min(50, 51 - stats.pendingTasks);
console.log(`--adding ${entriesToAdd} entries to queue--`);
// get entries from repository
const jobs = await this.jobsRepository.getQueuedJobs();
// console.log(`--jobs--`, jobs);
jobs.forEach((job) => {
const jobName = job.get('name');
// console.log(`--job--`, jobName);
if (queuedJobs.has(jobName)) {
console.log(`--skipping already queued job: ${jobName}--`);
return;
}
console.log(`--adding job to queue--`, jobName);
const jobMetadata = JSON.parse(job.get('metadata'));
// console.log(`--jobMetadata--`, jobMetadata);
const jobData = jobMetadata.data;
const jobPath = jobMetadata.job;
queuedJobs.add(jobName);
// Queue the job directly without manipulating jobsRepository
this.pool.exec(jobPath, [jobData])
.then((result) => {
.then(async (result) => {
console.log(`Job ${jobName} completed with result: ${result}`);
// Here, the job worker would update the jobsRepository
queuedJobs.delete(jobName);
// TODO: We may want to update scheduled jobs, tbd
const jobFromTable = await this.jobsRepository.read(jobName);
console.log(`--jobFromTable--`, jobFromTable?.get('name'), jobName);
await this.jobsRepository.delete(job.id);
})
.catch((error) => {
.catch(async (error) => {
console.error(`Job ${jobName} failed:`, error);
// Here, the job worker would update the jobsRepository
queuedJobs.delete(jobName);
// TODO: We may want to do something regarding errors or retries
// retries could be stored in metadata
await this.jobsRepository.update(job.id, {
status: 'error',
finished_at: new Date(),
metadata: {
error: error,
retries: jobMetadata.retries + 1,
...jobMetadata
}
});
});
});
} else {
@ -102,7 +122,6 @@ class JobManagerBackground {
// this should add a job to the jobsRepository
// and maybe the queue? or we let that get polled
const model = await this.jobsRepository.addQueuedJob(job, metadata);
return model;
}

View file

@ -48,7 +48,12 @@ class JobsRepository {
}
async delete(id) {
console.log(`attempting to delete job ${id}`);
try {
await this._JobModel.destroy({id});
} catch (error) {
console.error(`Error deleting job ${id}:`, error);
}
}
}

View file

@ -5,7 +5,7 @@ async function add({a, b}) {
if (!a || !b) {
throw new Error('Invalid input');
}
await new Promise(resolve => setTimeout(resolve, 1000 + Math.random() * 4000)); // Random delay between 1-5 seconds
await new Promise(resolve => setTimeout(resolve, Math.random() * 1000)); // Random delay
return a + b;
}