diff --git a/core/server/services/url/Queue.js b/core/server/services/url/Queue.js index c8bb70407f..42b714c87d 100644 --- a/core/server/services/url/Queue.js +++ b/core/server/services/url/Queue.js @@ -82,6 +82,7 @@ class Queue extends EventEmitter { if (!this.queue.hasOwnProperty(options.event)) { this.queue[options.event] = { tolerance: options.tolerance, + requiredSubscriberCount: options.requiredSubscriberCount || 0, subscribers: [] }; } @@ -136,7 +137,8 @@ class Queue extends EventEmitter { delete this.toNotify[action]; debug('ended (1)', event, action); this.emit('ended', event); - } else if (this.toNotify[action].timeoutInMS > this.queue[event].tolerance) { + } else if (this.queue[options.event].subscribers.length >= this.queue[options.event].requiredSubscriberCount && + this.toNotify[action].timeoutInMS > this.queue[event].tolerance) { delete this.toNotify[action]; debug('ended (2)', event, action); this.emit('ended', event); @@ -159,6 +161,7 @@ class Queue extends EventEmitter { if (!this.queue.hasOwnProperty(options.event)) { this.queue[options.event] = { tolerance: options.tolerance || 0, + requiredSubscriberCount: options.requiredSubscriberCount || 0, subscribers: [] }; } diff --git a/core/server/services/url/Resources.js b/core/server/services/url/Resources.js index 7ce88c746b..85d4a325a1 100644 --- a/core/server/services/url/Resources.js +++ b/core/server/services/url/Resources.js @@ -126,7 +126,8 @@ class Resources { // CASE: all resources are fetched, start the queue this.queue.start({ event: 'init', - tolerance: 100 + tolerance: 100, + requiredSubscriberCount: 1 }); }); } diff --git a/core/test/unit/services/url/Queue_spec.js b/core/test/unit/services/url/Queue_spec.js index c4dc0dbe8d..7412e493d5 100644 --- a/core/test/unit/services/url/Queue_spec.js +++ b/core/test/unit/services/url/Queue_spec.js @@ -241,5 +241,36 @@ describe('Unit: services/url/Queue', function () { timeoutInMS: 20 }); }); + + it('late subscribers', function (done) { + let notified = 0; + let called = 0; + + queue.addListener('ended', function (event) { + event.should.eql('nachos'); + notified.should.eql(1); + called.should.eql(1); + done(); + }); + + setTimeout(function () { + queue.register({ + event: 'nachos', + tolerance: 100, + timeoutInMS: 20, + requiredSubscriberCount: 1 + }, function () { + called = called + 1; + notified = notified + 1; + }); + }, 500); + + queue.start({ + event: 'nachos', + tolerance: 60, + timeoutInMS: 20, + requiredSubscriberCount: 1 + }); + }); }); });