0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-02-10 23:36:14 -05:00

Added node event support to the job queue (#21858)

ref https://linear.app/ghost/issue/ENG-1851/
- added handling so that jobs run by the job queue can emit events on
completion
- added tests

Events (both node and our DomainEvents lib) must be emitted on the
primary process, so we can't emit these within the worker threads.
Instead, we'll return the necessary data with the job's completion in
the thread message such that the JobQueueManager can emit whatever
events may be needed.
This commit is contained in:
Steve Larson 2024-12-10 15:47:49 -06:00 committed by GitHub
parent 0f9449137f
commit dd9d3a6f2e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 112 additions and 11 deletions

View file

@ -15,6 +15,7 @@ const errorHandler = (error, workerMeta) => {
logging.error(error);
sentry.captureException(error);
};
const events = require('../../lib/common/events');
const workerMessageHandler = ({name, message}) => {
if (typeof message === 'string') {
@ -43,7 +44,7 @@ const initTestMode = () => {
}, 5000);
};
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient, events});
module.exports = jobManager;
module.exports.initTestMode = initTestMode;

View file

@ -3,6 +3,7 @@ const path = require('path');
const configUtils = require('../../utils/configUtils');
const models = require('../../../core/server/models');
const testUtils = require('../../utils/');
const events = require('../../../core/server/lib/common/events');
// Helper function to wait for job completion
async function waitForJobCompletion(jobName, maxWaitTimeMs = 5000, checkIntervalMs = 50) {
@ -53,10 +54,40 @@ describe('Job Queue', function () {
// Wait for the job to complete
await waitForJobCompletion(job.name, 8000); // Increase wait time
// Check job status
const jobEntry = await models.Job.findOne({name: job.name});
// Verify that the job no longer exists in the queue
const jobEntry = await models.Job.findOne({name: job.name});
assert.equal(jobEntry, null);
});
it('should emit events if present in result', async function () {
this.timeout(10000);
const job = {
name: `emit-events-${Date.now()}`,
metadata: {
job: path.resolve(__dirname, './test-job-events.js'),
data: {}
}
};
let eventEmitted = false;
let eventData = null;
// Set up the event listener
events.on('member.edited', (data) => {
eventEmitted = true;
eventData = data;
});
const result = await jobService.addQueuedJob(job);
assert.ok(result);
await waitForJobCompletion(job.name, 8000); // Increase wait time
// Assert that the event was emitted
assert.ok(eventEmitted, 'Expected job.completed event to be emitted');
assert.ok(eventData, 'Expected event data to be captured');
const jobEntry = await models.Job.findOne({name: job.name});
assert.equal(jobEntry, null);
});
});

View file

@ -0,0 +1,19 @@
/**
* A job that simulates an event-driven process.
* @returns {Object} An object containing the event data.
*/
module.exports = function jobWithEvents() {
const num1 = Math.floor(Math.random() * 100);
const num2 = Math.floor(Math.random() * 100);
const result = num1 + num2;
return {
success: true,
data: {
result: result
},
eventData: {
events: [{name: 'member.edited', data: {id: '1'}}]
}
};
};

View file

@ -42,6 +42,7 @@ class JobManager {
#jobQueueManager = null;
#config;
#JobModel;
#events;
/**
* @param {Object} options
@ -53,14 +54,16 @@ class JobManager {
* @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
* @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing)
* @param {Object} [options.prometheusClient] - prometheus client instance (for testing)
* @param {Object} [options.events] - events instance (for testing)
*/
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null}) {
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null, events = null}) {
this.inlineQueue = fastq(this, worker, 3);
this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this);
this.#domainEvents = domainEvents;
this.#config = config;
this.#JobModel = JobModel;
this.#events = events;
const combinedMessageHandler = workerMessageHandler
? ({name, message}) => {
@ -104,7 +107,7 @@ class JobManager {
#initializeJobQueueManager() {
if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) {
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient});
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient, eventEmitter: this.#events});
this.#jobQueueManager.init();
}
}

View file

@ -5,15 +5,16 @@ const debug = require('@tryghost/debug')('job-manager:JobQueueManager');
const logging = require('@tryghost/logging');
class JobQueueManager {
constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient}) {
constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient, eventEmitter}) {
this.jobsRepository = new JobsRepository({JobModel});
this.config = this.initializeConfig(config?.get('services:jobs:queue') || {});
this.logger = this.createLogger(logger, this.config.logLevel);
this.WorkerPool = WorkerPool;
this.pool = this.createWorkerPool();
this.state = this.initializeState();
this.eventEmitter = eventEmitter;
this.prometheusClient = prometheusClient;
if (prometheusClient) {
this.prometheusClient.registerCounter({
name: 'job_manager_queue_job_completion_count',
@ -127,6 +128,16 @@ class JobQueueManager {
}
}
/**
* Emits events to the Node event emitter
* @param {Array<{name: string, data: any}>} events - The events to emit, e.g. member.edited
*/
emitEvents(events) {
events.forEach((e) => {
this.eventEmitter.emit(e.name, e.data);
});
}
async processJobs(jobs) {
for (const job of jobs) {
const jobMetadata = JSON.parse(job.get('metadata'));
@ -141,12 +152,20 @@ class JobQueueManager {
async executeJob(job, jobName, jobMetadata) {
this.state.queuedJobs.add(jobName);
try {
await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]);
/**
* @param {'executeJob'} jobName - This is the generic job execution fn
* @param {Array<{name: string, data: any}>} args - The arguments to pass to the job execution fn
* @returns {Promise<{success?: boolean, eventData?: {events: Array<{name: string, data: any}>}}>}
*/
const result = await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]);
await this.jobsRepository.delete(job.id);
this.prometheusClient?.getMetric('job_manager_queue_job_completion_count')?.inc({jobName});
if (jobName === 'update-member-email-analytics') {
this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count')?.inc();
}
if (result && result.eventData) {
this.emitEvents(result.eventData.events); // this is nested within eventData because we may want to support DomainEvents emission as well
}
} catch (error) {
await this.handleJobError(job, jobMetadata, error);
} finally {

View file

@ -10,7 +10,7 @@ describe('JobQueueManager', function () {
let mockWorkerPool;
let mockPrometheusClient;
let metricIncStub;
let mockEventEmitter;
beforeEach(function () {
metricIncStub = sinon.stub();
mockJobModel = {};
@ -36,13 +36,17 @@ describe('JobQueueManager', function () {
terminate: sinon.stub()
})
};
mockEventEmitter = {
emit: sinon.stub()
};
jobQueueManager = new JobQueueManager({
JobModel: mockJobModel,
config: mockConfig,
logger: mockLogger,
WorkerPool: mockWorkerPool,
prometheusClient: mockPrometheusClient
prometheusClient: mockPrometheusClient,
eventEmitter: mockEventEmitter
});
});
@ -305,6 +309,30 @@ describe('JobQueueManager', function () {
await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}});
expect(metricIncStub.calledTwice).to.be.true;
});
it('should emit events if present in result', async function () {
const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')};
jobQueueManager.pool.exec.resolves({eventData: {events: [{name: 'member.edited', data: {id: '1'}}]}});
sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves();
await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}});
expect(mockEventEmitter.emit.calledOnce).to.be.true;
expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true;
});
});
describe('emitEvents', function () {
it('should emit events', function () {
jobQueueManager.emitEvents([{name: 'member.edited', data: {id: '1'}}]);
expect(mockEventEmitter.emit.calledOnce).to.be.true;
expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true;
});
it('should handle multiple events', function () {
jobQueueManager.emitEvents([{name: 'member.edited', data: {id: '1'}}, {name: 'site.changed', data: {}}]);
expect(mockEventEmitter.emit.calledTwice).to.be.true;
expect(mockEventEmitter.emit.calledWith('member.edited', {id: '1'})).to.be.true;
expect(mockEventEmitter.emit.calledWith('site.changed', {})).to.be.true;
});
});
describe('handleJobError', function () {