diff --git a/ghost/core/core/server/services/bulk-email/bulk-email-processor.js b/ghost/core/core/server/services/bulk-email/bulk-email-processor.js index c6b52b1898..e8493039fd 100644 --- a/ghost/core/core/server/services/bulk-email/bulk-email-processor.js +++ b/ghost/core/core/server/services/bulk-email/bulk-email-processor.js @@ -93,7 +93,7 @@ module.exports = { } catch (error) { return new FailedBatch(emailBatchId, error); } - }, {concurrency: 10}); + }, {concurrency: 2}); const successes = batchResults.filter(response => (response instanceof SuccessfulBatch)); const failures = batchResults.filter(response => (response instanceof FailedBatch)); diff --git a/ghost/email-service/lib/batch-sending-service.js b/ghost/email-service/lib/batch-sending-service.js index 65601f3d55..ffb77c1355 100644 --- a/ghost/email-service/lib/batch-sending-service.js +++ b/ghost/email-service/lib/batch-sending-service.js @@ -8,6 +8,8 @@ const messages = { emailError: 'An unexpected error occurred, please retry sending your newsletter.' }; +const MAX_SENDING_CONCURRENCY = 2; + /** * @typedef {import('./sending-service')} SendingService * @typedef {import('./email-segmenter')} EmailSegmenter @@ -267,8 +269,8 @@ class BatchSendingService { } }; - // Run maximum 10 at the same time - await Promise.all(new Array(10).fill(0).map(() => runNext())); + // Run maximum MAX_SENDING_CONCURRENCY at the same time + await Promise.all(new Array(MAX_SENDING_CONCURRENCY).fill(0).map(() => runNext())); if (succeededCount < batches.length) { if (succeededCount > 0) { diff --git a/ghost/email-service/lib/email-event-processor.js b/ghost/email-service/lib/email-event-processor.js index 4130212dcc..5cd6cdd3ce 100644 --- a/ghost/email-service/lib/email-event-processor.js +++ b/ghost/email-service/lib/email-event-processor.js @@ -1,5 +1,11 @@ const {EmailDeliveredEvent, EmailOpenedEvent, EmailBouncedEvent, SpamComplaintEvent, EmailUnsubscribedEvent, EmailTemporaryBouncedEvent} = require('@tryghost/email-events'); +async function waitForEvent() { + return new Promise((resolve) => { + setTimeout(resolve, 100); + }); +} + /** * @typedef EmailIdentification * @property {string} email @@ -43,6 +49,8 @@ class EmailEventProcessor { emailId: recipient.emailId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } @@ -61,6 +69,8 @@ class EmailEventProcessor { emailId: recipient.emailId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } @@ -81,6 +91,8 @@ class EmailEventProcessor { emailRecipientId: recipient.emailRecipientId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } @@ -101,6 +113,8 @@ class EmailEventProcessor { emailRecipientId: recipient.emailRecipientId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } @@ -118,6 +132,8 @@ class EmailEventProcessor { emailId: recipient.emailId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } @@ -135,6 +151,8 @@ class EmailEventProcessor { emailId: recipient.emailId, timestamp })); + // We cannot await the dispatched domainEvent, but we need to limit the number of events thare are processed at the same time + await waitForEvent(); } return recipient; } diff --git a/ghost/email-service/test/batch-sending-service.test.js b/ghost/email-service/test/batch-sending-service.test.js index 7a2c88b463..ea7f55aaa1 100644 --- a/ghost/email-service/test/batch-sending-service.test.js +++ b/ghost/email-service/test/batch-sending-service.test.js @@ -440,7 +440,7 @@ describe('Batch Sending Service', function () { assert.equal(arg.batch, batches[0]); }); - it('Works for more than 10 batches', async function () { + it('Works for more than 2 batches', async function () { const service = new BatchSendingService({}); let runningCount = 0; let maxRunningCount = 0; @@ -461,7 +461,7 @@ describe('Batch Sending Service', function () { sinon.assert.callCount(sendBatch, 101); const sendBatches = sendBatch.getCalls().map(call => call.args[0].batch); assert.deepEqual(sendBatches, batches); - assert.equal(maxRunningCount, 10); + assert.equal(maxRunningCount, 2); }); it('Throws error if all batches fail', async function () { @@ -485,7 +485,7 @@ describe('Batch Sending Service', function () { sinon.assert.callCount(sendBatch, 101); const sendBatches = sendBatch.getCalls().map(call => call.args[0].batch); assert.deepEqual(sendBatches, batches); - assert.equal(maxRunningCount, 10); + assert.equal(maxRunningCount, 2); }); it('Throws error if a single batch fails', async function () { @@ -511,7 +511,7 @@ describe('Batch Sending Service', function () { sinon.assert.callCount(sendBatch, 101); const sendBatches = sendBatch.getCalls().map(call => call.args[0].batch); assert.deepEqual(sendBatches, batches); - assert.equal(maxRunningCount, 10); + assert.equal(maxRunningCount, 2); }); });