diff --git a/ghost/job-manager/lib/job-manager.js b/ghost/job-manager/lib/job-manager.js index d4b514802a..4fc7f23c10 100644 --- a/ghost/job-manager/lib/job-manager.js +++ b/ghost/job-manager/lib/job-manager.js @@ -7,6 +7,7 @@ const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors'); const logging = require('@tryghost/logging'); const isCronExpression = require('./is-cron-expression'); const assembleBreeJob = require('./assemble-bree-job'); +const JobsRepository = require('./jobs-repository'); const worker = async (task, callback) => { try { @@ -31,8 +32,9 @@ class JobManager { * @param {Object} options * @param {Function} [options.errorHandler] - custom job error handler * @param {Function} [options.workerMessageHandler] - custom message handler coming from workers + * @param {Object} [options.JobModel] - a model which can persist job data in the storage */ - constructor({errorHandler, workerMessageHandler}) { + constructor({errorHandler, workerMessageHandler, JobModel}) { this.queue = fastq(this, worker, 1); this.bree = new Bree({ @@ -43,6 +45,8 @@ class JobManager { errorHandler: errorHandler, workerMessageHandler: workerMessageHandler }); + + this._jobsRepository = new JobsRepository({JobModel}); } /** @@ -118,6 +122,33 @@ class JobManager { } } + /** + * Adds a job that could ever be executed once. + * + * @param {Object} GhostJob - job options + * @prop {Function | String} GhostJob.job - function or path to a module defining a job + * @prop {String} [GhostJob.name] - unique job name, if not provided takes function name or job script filename + * @prop {String | Date} [GhostJob.at] - Date, cron or human readable schedule format. Manage will do immediate execution if not specified. Not supported for "inline" jobs + * @prop {Object} [GhostJob.data] - data to be passed into the job + * @prop {Boolean} [GhostJob.offloaded] - creates an "offloaded" job running in a worker thread by default. If set to "false" runs an "inline" job on the same event loop + */ + async addOneOffJob({name, job, data, offloaded = true}) { + const persistedJob = await this._jobsRepository.read(name); + + if (persistedJob) { + throw new IncorrectUsageError({ + message: `A "${name}" one off job has already been executed.` + }); + } + + await this._jobsRepository.add({ + name, + status: 'queued' + }); + + this.addJob({name, job, data, offloaded}); + } + /** * Removes an "offloaded" job from scheduled jobs queue. * It's NOT yet possible to remove "inline" jobs (will be possible when scheduling is added https://github.com/breejs/bree/issues/68). diff --git a/ghost/job-manager/lib/jobs-repository.js b/ghost/job-manager/lib/jobs-repository.js new file mode 100644 index 0000000000..daba18f23f --- /dev/null +++ b/ghost/job-manager/lib/jobs-repository.js @@ -0,0 +1,19 @@ +class JobsRepository { + constructor({JobModel}) { + this._JobModel = JobModel; + } + + async add(data) { + const job = await this._JobModel.add(data); + + return job; + } + + async read(name) { + const job = await this._JobModel.findOne({name}); + + return job; + } +} + +module.exports = JobsRepository; diff --git a/ghost/job-manager/test/job-manager.test.js b/ghost/job-manager/test/job-manager.test.js index 423a4f7579..0961495665 100644 --- a/ghost/job-manager/test/job-manager.test.js +++ b/ghost/job-manager/test/job-manager.test.js @@ -1,6 +1,7 @@ // Switch these lines once there are useful utils // const testUtils = require('./utils'); require('./utils'); +const assert = require('assert'); const path = require('path'); const sinon = require('sinon'); const delay = require('delay'); @@ -251,6 +252,46 @@ describe('Job Manager', function () { }); }); + describe('Add one off job', function () { + it('adds job to the queue when it is a unique one', async function () { + const spy = sinon.spy(); + const JobModel = { + findOne: sinon.stub().resolves(undefined), + add: sinon.stub().resolves() + }; + + const jobManager = new JobManager({JobModel}); + await jobManager.addOneOffJob({ + job: spy, + name: 'unique name', + data: 'test data' + }); + + assert.equal(JobModel.add.called, true); + }); + + it('does not add a job to the queue when it already exists', async function () { + const spy = sinon.spy(); + const JobModel = { + findOne: sinon.stub().resolves({name: 'I am the only one'}), + add: sinon.stub().throws('should not be called') + }; + + const jobManager = new JobManager({JobModel}); + + try { + await jobManager.addOneOffJob({ + job: spy, + name: 'I am the only one', + data: 'test data' + }); + throw new Error('should not reach this point'); + } catch (error) { + assert.equal(error.message, 'A "I am the only one" one off job has already been executed.'); + } + }); + }); + describe('Remove a job', function () { it('removes a scheduled job from the queue', async function () { const jobManager = new JobManager({});