From 69228b29472ecaf3bc4e9d97cb83a283cb3a4a7a Mon Sep 17 00:00:00 2001 From: Simon Backx Date: Thu, 17 Nov 2022 13:36:52 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Fixed=20race=20condition=20when?= =?UTF-8?q?=20sending=20email=20(#15829)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit refs https://github.com/TryGhost/Team/issues/2246 - This change helps avoid race conditions due to a lack of a transaction in the email job. It also moves the status check before creating the email batches (can take a while) to prevent other timing issues in case the job got scheduled multiple times. - Sets the patch option to true when changing the status of an email batch. If we don't do this, the bookshelf-relations plugin might try to save relations too. This could have caused a 'no rows updated' error. - Added a test that tests if the email job can only run once - Added logging to batching logic --- .../bulk-email/bulk-email-processor.js | 37 +++++-------- ghost/core/core/server/services/mega/mega.js | 55 +++++++++++++++++-- .../test/integration/services/mega.test.js | 31 ++++++++++- 3 files changed, 92 insertions(+), 31 deletions(-) diff --git a/ghost/core/core/server/services/bulk-email/bulk-email-processor.js b/ghost/core/core/server/services/bulk-email/bulk-email-processor.js index 10f6743321..1bb3ec47ea 100644 --- a/ghost/core/core/server/services/bulk-email/bulk-email-processor.js +++ b/ghost/core/core/server/services/bulk-email/bulk-email-processor.js @@ -70,28 +70,9 @@ module.exports = { FailedBatch, // accepts an ID rather than an Email model to better support running via a job queue - async processEmail({emailId, options}) { + async processEmail({emailModel, options}) { const knexOptions = _.pick(options, ['transacting', 'forUpdate']); - const emailModel = await models.Email.findOne({id: emailId}, knexOptions); - - if (!emailModel) { - throw new errors.IncorrectUsageError({ - message: 'Provided email id does not match a known email record', - context: { - id: emailId - } - }); - } - - if (emailModel.get('status') !== 'pending') { - throw new errors.IncorrectUsageError({ - message: 'Emails can only be processed when in the "pending" state', - context: `Email "${emailId}" has state "${emailModel.get('status')}"`, - code: 'EMAIL_NOT_PENDING' - }); - } - - await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true})); + const emailId = emailModel.get('id'); // get batch IDs via knex to avoid model instantiation // only fetch pending or failed batches to avoid re-sending previously sent emails @@ -141,6 +122,8 @@ module.exports = { // accepts an ID rather than an EmailBatch model to better support running via a job queue async processEmailBatch({emailBatchId, options, memberSegment}) { + logging.info('[sendEmailJob] Processing email batch ' + emailBatchId); + const knexOptions = _.pick(options, ['transacting', 'forUpdate']); const emailBatchModel = await models.EmailBatch @@ -166,7 +149,8 @@ module.exports = { const recipientRows = await models.EmailRecipient .getFilteredCollectionQuery({filter: `batch_id:${emailBatchId}`}); - await emailBatchModel.save({status: 'submitting'}, knexOptions); + // Patch to prevent saving the related email model + await emailBatchModel.save({status: 'submitting'}, {...knexOptions, patch: true}); try { // Load newsletter data on email @@ -178,14 +162,18 @@ module.exports = { // send the email const sendResponse = await this.send(emailBatchModel.relations.email.toJSON(), recipientRows, memberSegment); + logging.info('[sendEmailJob] Submitted email batch ' + emailBatchId); + // update batch success status return await emailBatchModel.save({ status: 'submitted', provider_id: sendResponse.id.trim().replace(/^<|>$/g, '') }, Object.assign({}, knexOptions, {patch: true})); } catch (error) { + logging.info('[sendEmailJob] Failed email batch ' + emailBatchId); + // update batch failed status - await emailBatchModel.save({status: 'failed'}, knexOptions); + await emailBatchModel.save({status: 'failed'}, {...knexOptions, patch: true}); // log any error that didn't come from the provider which would have already logged it if (!error.code || error.code !== 'BULK_EMAIL_SEND_FAILED') { @@ -213,6 +201,8 @@ module.exports = { * @returns {Promise} - {providerId: 'xxx'} */ async send(emailData, recipients, memberSegment) { + logging.info(`[sendEmailJob] Sending email batch to ${recipients.length} recipients`); + const mailgunConfigured = mailgunClient.isConfigured(); if (!mailgunConfigured) { logging.warn('Bulk email has not been configured'); @@ -252,6 +242,7 @@ module.exports = { try { const response = await mailgunClient.send(emailData, recipientData, replacements); debug(`sent message (${Date.now() - startTime}ms)`); + logging.info(`[sendEmailJob] Sent message (${Date.now() - startTime}ms)`); return response; } catch (err) { let ghostError = new errors.EmailError({ diff --git a/ghost/core/core/server/services/mega/mega.js b/ghost/core/core/server/services/mega/mega.js index 921d3a82f2..8caac3ae08 100644 --- a/ghost/core/core/server/services/mega/mega.js +++ b/ghost/core/core/server/services/mega/mega.js @@ -283,13 +283,14 @@ async function pendingEmailHandler(emailModel, options) { if (!process.env.NODE_ENV.startsWith('test')) { return jobsService.addJob({ job: sendEmailJob, - data: {emailModel}, + data: {emailId: emailModel.id}, offloaded: false }); } } -async function sendEmailJob({emailModel, options}) { +async function sendEmailJob({emailId, options}) { + logging.info('[sendEmailJob] Started for ' + emailId); let startEmailSend = null; try { @@ -304,10 +305,45 @@ async function sendEmailJob({emailModel, options}) { await limitService.errorIfWouldGoOverLimit('emails'); } + // Check if the email is still pending. And set the status to submitting in one transaction. + let hasSingleAccess = false; + let emailModel; + await models.Base.transaction(async (transacting) => { + const knexOptions = {...options, transacting, forUpdate: true}; + emailModel = await models.Email.findOne({id: emailId}, knexOptions); + + if (!emailModel) { + throw new errors.IncorrectUsageError({ + message: 'Provided email id does not match a known email record', + context: { + id: emailId + } + }); + } + + if (emailModel.get('status') !== 'pending') { + // We don't throw this, because we don't want to mark this email as failed + logging.error(new errors.IncorrectUsageError({ + message: 'Emails can only be processed when in the "pending" state', + context: `Email "${emailId}" has state "${emailModel.get('status')}"`, + code: 'EMAIL_NOT_PENDING' + })); + return; + } + + await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true})); + hasSingleAccess = true; + }); + + if (!hasSingleAccess || !emailModel) { + return; + } + // Create email batch and recipient rows unless this is a retry and they already exist const existingBatchCount = await emailModel.related('emailBatches').count('id'); if (existingBatchCount === 0) { + logging.info('[sendEmailJob] Creating new batches for ' + emailId); let newBatchCount = 0; await models.Base.transaction(async (transacting) => { @@ -316,15 +352,23 @@ async function sendEmailJob({emailModel, options}) { }); if (newBatchCount === 0) { + logging.info('[sendEmailJob] No batches created for ' + emailId); + await emailModel.save({status: 'submitted'}, {patch: true}); return; } } debug('sendEmailJob: sending email'); startEmailSend = Date.now(); - await bulkEmailService.processEmail({emailId: emailModel.get('id'), options}); + await bulkEmailService.processEmail({emailModel, options}); debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`); } catch (error) { + if (startEmailSend) { + logging.info(`[sendEmailJob] Failed sending ${emailId} (${Date.now() - startEmailSend}ms)`); + } else { + logging.info(`[sendEmailJob] Failed sending ${emailId}`); + } + if (startEmailSend) { debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`); } @@ -334,10 +378,10 @@ async function sendEmailJob({emailModel, options}) { errorMessage = errorMessage.substring(0, 2000); } - await emailModel.save({ + await models.Email.edit({ status: 'failed', error: errorMessage - }, {patch: true}); + }, {id: emailId}); throw new errors.InternalServerError({ err: error, @@ -514,6 +558,7 @@ async function createEmailBatches({emailModel, memberRows, memberSegment, option const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE); const batchIds = await Promise.mapSeries(batches, storeRecipientBatch); debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); + logging.info(`[createEmailBatches] stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); return batchIds; } diff --git a/ghost/core/test/integration/services/mega.test.js b/ghost/core/test/integration/services/mega.test.js index 7a46b50095..fd186f8e28 100644 --- a/ghost/core/test/integration/services/mega.test.js +++ b/ghost/core/test/integration/services/mega.test.js @@ -79,12 +79,37 @@ describe('MEGA', function () { const emailModel = await createPublishedPostEmail(); // Launch email job - await _sendEmailJob({emailModel, options: {}}); + await _sendEmailJob({emailId: emailModel.id, options: {}}); await emailModel.refresh(); emailModel.get('status').should.eql('submitted'); }); + it('Protects the email job from being run multiple times at the same time', async function () { + sinon.stub(_mailgunClient, 'getInstance').returns({}); + sinon.stub(_mailgunClient, 'send').callsFake(async () => { + return { + id: 'stubbed-email-id' + }; + }); + + // Prepare a post and email model + const emailModel = await createPublishedPostEmail(); + + // Launch a lot of email jobs in the hope to mimic a possible race condition + const promises = []; + for (let i = 0; i < 100; i++) { + promises.push(_sendEmailJob({emailId: emailModel.id, options: {}})); + } + await Promise.all(promises); + + await emailModel.refresh(); + assert.equal(emailModel.get('status'), 'submitted'); + + const batchCount = await emailModel.related('emailBatches').count('id'); + assert.equal(batchCount, 1, 'Should only have created one batch'); + }); + it('Can handle a failed post email', async function () { sinon.stub(_mailgunClient, 'getInstance').returns({}); sinon.stub(_mailgunClient, 'send').callsFake(async () => { @@ -95,7 +120,7 @@ describe('MEGA', function () { const emailModel = await createPublishedPostEmail(); // Launch email job - await _sendEmailJob({emailModel, options: {}}); + await _sendEmailJob({emailId: emailModel.id, options: {}}); await emailModel.refresh(); emailModel.get('status').should.eql('failed'); @@ -143,7 +168,7 @@ describe('MEGA', function () { const emailModel = await createPublishedPostEmail(); // Launch email job - await _sendEmailJob({emailModel, options: {}}); + await _sendEmailJob({emailId: emailModel.id, options: {}}); await emailModel.refresh(); emailModel.get('status').should.eql('submitted');