
Added automatic database query retrying to the email service (#16218)

fixes https://github.com/TryGhost/Team/issues/2512

Email sending is a critical flow that should not be interrupted. If
there are database connection issues, we should try to recover from them
automatically by retrying individual database queries and transactions
for a limited amount of time.

This adds a helper method for retrying database operations. It limits all
database queries that run before the actual email is sent to a maximum of
10 minutes of retrying. Database operations that happen after sending get
a longer retry window because they are more critical for preventing data
loss (e.g. recording that an email was sent).
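
As an illustration of the intended behaviour, here is a minimal usage sketch (not part of the commit): retryDb() and the config shape are taken from the diff below, while loadRows() and fetchBatchRows() are hypothetical stand-ins for service code and a flaky database call.

    // Hypothetical sketch; `fetchBatchRows` stands in for any query that can
    // fail on a dropped connection. Config values are the commit's defaults.
    const BEFORE_RETRY_CONFIG = {maxRetries: 10, maxTime: 10 * 60 * 1000, sleep: 2000};

    async function loadRows(service) {
        return await service.retryDb(async () => {
            return await fetchBatchRows(); // may throw a transient connection error
        }, {
            ...BEFORE_RETRY_CONFIG,
            description: 'fetchBatchRows for email 123' // used only in retry log lines
        });
    }

On each failure the helper waits sleep × (retries so far + 1) ms and tries again, giving up once maxRetries is exceeded or the next attempt would run past the maxTime/stopAfterDate cutoff, at which point the original error is rethrown.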
Simon Backx, 2023-02-02 14:12:54 +01:00 (committed by GitHub)
parent e82ed951f4
commit 06de10786f
2 changed files with 432 additions and 36 deletions

@@ -31,6 +31,10 @@ class BatchSendingService {
#db;
#sentry;
// Retry database queries happening before sending the email
#BEFORE_RETRY_CONFIG = {maxRetries: 10, maxTime: 10 * 60 * 1000, sleep: 2000};
#AFTER_RETRY_CONFIG = {maxRetries: 20, maxTime: 30 * 60 * 1000, sleep: 2000};
/**
* @param {Object} dependencies
* @param {EmailRenderer} dependencies.emailRenderer
@@ -44,6 +48,8 @@ class BatchSendingService {
* @param {object} dependencies.models.Member
* @param {object} dependencies.db
* @param {object} [dependencies.sentry]
* @param {object} [dependencies.BEFORE_RETRY_CONFIG]
* @param {object} [dependencies.AFTER_RETRY_CONFIG]
*/
constructor({
emailRenderer,
@@ -52,7 +58,9 @@ class BatchSendingService {
emailSegmenter,
models,
db,
- sentry
+ sentry,
+ BEFORE_RETRY_CONFIG,
+ AFTER_RETRY_CONFIG
}) {
this.#emailRenderer = emailRenderer;
this.#sendingService = sendingService;
@@ -61,6 +69,20 @@ class BatchSendingService {
this.#models = models;
this.#db = db;
this.#sentry = sentry;
if (BEFORE_RETRY_CONFIG) {
this.#BEFORE_RETRY_CONFIG = BEFORE_RETRY_CONFIG;
}
if (AFTER_RETRY_CONFIG) {
this.#AFTER_RETRY_CONFIG = AFTER_RETRY_CONFIG;
}
}
#getBeforeRetryConfig(email) {
if (email._retryCutOffTime) {
return {...this.#BEFORE_RETRY_CONFIG, stopAfterDate: email._retryCutOffTime};
}
return this.#BEFORE_RETRY_CONFIG;
}
/**
@@ -84,22 +106,35 @@ class BatchSendingService {
async emailJob({emailId}) {
logging.info(`Starting email job for email ${emailId}`);
// We'll stop all automatic DB retries after this date
const retryCutOffTime = new Date(Date.now() + this.#BEFORE_RETRY_CONFIG.maxTime);
// Check if email is 'pending' only + change status to submitting in one transaction.
// This allows us to have a lock around the email job that makes sure an email can only have one active job.
- let email = await this.updateStatusLock(this.#models.Email, emailId, 'submitting', ['pending', 'failed']);
+ let email = await this.retryDb(
+ async () => {
+ return await this.updateStatusLock(this.#models.Email, emailId, 'submitting', ['pending', 'failed']);
+ },
+ {...this.#BEFORE_RETRY_CONFIG, description: `updateStatusLock email ${emailId} -> submitting`}
+ );
if (!email) {
logging.error(`Tried sending email that is not pending or failed ${emailId}`);
return;
}
// Save a strict cutoff time for retries
email._retryCutOffTime = retryCutOffTime;
try {
await this.sendEmail(email);
- await email.save({
- status: 'submitted',
- submitted_at: new Date(),
- error: null
- }, {patch: true, autoRefresh: false});
+ await this.retryDb(async () => {
+ await email.save({
+ status: 'submitted',
+ submitted_at: new Date(),
+ error: null
+ }, {patch: true, autoRefresh: false});
+ }, {...this.#AFTER_RETRY_CONFIG, description: `email ${emailId} -> submitted`});
} catch (e) {
const ghostError = new errors.EmailError({
err: e,
@@ -113,11 +148,13 @@ class BatchSendingService {
this.#sentry.captureException(e);
}
- // Edge case: Store error in email model (that are not caught by the batch)
- await email.save({
- status: 'failed',
- error: e.message || 'Something went wrong while sending the email'
- }, {patch: true, autoRefresh: false});
+ // Store error and status in email model
+ await this.retryDb(async () => {
+ await email.save({
+ status: 'failed',
+ error: e.message || 'Something went wrong while sending the email'
+ }, {patch: true, autoRefresh: false});
+ }, {...this.#AFTER_RETRY_CONFIG, description: `email ${emailId} -> failed`});
}
}
@@ -130,10 +167,18 @@ class BatchSendingService {
logging.info(`Sending email ${email.id}`);
// Load required relations
- const newsletter = await email.getLazyRelation('newsletter', {require: true});
- const post = await email.getLazyRelation('post', {require: true, withRelated: ['posts_meta', 'authors']});
+ const newsletter = await this.retryDb(async () => {
+ return await email.getLazyRelation('newsletter', {require: true});
+ }, {...this.#getBeforeRetryConfig(email), description: `getLazyRelation newsletter for email ${email.id}`});
+ const post = await this.retryDb(async () => {
+ return await email.getLazyRelation('post', {require: true, withRelated: ['posts_meta', 'authors']});
+ }, {...this.#getBeforeRetryConfig(email), description: `getLazyRelation post for email ${email.id}`});
+ let batches = await this.retryDb(async () => {
+ return await this.getBatches(email);
+ }, {...this.#getBeforeRetryConfig(email), description: `getBatches for email ${email.id}`});
- let batches = await this.getBatches(email);
if (batches.length === 0) {
batches = await this.createBatches({email, newsletter, post});
}
@@ -186,7 +231,12 @@ class BatchSendingService {
if (members.length > 0) {
totalCount += Math.min(members.length, BATCH_SIZE);
- const batch = await this.createBatch(email, segment, members.slice(0, BATCH_SIZE));
+ const batch = await this.retryDb(
+ async () => {
+ return await this.createBatch(email, segment, members.slice(0, BATCH_SIZE));
+ },
+ {...this.#getBeforeRetryConfig(email), description: `createBatch email ${email.id} segment ${segment}`}
+ );
batches.push(batch);
}
@@ -307,7 +357,13 @@ class BatchSendingService {
logging.info(`Sending batch ${originalBatch.id} for email ${email.id}`);
// Check the status of the email batch in a 'for update' transaction
- const batch = await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
+ const batch = await this.retryDb(
+ async () => {
+ return await this.updateStatusLock(this.#models.EmailBatch, originalBatch.id, 'submitting', ['pending', 'failed']);
+ },
+ {...this.#getBeforeRetryConfig(email), description: `updateStatusLock batch ${originalBatch.id} -> submitting`}
+ );
if (!batch) {
logging.error(`Tried sending email batch that is not pending or failed ${originalBatch.id}`);
return true;
@@ -316,7 +372,23 @@ class BatchSendingService {
let succeeded = false;
try {
- const members = await this.getBatchMembers(batch.id);
+ const members = await this.retryDb(
+ async () => {
+ const m = await this.getBatchMembers(batch.id);
+ // If we receive 0 rows, we may have switched to a secondary database and be seeing replication lag,
+ // so we throw an error and retry
+ if (m.length === 0) {
+ throw new errors.EmailError({
+ message: `No members found for batch ${batch.id}, possible replication lag`
+ });
+ }
+ return m;
+ },
+ {...this.#getBeforeRetryConfig(email), description: `getBatchMembers batch ${originalBatch.id}`}
+ );
const response = await this.#sendingService.send({
emailId: email.id,
post,
@@ -327,16 +399,21 @@ class BatchSendingService {
openTrackingEnabled: !!email.get('track_opens'),
clickTrackingEnabled: !!email.get('track_clicks')
});
- await batch.save({
- status: 'submitted',
- provider_id: response.id,
- // reset error fields when sending succeeds
- error_status_code: null,
- error_message: null,
- error_data: null
- }, {patch: true, require: false, autoRefresh: false});
succeeded = true;
+ await this.retryDb(
+ async () => {
+ await batch.save({
+ status: 'submitted',
+ provider_id: response.id,
+ // reset error fields when sending succeeds
+ error_status_code: null,
+ error_message: null,
+ error_data: null
+ }, {patch: true, require: false, autoRefresh: false});
+ },
+ {...this.#AFTER_RETRY_CONFIG, description: `save batch ${originalBatch.id} -> submitted`}
+ );
} catch (err) {
if (!err.code || err.code !== 'BULK_EMAIL_SEND_FAILED') {
// BULK_EMAIL_SEND_FAILED are already logged in mailgun-email-provider
@@ -354,18 +431,31 @@ class BatchSendingService {
}
}
- await batch.save({
- status: 'failed',
- error_status_code: err.statusCode ?? null,
- error_message: err.message,
- error_data: err.errorDetails ?? null
- }, {patch: true, require: false, autoRefresh: false});
+ if (!succeeded) {
+ // We check succeeded because of a rare edge case where the batch was sent but we failed to set the status to submitted; in that case we don't want to set it to failed
+ await this.retryDb(
+ async () => {
+ await batch.save({
+ status: 'failed',
+ error_status_code: err.statusCode ?? null,
+ error_message: err.message,
+ error_data: err.errorDetails ?? null
+ }, {patch: true, require: false, autoRefresh: false});
+ },
+ {...this.#AFTER_RETRY_CONFIG, description: `save batch ${originalBatch.id} -> failed`}
+ );
+ }
}
// Mark as processed, even when failed
- await this.#models.EmailRecipient
- .where({batch_id: batch.id})
- .save({processed_at: new Date()}, {patch: true, require: false, autoRefresh: false});
+ await this.retryDb(
+ async () => {
+ await this.#models.EmailRecipient
+ .where({batch_id: batch.id})
+ .save({processed_at: new Date()}, {patch: true, require: false, autoRefresh: false});
+ },
+ {...this.#AFTER_RETRY_CONFIG, description: `save EmailRecipients ${originalBatch.id} processed_at`}
+ );
return succeeded;
}
@@ -411,6 +501,64 @@ class BatchSendingService {
});
return model;
}
/**
* @private
* Retry a function until it doesn't throw an error or the max retries / max time are reached.
* @template T
* @param {() => Promise<T>} func
* @param {object} options
* @param {string} options.description Used for logging
* @param {number} options.sleep Time between retries (ms); multiplied by the number of retries already done
* @param {number} options.maxRetries Note: retries, not tries. So 0 means maximum 1 try, 1 means maximum 2 tries, etc.
* @param {number} [options.retryCount] (internal) Number of retries already done. 0 initially.
* @param {number} [options.maxTime] (ms)
* @param {Date} [options.stopAfterDate]
* @returns {Promise<T>}
*/
async retryDb(func, options) {
if (options.maxTime !== undefined) {
const stopAfterDate = new Date(Date.now() + options.maxTime);
if (!options.stopAfterDate || stopAfterDate < options.stopAfterDate) {
options = {...options, stopAfterDate};
}
}
try {
return await func();
} catch (e) {
const retryCount = (options.retryCount ?? 0);
const sleep = (options.sleep ?? 0) * (retryCount + 1);
if (retryCount >= options.maxRetries || (options.stopAfterDate && (new Date(Date.now() + sleep)) > options.stopAfterDate)) {
const ghostError = new errors.EmailError({
err: e,
code: 'BULK_EMAIL_DB_RETRY',
message: `[BULK_EMAIL_DB_RETRY] ${options.description} - Stopped retrying`,
context: e.message
});
logging.error(ghostError);
throw e;
}
const ghostError = new errors.EmailError({
err: e,
code: 'BULK_EMAIL_DB_RETRY',
message: `[BULK_EMAIL_DB_RETRY] ${options.description} - After ${retryCount} retries`,
context: e.message
});
logging.error(ghostError);
if (sleep) {
await new Promise((resolve) => {
setTimeout(resolve, sleep);
});
}
return await this.retryDb(func, {...options, retryCount: retryCount + 1});
}
}
}
module.exports = BatchSendingService;

@@ -119,6 +119,43 @@ describe('Batch Sending Service', function () {
assert.equal(afterEmailModel.get('error'), 'Unexpected test error');
});
it('retries saving error state if sending fails', async function () {
const Email = createModelClass({
findOne: {
status: 'pending'
}
});
const service = new BatchSendingService({
models: {Email},
AFTER_RETRY_CONFIG: {maxRetries: 20, maxTime: 2000, sleep: 1}
});
let afterEmailModel;
const sendEmail = sinon.stub(service, 'sendEmail').callsFake((email) => {
afterEmailModel = email;
let called = 0;
const originalSave = email.save;
email.save = async function () {
called += 1;
if (called === 2) {
return await originalSave.call(this, ...arguments);
}
throw new Error('Database connection error');
};
return Promise.reject(new Error('Unexpected test error'));
});
const result = await service.emailJob({emailId: '123'});
assert.equal(result, undefined);
sinon.assert.calledTwice(errorLog);
const loggedException = errorLog.getCall(1).args[0];
assert.match(loggedException.message, /\[BULK_EMAIL_DB_RETRY\] email 123 -> failed/);
assert.match(loggedException.context, /Database connection error/);
assert.equal(loggedException.code, 'BULK_EMAIL_DB_RETRY');
sinon.assert.calledOnce(sendEmail);
assert.equal(afterEmailModel.get('status'), 'failed', 'The email status is failed after sending');
assert.equal(afterEmailModel.get('error'), 'Unexpected test error');
});
it('saves default error message if sending fails', async function () {
const Email = createModelClass({
findOne: {
@@ -736,5 +773,216 @@ describe('Batch Sending Service', function () {
assert.equal(batch.get('error_message'), 'Test error');
assert.equal(batch.get('error_data'), '{"error":"test","messageData":"test"}');
});
it('Retries fetching recipients if 0 are returned', async function () {
const EmailBatch = createModelClass({
findOne: {
status: 'pending',
member_segment: null
}
});
const sendingService = {
send: sinon.stub().resolves({id: 'providerid@example.com'})
};
const WrongEmailRecipient = createModelClass({
findAll: []
});
let called = 0;
const MappedEmailRecipient = {
...EmailRecipient,
findAll() {
called += 1;
if (called === 1) {
return WrongEmailRecipient.findAll(...arguments);
}
return EmailRecipient.findAll(...arguments);
}
};
const findOne = sinon.spy(EmailBatch, 'findOne');
const service = new BatchSendingService({
models: {EmailBatch, EmailRecipient: MappedEmailRecipient},
sendingService,
BEFORE_RETRY_CONFIG: {maxRetries: 10, maxTime: 2000, sleep: 1}
});
const result = await service.sendBatch({
email: createModel({}),
batch: createModel({}),
post: createModel({}),
newsletter: createModel({})
});
assert.equal(result, true);
sinon.assert.calledOnce(errorLog);
const loggedException = errorLog.firstCall.args[0];
assert.match(loggedException.message, /\[BULK_EMAIL_DB_RETRY\] getBatchMembers batch/);
assert.match(loggedException.context, /No members found for batch/);
assert.equal(loggedException.code, 'BULK_EMAIL_DB_RETRY');
sinon.assert.calledOnce(sendingService.send);
sinon.assert.calledOnce(findOne);
const batch = await findOne.firstCall.returnValue;
assert.equal(batch.get('status'), 'submitted');
assert.equal(batch.get('provider_id'), 'providerid@example.com');
const {members} = sendingService.send.firstCall.args[0];
assert.equal(members.length, 2);
});
it('Stops retrying after the email retry cut off time', async function () {
const EmailBatch = createModelClass({
findOne: {
status: 'pending',
member_segment: null
}
});
const sendingService = {
send: sinon.stub().resolves({id: 'providerid@example.com'})
};
const WrongEmailRecipient = createModelClass({
findAll: []
});
let called = 0;
const MappedEmailRecipient = {
...EmailRecipient,
findAll() {
called += 1;
return WrongEmailRecipient.findAll(...arguments);
}
};
const service = new BatchSendingService({
models: {EmailBatch, EmailRecipient: MappedEmailRecipient},
sendingService,
BEFORE_RETRY_CONFIG: {maxRetries: 10, maxTime: 2000, sleep: 300}
});
const email = createModel({});
email._retryCutOffTime = new Date(Date.now() + 400);
const result = await service.sendBatch({
email,
batch: createModel({}),
post: createModel({}),
newsletter: createModel({})
});
assert.equal(called, 2);
assert.equal(result, false);
sinon.assert.calledThrice(errorLog); // first retry, second retry failed, plus the bulk email send failure
const loggedException = errorLog.firstCall.args[0];
assert.match(loggedException.message, /\[BULK_EMAIL_DB_RETRY\] getBatchMembers batch/);
assert.match(loggedException.context, /No members found for batch/);
assert.equal(loggedException.code, 'BULK_EMAIL_DB_RETRY');
sinon.assert.notCalled(sendingService.send);
});
});
describe('retryDb', function () {
it('Does retry', async function () {
const service = new BatchSendingService({});
let callCount = 0;
const result = await service.retryDb(() => {
callCount += 1;
if (callCount === 3) {
return 'ok';
}
throw new Error('Test error');
}, {
maxRetries: 2, sleep: 10
});
assert.equal(result, 'ok');
assert.equal(callCount, 3);
});
it('Stops after maxRetries', async function () {
const service = new BatchSendingService({});
let callCount = 0;
const result = service.retryDb(() => {
callCount += 1;
if (callCount === 3) {
return 'ok';
}
throw new Error('Test error');
}, {
maxRetries: 1, sleep: 10
});
await assert.rejects(result, /Test error/);
assert.equal(callCount, 2);
});
it('Stops after stopAfterDate', async function () {
const clock = sinon.useFakeTimers({now: new Date(2023, 0, 1, 0, 0, 0, 0), shouldAdvanceTime: true});
const service = new BatchSendingService({});
let callCount = 0;
const result = service.retryDb(() => {
callCount += 1;
clock.tick(1000 * 60);
throw new Error('Test error');
}, {
maxRetries: 1000, stopAfterDate: new Date(2023, 0, 1, 0, 2, 50)
});
await assert.rejects(result, /Test error/);
assert.equal(callCount, 3);
});
it('Stops after maxTime', async function () {
const clock = sinon.useFakeTimers({now: new Date(2023, 0, 1, 0, 0, 0, 0), shouldAdvanceTime: true});
const service = new BatchSendingService({});
let callCount = 0;
const result = service.retryDb(() => {
callCount += 1;
clock.tick(1000 * 60);
throw new Error('Test error');
}, {
maxRetries: 1000, maxTime: 1000 * 60 * 3 - 1
});
await assert.rejects(result, /Test error/);
assert.equal(callCount, 3);
});
it('Resolves after maxTime', async function () {
const clock = sinon.useFakeTimers({now: new Date(2023, 0, 1, 0, 0, 0, 0), shouldAdvanceTime: true});
const service = new BatchSendingService({});
let callCount = 0;
const result = await service.retryDb(() => {
callCount += 1;
clock.tick(1000 * 60);
if (callCount === 3) {
return 'ok';
}
throw new Error('Test error');
}, {
maxRetries: 1000, maxTime: 1000 * 60 * 3
});
assert.equal(result, 'ok');
assert.equal(callCount, 3);
});
it('Resolves with stopAfterDate', async function () {
const clock = sinon.useFakeTimers({now: new Date(2023, 0, 1, 0, 0, 0, 0), shouldAdvanceTime: true});
const service = new BatchSendingService({});
let callCount = 0;
const result = await service.retryDb(() => {
callCount += 1;
clock.tick(1000 * 60);
if (callCount === 4) {
return 'ok';
}
throw new Error('Test error');
}, {
maxRetries: 1000, stopAfterDate: new Date(2023, 0, 1, 0, 10, 50)
});
assert.equal(result, 'ok');
assert.equal(callCount, 4);
});
});
});