mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-01-20 22:42:53 -05:00
207 lines
7.3 KiB
JavaScript
207 lines
7.3 KiB
JavaScript
|
const debug = require('ghost-ignition').debug('services:url:queue'),
|
||
|
EventEmitter = require('events').EventEmitter,
|
||
|
_ = require('lodash'),
|
||
|
common = require('../../lib/common');
|
||
|
|
||
|
/**
|
||
|
* ### Purpose of this queue
|
||
|
*
|
||
|
* Ghost fetches as earliest as possible the resources from the database. The reason is simply: we
|
||
|
* want to know all urls as soon as possible.
|
||
|
*
|
||
|
* Parallel to this, the routes/channels are read/prepared and registered in express.
|
||
|
* So the challenge is to handle both resource availability and route registration.
|
||
|
* If you start an event, all subscribers of it are executed in a sequence. The queue will wait
|
||
|
* till the current subscriber has finished it's work.
|
||
|
* The url service must ensure that each url in the system exists once. The order of
|
||
|
* subscribers defines who will possibly own an url first.
|
||
|
*
|
||
|
* If an event has finished, the subscribers of this event still remain in the queue.
|
||
|
* That means:
|
||
|
*
|
||
|
* - you can re-run an event
|
||
|
* - you can add more subscribers to an existing queue
|
||
|
* - you can order subscribers (helpful if you want to order routes/channels)
|
||
|
*
|
||
|
* Each subscriber represents one instance of the url generator. One url generator represents one channel/route.
|
||
|
*
|
||
|
* ### Tolerance option
|
||
|
*
|
||
|
* You can define a tolerance value per event. If you want to wait an amount of time till you think
|
||
|
* all subscribers have registered.
|
||
|
*
|
||
|
* ### Some examples to understand cases
|
||
|
*
|
||
|
* e.g.
|
||
|
* - resources have been loaded, event has started
|
||
|
* - no subscribers yet, we need to wait, express still initialises
|
||
|
* - okay, routes are coming in
|
||
|
* - we notify the subscribers
|
||
|
*
|
||
|
* e.g.
|
||
|
* - resources are in the progress of fetching from the database
|
||
|
* - routes are already waiting for the resources
|
||
|
*
|
||
|
* e.g.
|
||
|
* - resources are in the progress of fetching from the database
|
||
|
* - 2 subscribers are already registered
|
||
|
* - resources finished, event starts
|
||
|
* - 2 more subscribers are coming in
|
||
|
|
||
|
* ### Events
|
||
|
* - unique events e.g. added, updated, init, all
|
||
|
* - has subscribers
|
||
|
* - we remember the subscriber
|
||
|
*
|
||
|
* ### Actions
|
||
|
* - one event can have multiple actions
|
||
|
* - unique actions e.g. add post 1, add post 2
|
||
|
* - one event might only allow a single action to avoid collisions e.g. you initialise data twice
|
||
|
* - if an event has no action, the name of the action is the name of the event
|
||
|
* - in this case the event can only run once at a time
|
||
|
* - makes use of `toNotify` to remember who was notified already
|
||
|
*/
|
||
|
class Queue extends EventEmitter {
|
||
|
constructor() {
|
||
|
super();
|
||
|
this.queue = {};
|
||
|
this.toNotify = {};
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* `tolerance`:
|
||
|
* - 0: don't wait for more subscribers [default]
|
||
|
* - 100: wait long enough till all subscribers have registered (e.g. bootstrap)
|
||
|
*/
|
||
|
register(options, fn) {
|
||
|
if (!options.hasOwnProperty('tolerance')) {
|
||
|
options.tolerance = 0;
|
||
|
}
|
||
|
|
||
|
// CASE: nobody has initialised the queue event yet
|
||
|
if (!this.queue.hasOwnProperty(options.event)) {
|
||
|
this.queue[options.event] = {
|
||
|
tolerance: options.tolerance,
|
||
|
subscribers: []
|
||
|
};
|
||
|
}
|
||
|
|
||
|
debug('add', options.event, options.tolerance);
|
||
|
|
||
|
this.queue[options.event].subscribers.push(fn);
|
||
|
}
|
||
|
|
||
|
run(options) {
|
||
|
const event = options.event,
|
||
|
action = options.action,
|
||
|
eventData = options.eventData;
|
||
|
|
||
|
clearTimeout(this.toNotify[action].timeout);
|
||
|
this.toNotify[action].timeout = null;
|
||
|
|
||
|
debug('run', action, event, this.queue[event].subscribers.length, this.toNotify[action].notified.length);
|
||
|
|
||
|
if (this.queue[event].subscribers.length && this.queue[event].subscribers.length !== this.toNotify[action].notified.length) {
|
||
|
const fn = this.queue[event].subscribers[this.toNotify[action].notified.length];
|
||
|
|
||
|
debug('execute', action, event, this.toNotify[action].notified.length);
|
||
|
|
||
|
/**
|
||
|
* Currently no async operations happen in the subscribers functions.
|
||
|
* We can trigger the functions sync.
|
||
|
*/
|
||
|
try {
|
||
|
fn(eventData);
|
||
|
|
||
|
debug('executed', action, event, this.toNotify[action].notified.length);
|
||
|
this.toNotify[action].notified.push(fn);
|
||
|
this.run(options);
|
||
|
} catch (err) {
|
||
|
debug('error', err.message);
|
||
|
|
||
|
common.logging.error(new common.errors.InternalServerError({
|
||
|
message: 'Something bad happened.',
|
||
|
code: 'SERVICES_URL_QUEUE',
|
||
|
err: err
|
||
|
}));
|
||
|
|
||
|
// just try again
|
||
|
this.run(options);
|
||
|
}
|
||
|
} else {
|
||
|
// CASE 1: zero tolerance, kill run fn
|
||
|
// CASE 2: okay, i was tolerant enough, kill me
|
||
|
// CASE 3: wait for more subscribers, i am still tolerant
|
||
|
if (this.queue[event].tolerance === 0) {
|
||
|
delete this.toNotify[action];
|
||
|
debug('ended (1)', event, action);
|
||
|
this.emit('ended', event);
|
||
|
} else if (this.toNotify[action].timeoutInMS > this.queue[event].tolerance) {
|
||
|
delete this.toNotify[action];
|
||
|
debug('ended (2)', event, action);
|
||
|
this.emit('ended', event);
|
||
|
} else {
|
||
|
this.toNotify[action].timeoutInMS = this.toNotify[action].timeoutInMS * 1.1;
|
||
|
|
||
|
this.toNotify[action].timeout = setTimeout(() => {
|
||
|
this.run(options);
|
||
|
}, this.toNotify[action].timeoutInMS);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
start(options) {
|
||
|
debug('start');
|
||
|
|
||
|
// CASE: nobody is in the event queue waiting yet
|
||
|
// e.g. all resources are fetched already, but no subscribers (bootstrap)
|
||
|
// happens only for high tolerant events
|
||
|
if (!this.queue.hasOwnProperty(options.event)) {
|
||
|
this.queue[options.event] = {
|
||
|
tolerance: options.tolerance || 0,
|
||
|
subscribers: []
|
||
|
};
|
||
|
}
|
||
|
|
||
|
// an event doesn't need an action
|
||
|
if (!options.action) {
|
||
|
options.action = options.event;
|
||
|
}
|
||
|
|
||
|
// CASE 1: the queue supports killing an event e.g. resource edit is triggered twice very fast
|
||
|
// CASE 2: is the action already running, stop it, because e.g. configuration has changed
|
||
|
if (this.toNotify[options.action]) {
|
||
|
// CASE: timeout was registered, kill it, this will stop the run function of this action
|
||
|
if (this.toNotify[options.action].timeout) {
|
||
|
clearTimeout(this.toNotify[options.action].timeout);
|
||
|
this.toNotify[options.action].timeout = null;
|
||
|
} else {
|
||
|
debug('ignore. is already running', options.event, options.action);
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// reset who was already notified
|
||
|
this.toNotify[options.action] = {
|
||
|
event: options.event,
|
||
|
timeoutInMS: options.timeoutInMS || 50,
|
||
|
notified: []
|
||
|
};
|
||
|
|
||
|
this.emit('started', options.event);
|
||
|
this.run(options);
|
||
|
}
|
||
|
|
||
|
reset() {
|
||
|
this.queue = {};
|
||
|
|
||
|
_.each(this.toNotify, (obj) => {
|
||
|
clearTimeout(obj.timeout);
|
||
|
});
|
||
|
|
||
|
this.toNotify = {};
|
||
|
}
|
||
|
}
|
||
|
|
||
|
module.exports = Queue;
|