From 1b1794063fa7e467501e62dda0150a5e6cbcd675 Mon Sep 17 00:00:00 2001 From: Naz Date: Thu, 5 Nov 2020 17:36:29 +1300 Subject: [PATCH] Added experimental job scheduling feature no issue - This method should be used with caution and should serve as a playground for upcoming new feature of scheduled jobs support --- ghost/job-manager/lib/job-manager.js | 40 ++++++++++++++++++++++ ghost/job-manager/package.json | 1 + ghost/job-manager/test/job-manager.test.js | 13 +++++++ 3 files changed, 54 insertions(+) diff --git a/ghost/job-manager/lib/job-manager.js b/ghost/job-manager/lib/job-manager.js index d7b25e219d..97d61f7dfd 100644 --- a/ghost/job-manager/lib/job-manager.js +++ b/ghost/job-manager/lib/job-manager.js @@ -1,5 +1,7 @@ const fastq = require('fastq'); +const later = require('@breejs/later'); const pWaitFor = require('p-wait-for'); +const isCronExpression = require('./is-cron-expression'); const worker = async (task, callback) => { try { @@ -21,6 +23,7 @@ const handler = (error, result) => { class JobManager { constructor(logging) { this.queue = fastq(this, worker, 1); + this.schedule = []; this.logging = logging; } @@ -38,7 +41,44 @@ class JobManager { }, handler); } + /** + * Schedules recuring job + * + * @param {Function|String} job - function or path to a file defining a job + * @param {Object} data - data to be passed into the joba + * @param {String} when - cron or human readable schedule format + */ + scheduleJob(job, data, when) { + let schedule; + + schedule = later.parse.text(when); + + if (isCronExpression(when)) { + schedule = later.parse.cron(when); + } + + if ((schedule.error && schedule.error !== -1) || schedule.schedules.length === 0) { + throw new Error('Invalid schedule format'); + } + + this.logging.info(`Scheduling job. Next run on: ${later.schedule(schedule).next()}`); + + const cancelInterval = later.setInterval(() => { + this.logging.info(`Scheduled job added to the queue.`); + this.addJob(job, data); + }, schedule); + + this.schedule.push(cancelInterval); + } + + /** + * @param {import('p-wait-for').Options} [options] + */ async shutdown(options) { + this.schedule.forEach((cancelHandle) => { + cancelHandle.clear(); + }); + if (this.queue.idle()) { return; } diff --git a/ghost/job-manager/package.json b/ghost/job-manager/package.json index cbe5027631..1b589d7336 100644 --- a/ghost/job-manager/package.json +++ b/ghost/job-manager/package.json @@ -24,6 +24,7 @@ "sinon": "9.2.1" }, "dependencies": { + "@breejs/later": "4.0.2", "cron-parser": "2.17.0", "fastq": "1.9.0", "p-wait-for": "3.1.0" diff --git a/ghost/job-manager/test/job-manager.test.js b/ghost/job-manager/test/job-manager.test.js index 696f013266..a5aa58cd9a 100644 --- a/ghost/job-manager/test/job-manager.test.js +++ b/ghost/job-manager/test/job-manager.test.js @@ -9,5 +9,18 @@ describe('Job Manager', function () { const jobManager = new JobManager(); should.exist(jobManager.addJob); + should.exist(jobManager.scheduleJob); + }); + + describe('Schedule Job', function () { + it ('fails to run for invalid scheduling expression', function () { + const jobManager = new JobManager(); + + try { + jobManager.scheduleJob(() => {}, {}, 'invalid expression'); + } catch (err) { + err.message.should.equal('Invalid schedule format'); + } + }); }); });