0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-03-11 02:12:21 -05:00

Added support for one off jobs

refs https://github.com/TryGhost/Toolbox/issues/357

- This is a scaffolding for what will become a one off job scheduling mechanism. The aim is allowing to run jobs which can be only ever be run once in the lifetime of the instance - persisting through restarts.
This commit is contained in:
Naz 2022-07-21 19:39:54 +01:00
parent 13bfc0746b
commit 5dae6d6acf
3 changed files with 92 additions and 1 deletions

View file

@ -7,6 +7,7 @@ const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors');
const logging = require('@tryghost/logging'); const logging = require('@tryghost/logging');
const isCronExpression = require('./is-cron-expression'); const isCronExpression = require('./is-cron-expression');
const assembleBreeJob = require('./assemble-bree-job'); const assembleBreeJob = require('./assemble-bree-job');
const JobsRepository = require('./jobs-repository');
const worker = async (task, callback) => { const worker = async (task, callback) => {
try { try {
@ -31,8 +32,9 @@ class JobManager {
* @param {Object} options * @param {Object} options
* @param {Function} [options.errorHandler] - custom job error handler * @param {Function} [options.errorHandler] - custom job error handler
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers * @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
*/ */
constructor({errorHandler, workerMessageHandler}) { constructor({errorHandler, workerMessageHandler, JobModel}) {
this.queue = fastq(this, worker, 1); this.queue = fastq(this, worker, 1);
this.bree = new Bree({ this.bree = new Bree({
@ -43,6 +45,8 @@ class JobManager {
errorHandler: errorHandler, errorHandler: errorHandler,
workerMessageHandler: workerMessageHandler workerMessageHandler: workerMessageHandler
}); });
this._jobsRepository = new JobsRepository({JobModel});
} }
/** /**
@ -118,6 +122,33 @@ class JobManager {
} }
} }
/**
* Adds a job that could ever be executed once.
*
* @param {Object} GhostJob - job options
* @prop {Function | String} GhostJob.job - function or path to a module defining a job
* @prop {String} [GhostJob.name] - unique job name, if not provided takes function name or job script filename
* @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs
* @prop {Object} [GhostJob.data] - data to be passed into the job
* @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop
*/
async addOneOffJob({name, job, data, offloaded = true}) {
const persistedJob = await this._jobsRepository.read(name);
if (persistedJob) {
throw new IncorrectUsageError({
message: `A "${name}" one off job has already been executed.`
});
}
await this._jobsRepository.add({
name,
status: 'queued'
});
this.addJob({name, job, data, offloaded});
}
/** /**
* Removes an "offloaded" job from scheduled jobs queue. * Removes an "offloaded" job from scheduled jobs queue.
* It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68). * It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68).

View file

@ -0,0 +1,19 @@
class JobsRepository {
constructor({JobModel}) {
this._JobModel = JobModel;
}
async add(data) {
const job = await this._JobModel.add(data);
return job;
}
async read(name) {
const job = await this._JobModel.findOne({name});
return job;
}
}
module.exports = JobsRepository;

View file

@ -1,6 +1,7 @@
// Switch these lines once there are useful utils // Switch these lines once there are useful utils
// const testUtils = require('./utils'); // const testUtils = require('./utils');
require('./utils'); require('./utils');
const assert = require('assert');
const path = require('path'); const path = require('path');
const sinon = require('sinon'); const sinon = require('sinon');
const delay = require('delay'); const delay = require('delay');
@ -251,6 +252,46 @@ describe('Job Manager', function () {
}); });
}); });
describe('Add one off job', function () {
it('adds job to the queue when it is a unique one', async function () {
const spy = sinon.spy();
const JobModel = {
findOne: sinon.stub().resolves(undefined),
add: sinon.stub().resolves()
};
const jobManager = new JobManager({JobModel});
await jobManager.addOneOffJob({
job: spy,
name: 'unique name',
data: 'test data'
});
assert.equal(JobModel.add.called, true);
});
it('does not add a job to the queue when it already exists', async function () {
const spy = sinon.spy();
const JobModel = {
findOne: sinon.stub().resolves({name: 'I am the only one'}),
add: sinon.stub().throws('should not be called')
};
const jobManager = new JobManager({JobModel});
try {
await jobManager.addOneOffJob({
job: spy,
name: 'I am the only one',
data: 'test data'
});
throw new Error('should not reach this point');
} catch (error) {
assert.equal(error.message, 'A "I am the only one" one off job has already been executed.');
}
});
});
describe('Remove a job', function () { describe('Remove a job', function () {
it('removes a scheduled job from the queue', async function () { it('removes a scheduled job from the queue', async function () {
const jobManager = new JobManager({}); const jobManager = new JobManager({});