0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-03-11 02:12:21 -05:00

Updated queueing system to run jobs from file

- added variable polling rates to try to handle bursts of activity/events
- added generic worker to process jobs from filepath (saved in jobs.metadata.job)
- prevented duplicate queue entries
This commit is contained in:
Steve Larson 2024-09-10 09:50:41 -05:00
parent 0fdd56adbc
commit ced8c1af9e
5 changed files with 201 additions and 56 deletions

View file

@ -1,5 +1,13 @@
const queries = require('../lib/queries');
/**
* Updates email analytics for a specific member
*
* @param {Object} options - The options object
* @param {string} options.memberId - The ID of the member to update analytics for
* @returns {Promise<Object>} The result of the aggregation query (1/0)
*/
module.exports = async function memberEmailAnalyticsUpdate({memberId}) {
await queries.aggregateMemberStats(memberId);
const result = await queries.aggregateMemberStats(memberId);
return result;
};

View file

@ -198,8 +198,16 @@ module.exports = {
updateQuery.email_open_rate = Math.round(emailOpenedCount.count / trackedEmailCount * 100);
}
await db.knex('members')
.update(updateQuery)
.where('id', memberId);
let result;
try {
result = await db.knex('members')
.update(updateQuery)
.where('id', memberId);
} catch (error) {
debug(`Error updating member stats for member ${memberId}: ${error.message}`);
throw error;
}
return result;
}
};

View file

@ -1,18 +1,23 @@
const JobsRepository = require('./JobsRepository'); // use this for persistence
const workerpool = require('workerpool');
const path = require('path');
const JobsRepository = require('./jobsRepository');
/**
* @class JobManagerBackground
* @description Manages background jobs using a worker pool and job repository.
*/
class JobManagerBackground {
#testMode = true;
/**
* @param {Object} options
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
*/
* @constructor
* @param {Object} options - Configuration options for the job manager.
* @param {Object} [options.JobModel] - A model which can persist job data in storage.
*/
constructor({JobModel}) {
const workerpool = require('workerpool'); // don't use fastq because it runs on the main thread
this.pool = workerpool.pool(__dirname + '/jobs/test-job.js', {
this.pool = workerpool.pool(path.join(__dirname, '/workers/generic-worker.js'), {
workerType: 'thread',
workerTerminateTimeout: 10000,
maxWorkers: 3
workerTerminateTimeout: 10000
});
if (JobModel) {
@ -20,10 +25,15 @@ class JobManagerBackground {
}
}
/**
* @method init
* @async
* @description Initializes the job manager, starts reporting stats, and optionally starts the queue filler.
* @returns {Promise<void>}
*/
async init() {
console.log(`JobManagerBackground initializing`);
console.log(`[JobManager] Initializing`);
// console.log(`JobModel`, this.jobsRepository);
this.reportStats();
if (this.#testMode) {
@ -32,17 +42,42 @@ class JobManagerBackground {
}
}
/**
* @method testDataContinuousFill
* @description Continuously adds test jobs to the queue for testing purposes.
* @private
*/
testDataContinuousFill() {
setInterval(() => {
console.log(`--adding 50 test entries--`);
console.log(`[JobManager] Adding 50 test entries`);
for (let i = 0; i < 50; i++) {
this.addJob(`test-entry-${Date.now()}-${i}`, {job: 'testEntryJob', data: {a: 1, b: 2}});
}
}, 5000);
}
/**
* @method startQueueFiller
* @async
* @description Starts the queue filler process.
*
* This method initializes a polling mechanism to continuously check for and process queued jobs.
* It dynamically adjusts the polling interval based on job availability and system load.
*
* Key features:
* - Maintains a minimum of 500 pending tasks in the worker pool
* - Dynamically adjusts polling interval between 1 second and 1 minute
* - Uses an in-memory set to prevent duplicate job processing
* - Handles job execution and cleanup
*
* @returns {Promise<void>}
*/
async startQueueFiller() {
const POLL_INTERVAL = 5000; // 5 seconds
const MIN_POLL_INTERVAL = 1000; // 1 second
const MAX_POLL_INTERVAL = 60000; // 1 minute
const INCREASE_INTERVAL_THRESHOLD = 30000; // 30 seconds
let currentPollInterval = MIN_POLL_INTERVAL;
let lastFoundJobTime = Date.now();
let isPolling = false;
const queuedJobs = new Set(); // In-memory set to track queued jobs
@ -50,90 +85,100 @@ class JobManagerBackground {
if (isPolling) {
return;
}
console.log(`--queue filler polling--`);
isPolling = true;
console.log(`[JobManager] Polling for jobs, current interval: ${Math.floor(currentPollInterval / 1000)}s`);
try {
const stats = await this.getStats();
if (stats.pendingTasks <= 50) {
const entriesToAdd = Math.min(50, 51 - stats.pendingTasks);
console.log(`--adding ${entriesToAdd} entries to queue--`);
// get entries from repository
const jobs = await this.jobsRepository.getQueuedJobs();
if (stats.pendingTasks <= 500) {
const entriesToAdd = Math.min(500, 501 - stats.pendingTasks);
console.log(`[JobManager] Adding ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}`);
const jobs = await this.jobsRepository.getQueuedJobs(entriesToAdd);
if (jobs.length > 0) {
lastFoundJobTime = Date.now();
currentPollInterval = MIN_POLL_INTERVAL;
} else {
const timeSinceLastJob = Date.now() - lastFoundJobTime;
if (timeSinceLastJob > INCREASE_INTERVAL_THRESHOLD) {
currentPollInterval = MAX_POLL_INTERVAL;
}
}
jobs.forEach((job) => {
const jobName = job.get('name');
if (queuedJobs.has(jobName)) {
console.log(`--skipping already queued job: ${jobName}--`);
return;
}
console.log(`--adding job to queue--`, jobName);
const jobMetadata = JSON.parse(job.get('metadata'));
const jobData = jobMetadata.data;
const jobPath = jobMetadata.job;
queuedJobs.add(jobName);
// Queue the job directly without manipulating jobsRepository
this.pool.exec(jobPath, [jobData])
.then(async (result) => {
console.log(`Job ${jobName} completed with result: ${result}`);
queuedJobs.delete(jobName);
// TODO: We may want to update scheduled jobs, tbd
const jobFromTable = await this.jobsRepository.read(jobName);
console.log(`--jobFromTable--`, jobFromTable?.get('name'), jobName);
this.pool.exec('executeJob', [jobPath, jobData])
.then(async () => {
await this.jobsRepository.delete(job.id);
queuedJobs.delete(jobName); // clear memory entry last
})
.catch(async (error) => {
console.error(`Job ${jobName} failed:`, error);
queuedJobs.delete(jobName);
// TODO: We may want to do something regarding errors or retries
// retries could be stored in metadata
await this.jobsRepository.update(job.id, {
status: 'error',
finished_at: new Date(),
metadata: {
error: error,
error: error.message,
retries: jobMetadata.retries + 1,
...jobMetadata
}
});
});
});
} else {
console.log(`--queue full, skipping this poll cycle--`);
}
} catch (error) {
console.error('Error in queue filler:', error);
console.error('[JobManager] Error in queue filler:', error);
} finally {
isPolling = false;
this.queueFillerTimeout = setTimeout(poll, currentPollInterval);
}
};
// Start the polling process
this.queueFillerInterval = setInterval(poll, POLL_INTERVAL);
// Initial poll
poll();
poll(); // Initial poll
}
// TODO: could allow for queued entries or just offloaded (immediate) jobs
/**
* @method addJob
* @async
* @description Adds a new job to the job repository.
* @param {string} job - The name or identifier of the job.
* @param {Object} metadata - Metadata associated with the job.
* @returns {Promise<Object>} The added job model.
*/
async addJob(job, metadata) {
// this should add a job to the jobsRepository
// and maybe the queue? or we let that get polled
const model = await this.jobsRepository.addQueuedJob(job, metadata);
return model;
}
/**
* @method getStats
* @async
* @description Retrieves the current stats of the worker pool.
* @returns {Promise<Object>} The worker pool stats.
*/
async getStats() {
return this.pool.stats();
}
/**
* @method reportStats
* @async
* @description Starts periodic reporting of JobManagerBackground stats.
*/
async reportStats() {
setInterval(() => {
console.log(`-- JobManagerBackground stats --`);
console.log(`[JobManager] -- JobManagerBackground stats --`);
console.log(this.pool.stats());
}, 5000);
}, 10000);
}
}

View file

@ -1,42 +1,89 @@
/**
* @class JobsRepository
* @description Repository class for managing job-related operations.
*/
class JobsRepository {
/**
* @constructor
* @param {Object} options - The options object.
* @param {Object} options.JobModel - The Job model for database operations.
*/
constructor({JobModel}) {
this._JobModel = JobModel;
}
/**
* @method add
* @async
* @description Adds a new job to the database.
* @param {Object} data - The job data to be added.
* @returns {Promise<Object>} The added job object.
*/
async add(data) {
const job = await this._JobModel.add(data);
return job;
}
/**
* @method read
* @async
* @description Reads a job from the database by name.
* @param {string} name - The name of the job to read.
* @returns {Promise<Object|null>} The job object if found, null otherwise.
*/
async read(name) {
const job = await this._JobModel.findOne({name});
return job;
}
/**
* @method update
* @async
* @description Updates a job in the database.
* @param {string} id - The ID of the job to update.
* @param {Object} data - The updated job data.
* @returns {Promise<void>}
*/
async update(id, data) {
await this._JobModel.edit(data, {id});
}
/**
* @method getNextQueuedJob
* @async
* @description Retrieves the next queued job from the database.
* @returns {Promise<Object|null>} The next queued job object if found, null otherwise.
*/
async getNextQueuedJob() {
const job = await this._JobModel.findOne({
// status: 'queued', // if shutdown is called, we can just re-run the job that is at started or finished and not removed
queue_entry: 1
});
return job;
}
/**
* @method getQueuedJobs
* @async
* @description Retrieves a list of queued jobs from the database.
* @param {number} [limit=50] - The maximum number of jobs to retrieve.
* @returns {Promise<Array>} An array of queued job objects.
*/
async getQueuedJobs(limit = 50) {
console.log(`attempting to get ${limit} queued jobs`);
const jobs = await this._JobModel.findPage({
filter: 'queue_entry:1',
limit
});
// console.log(`--fetched jobs`,jobs.data);
return jobs.data;
}
/**
* @method addQueuedJob
* @async
* @description Adds a new queued job to the database.
* @param {string} name - The name of the job.
* @param {Object} metadata - The metadata associated with the job.
* @returns {Promise<Object>} The added job object.
*/
async addQueuedJob(name, metadata) {
const job = await this._JobModel.add({
name: name,
@ -47,8 +94,14 @@ class JobsRepository {
return job;
}
/**
* @method delete
* @async
* @description Deletes a job from the database.
* @param {string} id - The ID of the job to delete.
* @returns {Promise<void>}
*/
async delete(id) {
console.log(`attempting to delete job ${id}`);
try {
await this._JobModel.destroy({id});
} catch (error) {

View file

@ -0,0 +1,31 @@
/**
* @module generic-worker
* @description A generic worker module for executing jobs in a worker pool.
*/
const workerpool = require('workerpool');
/**
* @function executeJob
* @description Executes a job by requiring the job module and calling it with the provided data.
* @param {string} jobPath - The path to the job module.
* @param {Object} jobData - The data to be passed to the job function.
* @returns {Promise<*>} The result of the job execution.
* @throws {Error} If the job module doesn't export a function or if the execution fails.
*/
function executeJob(jobPath, jobData) {
try {
const jobModule = require(jobPath);
if (typeof jobModule !== 'function') {
throw new Error(`Job module at ${jobPath} does not export a function`);
}
return jobModule(jobData);
} catch (error) {
throw new Error(`Failed to execute job: ${error.message}`);
}
}
// Register the executeJob function as a worker method
workerpool.worker({
executeJob: executeJob
});