mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-02-17 23:44:39 -05:00
🐛 Fixed race condition when sending email (#15829)
refs https://github.com/TryGhost/Team/issues/2246 - This change helps avoid race conditions due to a lack of a transaction in the email job. It also moves the status check before creating the email batches (can take a while) to prevent other timing issues in case the job got scheduled multiple times. - Sets the patch option to true when changing the status of an email batch. If we don't do this, the bookshelf-relations plugin might try to save relations too. This could have caused a 'no rows updated' error. - Added a test that tests if the email job can only run once - Added logging to batching logic
This commit is contained in:
parent
0452d35360
commit
69228b2947
3 changed files with 92 additions and 31 deletions
|
@ -70,28 +70,9 @@ module.exports = {
|
|||
FailedBatch,
|
||||
|
||||
// accepts an ID rather than an Email model to better support running via a job queue
|
||||
async processEmail({emailId, options}) {
|
||||
async processEmail({emailModel, options}) {
|
||||
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
|
||||
const emailModel = await models.Email.findOne({id: emailId}, knexOptions);
|
||||
|
||||
if (!emailModel) {
|
||||
throw new errors.IncorrectUsageError({
|
||||
message: 'Provided email id does not match a known email record',
|
||||
context: {
|
||||
id: emailId
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (emailModel.get('status') !== 'pending') {
|
||||
throw new errors.IncorrectUsageError({
|
||||
message: 'Emails can only be processed when in the "pending" state',
|
||||
context: `Email "${emailId}" has state "${emailModel.get('status')}"`,
|
||||
code: 'EMAIL_NOT_PENDING'
|
||||
});
|
||||
}
|
||||
|
||||
await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true}));
|
||||
const emailId = emailModel.get('id');
|
||||
|
||||
// get batch IDs via knex to avoid model instantiation
|
||||
// only fetch pending or failed batches to avoid re-sending previously sent emails
|
||||
|
@ -141,6 +122,8 @@ module.exports = {
|
|||
|
||||
// accepts an ID rather than an EmailBatch model to better support running via a job queue
|
||||
async processEmailBatch({emailBatchId, options, memberSegment}) {
|
||||
logging.info('[sendEmailJob] Processing email batch ' + emailBatchId);
|
||||
|
||||
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
|
||||
|
||||
const emailBatchModel = await models.EmailBatch
|
||||
|
@ -166,7 +149,8 @@ module.exports = {
|
|||
const recipientRows = await models.EmailRecipient
|
||||
.getFilteredCollectionQuery({filter: `batch_id:${emailBatchId}`});
|
||||
|
||||
await emailBatchModel.save({status: 'submitting'}, knexOptions);
|
||||
// Patch to prevent saving the related email model
|
||||
await emailBatchModel.save({status: 'submitting'}, {...knexOptions, patch: true});
|
||||
|
||||
try {
|
||||
// Load newsletter data on email
|
||||
|
@ -178,14 +162,18 @@ module.exports = {
|
|||
// send the email
|
||||
const sendResponse = await this.send(emailBatchModel.relations.email.toJSON(), recipientRows, memberSegment);
|
||||
|
||||
logging.info('[sendEmailJob] Submitted email batch ' + emailBatchId);
|
||||
|
||||
// update batch success status
|
||||
return await emailBatchModel.save({
|
||||
status: 'submitted',
|
||||
provider_id: sendResponse.id.trim().replace(/^<|>$/g, '')
|
||||
}, Object.assign({}, knexOptions, {patch: true}));
|
||||
} catch (error) {
|
||||
logging.info('[sendEmailJob] Failed email batch ' + emailBatchId);
|
||||
|
||||
// update batch failed status
|
||||
await emailBatchModel.save({status: 'failed'}, knexOptions);
|
||||
await emailBatchModel.save({status: 'failed'}, {...knexOptions, patch: true});
|
||||
|
||||
// log any error that didn't come from the provider which would have already logged it
|
||||
if (!error.code || error.code !== 'BULK_EMAIL_SEND_FAILED') {
|
||||
|
@ -213,6 +201,8 @@ module.exports = {
|
|||
* @returns {Promise<Object>} - {providerId: 'xxx'}
|
||||
*/
|
||||
async send(emailData, recipients, memberSegment) {
|
||||
logging.info(`[sendEmailJob] Sending email batch to ${recipients.length} recipients`);
|
||||
|
||||
const mailgunConfigured = mailgunClient.isConfigured();
|
||||
if (!mailgunConfigured) {
|
||||
logging.warn('Bulk email has not been configured');
|
||||
|
@ -252,6 +242,7 @@ module.exports = {
|
|||
try {
|
||||
const response = await mailgunClient.send(emailData, recipientData, replacements);
|
||||
debug(`sent message (${Date.now() - startTime}ms)`);
|
||||
logging.info(`[sendEmailJob] Sent message (${Date.now() - startTime}ms)`);
|
||||
return response;
|
||||
} catch (err) {
|
||||
let ghostError = new errors.EmailError({
|
||||
|
|
|
@ -283,13 +283,14 @@ async function pendingEmailHandler(emailModel, options) {
|
|||
if (!process.env.NODE_ENV.startsWith('test')) {
|
||||
return jobsService.addJob({
|
||||
job: sendEmailJob,
|
||||
data: {emailModel},
|
||||
data: {emailId: emailModel.id},
|
||||
offloaded: false
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function sendEmailJob({emailModel, options}) {
|
||||
async function sendEmailJob({emailId, options}) {
|
||||
logging.info('[sendEmailJob] Started for ' + emailId);
|
||||
let startEmailSend = null;
|
||||
|
||||
try {
|
||||
|
@ -304,10 +305,45 @@ async function sendEmailJob({emailModel, options}) {
|
|||
await limitService.errorIfWouldGoOverLimit('emails');
|
||||
}
|
||||
|
||||
// Check if the email is still pending. And set the status to submitting in one transaction.
|
||||
let hasSingleAccess = false;
|
||||
let emailModel;
|
||||
await models.Base.transaction(async (transacting) => {
|
||||
const knexOptions = {...options, transacting, forUpdate: true};
|
||||
emailModel = await models.Email.findOne({id: emailId}, knexOptions);
|
||||
|
||||
if (!emailModel) {
|
||||
throw new errors.IncorrectUsageError({
|
||||
message: 'Provided email id does not match a known email record',
|
||||
context: {
|
||||
id: emailId
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (emailModel.get('status') !== 'pending') {
|
||||
// We don't throw this, because we don't want to mark this email as failed
|
||||
logging.error(new errors.IncorrectUsageError({
|
||||
message: 'Emails can only be processed when in the "pending" state',
|
||||
context: `Email "${emailId}" has state "${emailModel.get('status')}"`,
|
||||
code: 'EMAIL_NOT_PENDING'
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true}));
|
||||
hasSingleAccess = true;
|
||||
});
|
||||
|
||||
if (!hasSingleAccess || !emailModel) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Create email batch and recipient rows unless this is a retry and they already exist
|
||||
const existingBatchCount = await emailModel.related('emailBatches').count('id');
|
||||
|
||||
if (existingBatchCount === 0) {
|
||||
logging.info('[sendEmailJob] Creating new batches for ' + emailId);
|
||||
let newBatchCount = 0;
|
||||
|
||||
await models.Base.transaction(async (transacting) => {
|
||||
|
@ -316,15 +352,23 @@ async function sendEmailJob({emailModel, options}) {
|
|||
});
|
||||
|
||||
if (newBatchCount === 0) {
|
||||
logging.info('[sendEmailJob] No batches created for ' + emailId);
|
||||
await emailModel.save({status: 'submitted'}, {patch: true});
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
debug('sendEmailJob: sending email');
|
||||
startEmailSend = Date.now();
|
||||
await bulkEmailService.processEmail({emailId: emailModel.get('id'), options});
|
||||
await bulkEmailService.processEmail({emailModel, options});
|
||||
debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`);
|
||||
} catch (error) {
|
||||
if (startEmailSend) {
|
||||
logging.info(`[sendEmailJob] Failed sending ${emailId} (${Date.now() - startEmailSend}ms)`);
|
||||
} else {
|
||||
logging.info(`[sendEmailJob] Failed sending ${emailId}`);
|
||||
}
|
||||
|
||||
if (startEmailSend) {
|
||||
debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`);
|
||||
}
|
||||
|
@ -334,10 +378,10 @@ async function sendEmailJob({emailModel, options}) {
|
|||
errorMessage = errorMessage.substring(0, 2000);
|
||||
}
|
||||
|
||||
await emailModel.save({
|
||||
await models.Email.edit({
|
||||
status: 'failed',
|
||||
error: errorMessage
|
||||
}, {patch: true});
|
||||
}, {id: emailId});
|
||||
|
||||
throw new errors.InternalServerError({
|
||||
err: error,
|
||||
|
@ -514,6 +558,7 @@ async function createEmailBatches({emailModel, memberRows, memberSegment, option
|
|||
const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE);
|
||||
const batchIds = await Promise.mapSeries(batches, storeRecipientBatch);
|
||||
debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);
|
||||
logging.info(`[createEmailBatches] stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);
|
||||
|
||||
return batchIds;
|
||||
}
|
||||
|
|
|
@ -79,12 +79,37 @@ describe('MEGA', function () {
|
|||
const emailModel = await createPublishedPostEmail();
|
||||
|
||||
// Launch email job
|
||||
await _sendEmailJob({emailModel, options: {}});
|
||||
await _sendEmailJob({emailId: emailModel.id, options: {}});
|
||||
|
||||
await emailModel.refresh();
|
||||
emailModel.get('status').should.eql('submitted');
|
||||
});
|
||||
|
||||
it('Protects the email job from being run multiple times at the same time', async function () {
|
||||
sinon.stub(_mailgunClient, 'getInstance').returns({});
|
||||
sinon.stub(_mailgunClient, 'send').callsFake(async () => {
|
||||
return {
|
||||
id: 'stubbed-email-id'
|
||||
};
|
||||
});
|
||||
|
||||
// Prepare a post and email model
|
||||
const emailModel = await createPublishedPostEmail();
|
||||
|
||||
// Launch a lot of email jobs in the hope to mimic a possible race condition
|
||||
const promises = [];
|
||||
for (let i = 0; i < 100; i++) {
|
||||
promises.push(_sendEmailJob({emailId: emailModel.id, options: {}}));
|
||||
}
|
||||
await Promise.all(promises);
|
||||
|
||||
await emailModel.refresh();
|
||||
assert.equal(emailModel.get('status'), 'submitted');
|
||||
|
||||
const batchCount = await emailModel.related('emailBatches').count('id');
|
||||
assert.equal(batchCount, 1, 'Should only have created one batch');
|
||||
});
|
||||
|
||||
it('Can handle a failed post email', async function () {
|
||||
sinon.stub(_mailgunClient, 'getInstance').returns({});
|
||||
sinon.stub(_mailgunClient, 'send').callsFake(async () => {
|
||||
|
@ -95,7 +120,7 @@ describe('MEGA', function () {
|
|||
const emailModel = await createPublishedPostEmail();
|
||||
|
||||
// Launch email job
|
||||
await _sendEmailJob({emailModel, options: {}});
|
||||
await _sendEmailJob({emailId: emailModel.id, options: {}});
|
||||
|
||||
await emailModel.refresh();
|
||||
emailModel.get('status').should.eql('failed');
|
||||
|
@ -143,7 +168,7 @@ describe('MEGA', function () {
|
|||
const emailModel = await createPublishedPostEmail();
|
||||
|
||||
// Launch email job
|
||||
await _sendEmailJob({emailModel, options: {}});
|
||||
await _sendEmailJob({emailId: emailModel.id, options: {}});
|
||||
|
||||
await emailModel.refresh();
|
||||
emailModel.get('status').should.eql('submitted');
|
||||
|
|
Loading…
Add table
Reference in a new issue