0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-04-15 03:01:37 -05:00

Added workerMessageHandler option to ctr options

refs https://github.com/TryGhost/Ghost/issues/12496

- `workerMessageHandler` option allows for custom worker message handling and allows to eliminate a need for loggers of any type inside of jobs.
- removing loggers from jobs solves file hanle leak which used to cause Ghost process to crash (see referenced issue)
This commit is contained in:
Naz 2021-02-22 19:10:47 +13:00
parent 0e1c2ececb
commit 55060e323c
3 changed files with 43 additions and 2 deletions

View file

@ -30,8 +30,9 @@ class JobManager {
* @param {Object} options
* @param {Object} [options.logging] - custom logging handler, defaults to console
* @param {Function} [options.errorHandler] - custom job error handler
* @param {Function} [options.workerMessageHandler] - custom message handler coming from workers
*/
constructor({logging, errorHandler}) {
constructor({logging, errorHandler, workerMessageHandler}) {
this.queue = fastq(this, worker, 1);
this.bree = new Bree({
@ -39,7 +40,8 @@ class JobManager {
hasSeconds: true, // precision is needed to avoid task overlaps after immediate execution
outputWorkerMetadata: true,
logger: logging,
errorHandler: errorHandler
errorHandler: errorHandler,
workerMessageHandler: workerMessageHandler
});
this.logging = logging;

View file

@ -225,6 +225,26 @@ describe('Job Manager', function () {
should(spyHandler.args[0][0].message).equal('job error');
should(spyHandler.args[0][1].name).equal('will-fail');
});
it('uses worker message handler when job sends a message', async function (){
const workerMessageHandlerSpy = sinon.spy();
const jobManager = new JobManager({logging, workerMessageHandler: workerMessageHandlerSpy});
jobManager.addJob({
job: path.resolve(__dirname, './jobs/message.js'),
name: 'will-send-msg'
});
jobManager.bree.run('will-send-msg');
jobManager.bree.workers['will-send-msg'].postMessage('hello from Ghost!');
// Give time for worker (worker thread) <-> parent process (job manager) communication
await delay(1000);
should(workerMessageHandlerSpy.called).be.true();
should(workerMessageHandlerSpy.args[0][0].name).equal('will-send-msg');
should(workerMessageHandlerSpy.args[0][0].message).equal('hello from Ghost!');
});
});
});

View file

@ -0,0 +1,19 @@
const {parentPort} = require('bthreads');
setInterval(() => { }, 10);
if (parentPort) {
parentPort.on('message', (message) => {
if (message === 'error') {
throw new Error('oops');
}
if (message === 'cancel') {
parentPort.postMessage('cancelled');
return;
}
parentPort.postMessage(message);
process.exit(0);
});
}