mirror of https://github.com/TryGhost/Ghost.git (synced 2025-03-25 02:31:59 -05:00)
Refactor mega service to use stored email content and batch/recipient records
no issue

- store raw content in email record
    - keep any replacement strings in the html/plaintext content so that it can be used when sending email rather than needing to re-serialize the post content which may have changed
- split post email serializer into separate serialization and replacement parsing functions
    - serialization now returns any email content that is derived from the post content (subject/html/plaintext) rather than post content plus replacements
    - `parseReplacements` has been split out so that it can be run against email content rather than a post, this allows mega and the email preview service to work with the stored email content
- move mailgun-specific functionality into the mailgun provider
    - previously mailgun-specific behaviour was spread across the post email serializer, mega, and bulk-email service
    - the per-batch `send` functionality was moved from the `bulk-email` service to the mailgun provider and updated to take email content, recipient info, and replacement info so that all mailgun-specific headers and replacement formatting can be handled in one place
    - exposes the `BATCH_SIZE` constant because batch sizes are limited to what the provider allows
- `bulk-email` service split into three methods (a rough sketch of the replacement/recipient-data flow follows this list)
    - `send` is responsible for taking email content and recipients, parsing replacement info from the email content, using that to collate a recipient data object, and finally coordinating the send via the mailgun provider. Usable directly for use-cases such as test emails
    - `processEmail` takes an email ID, loads it, and coordinates sending the related batches concurrently
    - `processEmailBatch` takes an email_batch ID, loads it along with the associated email_recipient records, and passes the data through to the `send` function, updating the batch status as it's processed
    - `processEmail` and `processEmailBatch` take IDs rather than objects ready for future use by job queues, where it's best to keep job parameters as minimal as possible
- refactored `mega` service
    - modified `getEmailData` to collate email content (from/reply-to/subject/html/plaintext) rather than being responsible for dealing with replacements and mailgun-specific replacement formats; used for generating email content before storing in the email table, and when sending test emails
    - from/reply-to calculation moved out of the post-email-serializer into mega and extracted into separate functions used by `getEmailData`
    - `sendTestEmail` updated to generate `EmailRecipient`-like objects for each email address so that appropriate data can be supplied to the updated `bulk-email.send` method
    - `sendEmailJob` updated to create `email_batches` and associated `email_recipients` records then hand over processing to the `bulk-email` service
    - member row fetching extracted into a separate function and used by `createEmailBatches`
    - moved updating of email status from `mega` to the `bulk-email` service, keeping the concept of Successful/FailedBatch internal to the `bulk-email` service
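For orientation, here is a minimal sketch of the recipient-data collation that the reworked `bulk-email.send` performs before handing off to the mailgun provider. The object shapes follow the diff below; `collateRecipientData` is a hypothetical helper name used only for illustration and is not part of the commit.

    const postEmailSerializer = require('../mega/post-email-serializer');

    // Build the per-recipient variable map that the provider injects in place of
    // each %%{...}%% replacement match. `emailData` is the stored email row as a
    // POJO (html/plaintext/from/replyTo/subject) and `recipients` are
    // email_recipient-like rows (member_email/member_uuid/member_name).
    function collateRecipientData(emailData, recipients) {
        const replacements = postEmailSerializer.parseReplacements(emailData);

        const recipientData = {};
        recipients.forEach((recipient) => {
            // static data available for every recipient
            const data = {
                unique_id: recipient.member_uuid,
                unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(recipient.member_uuid)
            };

            // computed property used by the {first_name} replacement
            recipient.member_first_name = (recipient.member_name || '').split(' ')[0];

            // dynamic data parsed from the stored email content
            replacements.forEach(({id, recipientProperty, fallback}) => {
                data[id] = recipient[recipientProperty] || fallback || '';
            });

            recipientData[recipient.member_email] = data;
        });

        return {recipientData, replacements};
    }

The mailgun provider then rewrites each replacement match in the html/plaintext to `%recipient.<id>%` and passes `recipientData` as Mailgun's `recipient-variables`, so per-recipient substitution happens on Mailgun's side rather than by sending one message per member.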
This commit is contained in:
parent d34a3263e8
commit 474e6c4c45
5 changed files with 477 additions and 286 deletions
core/server
@@ -31,16 +31,17 @@ module.exports = {
});
}

return mega.postEmailSerializer.serialize(model, {isBrowserPreview: true}).then(({emailTmpl, replacements}) => {
// perform replacements using no member data
return mega.postEmailSerializer.serialize(model, {isBrowserPreview: true}).then((emailContent) => {
const replacements = mega.postEmailSerializer.parseReplacements(emailContent);

replacements.forEach((replacement) => {
emailTmpl[replacement.format] = emailTmpl[replacement.format].replace(
emailContent[replacement.format] = emailContent[replacement.format].replace(
replacement.match,
replacement.fallback || ''
);
});

return emailTmpl;
return emailContent;
});
});
}

@@ -1,31 +1,33 @@
const _ = require('lodash');
const moment = require('moment-timezone');
const errors = require('@tryghost/errors');
const {i18n} = require('../../lib/common');
const logging = require('../../../shared/logging');
const models = require('../../models');
const mailgunProvider = require('./mailgun');
const configService = require('../../../shared/config');
const settingsCache = require('../settings/cache');
const sentry = require('../../../shared/sentry');
const debug = require('ghost-ignition').debug('mega');
const postEmailSerializer = require('../mega/post-email-serializer');

const BATCH_SIZE = mailgunProvider.BATCH_SIZE;

/**
 * An object representing batch request result
 * @typedef { Object } BatchResultBase
 * @property { string } data - data that is returned from Mailgun or one which Mailgun was called with
 */
class BatchResultBase {
}
class BatchResultBase {}

class SuccessfulBatch extends BatchResultBase {
constructor(data) {
super();
this.data = data;
constructor(id) {
super(...arguments);
this.id = id;
}
}

class FailedBatch extends BatchResultBase {
constructor(error, data) {
super();
constructor(id, error) {
super(...arguments);
error.originalMessage = error.message;

if (error.statusCode >= 500) {
@@ -37,11 +39,10 @@ class FailedBatch extends BatchResultBase {
} else if (error.message.includes(`'to' parameter is not a valid address`)) {
error.message = 'Recipient is not a valid address';
} else {
error.message = 'Email failed to send - please verify your email settings';
error.message = `Email failed to send "${error.originalMessage}" - please verify your email settings`;
}

this.error = error;
this.data = data;
}
}

@@ -58,95 +59,196 @@ class FailedBatch extends BatchResultBase {
 */

module.exports = {
BATCH_SIZE,
SuccessfulBatch,
FailedBatch,

// accepts an ID rather than an Email model to better support running via a job queue
async processEmail({emailId, options}) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const emailModel = await models.Email.findOne({id: emailId}, knexOptions);

if (!emailModel) {
throw new errors.IncorrectUsageError({
message: 'Provided email id does not match a known email record',
context: {
id: emailId
}
});
}

if (emailModel.get('status') !== 'pending') {
throw new errors.IncorrectUsageError({
message: 'Emails can only be processed when in the "pending" state',
context: `Email "${emailId}" has state "${emailModel.get('status')}"`,
code: 'EMAIL_NOT_PENDING'
});
}

await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true}));

// get batch IDs via knex to avoid model instantiation
// only fetch pending or failed batches to avoid re-sending previously sent emails
const batchIds = await models.EmailBatch
.getFilteredCollectionQuery({filter: `email_id:${emailId}+status:[pending,failed]`}, knexOptions)
.select('id');

const batchResults = await Promise.map(batchIds, async ({id: emailBatchId}) => {
try {
await this.processEmailBatch({emailBatchId, options});
return new SuccessfulBatch(emailBatchId);
} catch (error) {
return new FailedBatch(emailBatchId, error);
}
}, {concurrency: 10});

const successes = batchResults.filter(response => (response instanceof SuccessfulBatch));
const failures = batchResults.filter(response => (response instanceof FailedBatch));
const batchStatus = successes.length ? 'submitted' : 'failed';

let error;

if (failures.length) {
error = failures[0].error.message;
}

if (error && error.length > 2000) {
error = error.substring(0, 2000);
}

try {
await models.Email.edit({
status: batchStatus,
results: JSON.stringify(successes),
error: error,
error_data: JSON.stringify(failures) // NOTE: need to discuss how we store this
}, {
id: emailModel.id
});
} catch (err) {
logging.error(err);
}

await emailModel.save({status: 'submitted'}, Object.assign({}, knexOptions, {patch: true}));

return batchResults;
},

// accepts an ID rather than an EmailBatch model to better support running via a job queue
async processEmailBatch({emailBatchId, options}) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);

const emailBatchModel = await models.EmailBatch
.findOne({id: emailBatchId}, Object.assign({}, knexOptions, {withRelated: 'email'}));

if (!emailBatchModel) {
throw new errors.IncorrectUsageError({
message: 'Provided email_batch id does not match a known email_batch record',
context: {
id: emailBatchId
}
});
}

if (!['pending','failed'].includes(emailBatchModel.get('status'))) {
throw new errors.IncorrectUsageError({
message: 'Email batches can only be processed when in the "pending" or "failed" state',
context: `Email batch "${emailBatchId}" has state "${emailBatchModel.get('status')}"`
});
}

// get recipient rows via knex to avoid costly bookshelf model instantiation
const recipientRows = await models.EmailRecipient
.getFilteredCollectionQuery({filter: `batch_id:${emailBatchId}`});

await emailBatchModel.save({status: 'submitting'}, knexOptions);

let result;

try {
// send the email
const sendResponse = await this.send(emailBatchModel.relations.email.toJSON(), recipientRows);

// update batch success status
result = await emailBatchModel.save({
status: 'submitted',
provider_id: sendResponse.id
}, Object.assign({}, knexOptions, {patch: true}));
} catch (error) {
// update batch failed status
await emailBatchModel.save({status: 'failed'}, knexOptions);

// log any error that didn't come from the provider which would have already logged it
if (!error.code || error.code !== 'BULK_EMAIL_SEND_FAILED') {
let ghostError = new errors.InternalServerError({
err: error
});
sentry.captureException(ghostError);
logging.error(ghostError);
throw ghostError;
}
} finally {
// update all email recipients with a processed_at
await models.EmailRecipient
.where({batch_id: emailBatchId})
.save({processed_at: moment()}, Object.assign({}, knexOptions, {patch: true}));
}

return result;
},

/**
 * @param {Email} message - The message to send
 * @param {[EmailAddress]} recipients - the recipients to send the email to
 * @param {[object]} recipientData - list of data keyed by email to inject into the email
 * @returns {Promise<Array<BatchResultBase>>} An array of promises representing the success of the batch email sending
 * @param {Email-like} emailData - The email to send, must be a POJO so emailModel.toJSON() before calling if needed
 * @param {[EmailRecipient]} recipients - The recipients to send the email to with their associated data
 * @returns {Object} - {providerId: 'xxx'}
 */
async send(message, recipients, recipientData = {}) {
let BATCH_SIZE = 1000;
send(emailData, recipients) {
const mailgunInstance = mailgunProvider.getInstance();
if (!mailgunInstance) {
return;
}
let fromAddress = message.from;
if (/@localhost$/.test(message.from) || /@ghost.local$/.test(message.from)) {
fromAddress = 'localhost@example.com';
logging.warn(`Rewriting bulk email from address ${message.from} to ${fromAddress}`);

BATCH_SIZE = 2;
}
const startTime = Date.now();
debug(`sending message to ${recipients.length} recipients`);

const blogTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : '';
let supportAddress = message.supportAddress;
delete message.supportAddress;
const replyAddressOption = settingsCache.get('members_reply_address');
const replyToAddress = (replyAddressOption === 'support') ? supportAddress : fromAddress;
fromAddress = blogTitle ? `"${blogTitle}"<${fromAddress}>` : fromAddress;
const replacements = postEmailSerializer.parseReplacements(emailData);

const chunkedRecipients = _.chunk(recipients, BATCH_SIZE);

return Promise.map(chunkedRecipients, (toAddresses, chunkIndex) => {
const recipientVariables = {};
toAddresses.forEach((email) => {
recipientVariables[email] = recipientData[email];
});

const batchData = {
to: toAddresses,
from: fromAddress,
'h:Reply-To': replyToAddress || fromAddress,
'recipient-variables': recipientVariables
// collate static and dynamic data for each recipient ready for provider
const recipientData = {};
recipients.forEach((recipient) => {
// static data for every recipient
const data = {
unique_id: recipient.member_uuid,
unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(recipient.member_uuid)
};

const bulkEmailConfig = configService.get('bulkEmail');
// computed properties on recipients - TODO: better way of handling these
recipient.member_first_name = (recipient.member_name || '').split(' ')[0];

if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.tag) {
Object.assign(batchData, {
'o:tag': [bulkEmailConfig.mailgun.tag, 'bulk-email']
});
}

if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.testmode) {
Object.assign(batchData, {
'o:testmode': true
});
}

const messageData = Object.assign({}, message, batchData);

// Rename plaintext field to text for Mailgun
messageData.text = messageData.plaintext;
delete messageData.plaintext;

return new Promise((resolve) => {
const batchStartTime = Date.now();
debug(`sending message batch ${chunkIndex + 1} to ${toAddresses.length}`);
mailgunInstance.messages().send(messageData, (error, body) => {
if (error) {
// NOTE: logging an error here only but actual handling should happen in more sophisticated batch retry handler
// REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#errors
let ghostError = new errors.EmailError({
err: error,
context: i18n.t('errors.services.mega.requestFailed.error')
});

sentry.captureException(ghostError);
logging.warn(ghostError);

// NOTE: these are generated variables, so can be regenerated when retry is done
const data = _.omit(batchData, ['recipient-variables']);
debug(`failed message batch ${chunkIndex + 1} (${Date.now() - batchStartTime}ms)`);
resolve(new FailedBatch(error, data));
} else {
debug(`sent message batch ${chunkIndex + 1} (${Date.now() - batchStartTime}ms)`);
resolve(new SuccessfulBatch(body));
}
});
// dynamic data from replacements
replacements.forEach(({id, recipientProperty, fallback}) => {
data[id] = recipient[recipientProperty] || fallback || '';
});
}, {concurrency: 10});

recipientData[recipient.member_email] = data;
});

return mailgunProvider.send(emailData, recipientData, replacements).then((response) => {
debug(`sent message (${Date.now() - startTime}ms)`);
return response;
}).catch((error) => {
// REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#errors
let ghostError = new errors.EmailError({
err: error,
context: i18n.t('errors.services.mega.requestFailed.error'),
code: 'BULK_EMAIL_SEND_FAILED'
});

sentry.captureException(ghostError);
logging.warn(ghostError);

debug(`failed to send message (${Date.now() - startTime}ms)`);
throw ghostError;
});
}
};

@@ -1,9 +1,12 @@
const _ = require('lodash');
const {URL} = require('url');
const mailgun = require('mailgun-js');
const logging = require('../../../shared/logging');
const configService = require('../../../shared/config');
const settingsCache = require('../settings/cache');

const BATCH_SIZE = 1000;

function createMailgun(config) {
const baseUrl = new URL(config.baseUrl);

@@ -31,16 +34,76 @@ function getInstance() {
if (!hasMailgunConfig && !hasMailgunSetting) {
logging.warn(`Bulk email service is not configured`);
} else {
try {
let mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting;
return createMailgun(mailgunConfig);
} catch (err) {
logging.warn(`Bulk email service is not configured`);
}
let mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting;
return createMailgun(mailgunConfig);
}
return null;
}

// recipients format:
// {
//     'test@example.com': {
//         name: 'Test User',
//         unique_id: '12345abcde',
//         unsubscribe_url: 'https://example.com/unsub/me'
//     }
// }
function send(message, recipientData, replacements) {
if (recipientData.length > BATCH_SIZE) {
// err - too many recipients
}

let messageData = {};

try {
const bulkEmailConfig = configService.get('bulkEmail');
const mailgunInstance = getInstance();

const messageContent = _.pick(message, 'html', 'plaintext');

// update content to use Mailgun variable syntax for replacements
replacements.forEach((replacement) => {
messageContent[replacement.format] = messageContent[replacement.format].replace(
replacement.match,
`%recipient.${replacement.id}%`
);
});

messageData = {
toAddresses: Object.keys(recipientData),
from: message.from,
'h:Reply-To': message.replyTo,
'recipient-variables': recipientData,
html: messageContent.html,
text: messageContent.plaintext
};

if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.tag) {
messageData['o:tag'] = [bulkEmailConfig.mailgun.tag, 'bulk-email'];
}

if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.testmode) {
messageData['o:testmode'] = true;
}

return new Promise((resolve, reject) => {
mailgunInstance.messages().send(messageData, (error, body) => {
if (error) {
return reject(error);
}

return resolve({
id: body.id
});
});
});
} catch (error) {
return Promise.reject({error, messageData});
}
}

module.exports = {
getInstance: getInstance
BATCH_SIZE,
getInstance,
send
};

@@ -2,74 +2,79 @@ const _ = require('lodash');
const debug = require('ghost-ignition').debug('mega');
const url = require('url');
const moment = require('moment');
const ObjectId = require('bson-objectid');
const ObjectID = require('bson-objectid');
const errors = require('@tryghost/errors');
const {events, i18n} = require('../../lib/common');
const logging = require('../../../shared/logging');
const settingsCache = require('../settings/cache');
const membersService = require('../members');
const bulkEmailService = require('../bulk-email');
const jobService = require('../jobs');
const models = require('../../models');
const db = require('../../data/db');
const models = require('../../models');
const postEmailSerializer = require('./post-email-serializer');

const getEmailData = async (postModel, memberRows = []) => {
const startTime = Date.now();
debug(`getEmailData: starting for ${memberRows.length} members`);
const {emailTmpl, replacements} = await postEmailSerializer.serialize(postModel);
const getFromAddress = () => {
let fromAddress = membersService.config.getEmailFromAddress();

emailTmpl.from = membersService.config.getEmailFromAddress();
emailTmpl.supportAddress = membersService.config.getEmailSupportAddress();
if (/@localhost$/.test(fromAddress) || /@ghost.local$/.test(fromAddress)) {
const localAddress = 'localhost@example.com';
logging.warn(`Rewriting bulk email from address ${fromAddress} to ${localAddress}`);
fromAddress = localAddress;
}

// update templates to use Mailgun variable syntax for replacements
replacements.forEach((replacement) => {
emailTmpl[replacement.format] = emailTmpl[replacement.format].replace(
replacement.match,
`%recipient.${replacement.id}%`
);
});
const siteTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : '';

const emails = [];
const emailData = {};
memberRows.forEach((memberRow) => {
emails.push(memberRow.email);

// first_name is a computed property only used here for now
// TODO: move into model computed property or output serializer?
memberRow.first_name = (memberRow.name || '').split(' ')[0];

// add static data to mailgun template variables
const data = {
unique_id: memberRow.uuid,
unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(memberRow.uuid)
};

// add replacement data/requested fallback to mailgun template variables
replacements.forEach(({id, memberProp, fallback}) => {
data[id] = memberRow[memberProp] || fallback || '';
});

emailData[memberRow.email] = data;
});

debug(`getEmailData: done (${Date.now() - startTime}ms)`);
return {emailTmpl, emails, emailData};
return siteTitle ? `"${siteTitle}"<${fromAddress}>` : fromAddress;
};

const sendEmail = async (postModel, memberRows) => {
const {emailTmpl, emails, emailData} = await getEmailData(postModel, memberRows);
const getReplyToAddress = () => {
const fromAddress = membersService.config.getEmailFromAddress();
const supportAddress = membersService.config.getEmailSupportAddress();
const replyAddressOption = settingsCache.get('members_reply_address');

return bulkEmailService.send(emailTmpl, emails, emailData);
return (replyAddressOption === 'support') ? supportAddress : fromAddress;
};

const getEmailData = async (postModel, options) => {
const {subject, html, plaintext} = await postEmailSerializer.serialize(postModel, options);

return {
subject,
html,
plaintext,
from: getFromAddress(),
replyTo: getReplyToAddress()
};
};

const sendTestEmail = async (postModel, toEmails) => {
const emailData = await getEmailData(postModel);
emailData.subject = `[Test] ${emailData.subject}`;

// fetch any matching members so that replacements use expected values
const recipients = await Promise.all(toEmails.map(async (email) => {
const member = await membersService.api.members.get({email});
return member || new models.Member({email});
if (member) {
return {
member_uuid: member.get('id'),
member_email: member.get('email'),
member_name: member.get('name')
};
}

return {
member_email: email
};
}));
const {emailTmpl, emails, emailData} = await getEmailData(postModel, recipients);
emailTmpl.subject = `[Test] ${emailTmpl.subject}`;
return bulkEmailService.send(emailTmpl, emails, emailData);

const response = await bulkEmailService.send(emailData, recipients);

if (response instanceof bulkEmailService.FailedBatch) {
return Promise.reject(response.error);
}

return response;
};

/**
@@ -105,21 +110,17 @@ const addEmail = async (postModel, options) => {
if (!existing) {
// get email contents and perform replacements using no member data so
// we have a decent snapshot of email content for later display
const {emailTmpl, replacements} = await postEmailSerializer.serialize(postModel, {isBrowserPreview: true});
replacements.forEach((replacement) => {
emailTmpl[replacement.format] = emailTmpl[replacement.format].replace(
replacement.match,
replacement.fallback || ''
);
});
const emailData = await getEmailData(postModel);

return models.Email.add({
post_id: postId,
status: 'pending',
email_count: membersCount,
subject: emailTmpl.subject,
html: emailTmpl.html,
plaintext: emailTmpl.plaintext,
subject: emailData.subject,
from: emailData.from,
reply_to: emailData.replyTo,
html: emailData.html,
plaintext: emailData.plaintext,
submitted_at: moment().toDate()
}, knexOptions);
} else {
@@ -132,13 +133,13 @@ const addEmail = async (postModel, options) => {
 *
 * Accepts an Email model and resets it's fields to trigger retry listeners
 *
 * @param {object} model Email model
 * @param {Email} emailModel Email model
 */
const retryFailedEmail = async (model) => {
const retryFailedEmail = async (emailModel) => {
return await models.Email.edit({
status: 'pending'
}, {
id: model.get('id')
id: emailModel.get('id')
});
};

@@ -189,106 +190,6 @@ async function handleUnsubscribeRequest(req) {
}
}

async function sendEmailJob({emailModel, options}) {
const postModel = await models.Post.findOne({id: emailModel.get('post_id')}, {withRelated: ['authors']});
let meta = [];
let error = null;
let startEmailSend = null;

try {
// Check host limit for allowed member count and throw error if over limit
await membersService.checkHostLimit();

const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
// TODO: this will clobber a user-assigned filter if/when we allow emails to be sent to filtered member lists
const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true'});

if (postModel.get('visibility') === 'paid') {
filterOptions.paid = true;
}

const startRetrieve = Date.now();
debug('getEmailMemberRows: retrieving members list');
const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions);
debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`);

if (!memberRows.length) {
return;
}

await models.Email.edit({
status: 'submitting'
}, {
id: emailModel.id
});

debug('pendingEmailHandler: storing recipient list');
const startOfRecipientStorage = Date.now();
const storeRecipientBatch = async function (recipients) {
let batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions);

// use knex rather than bookshelf to avoid overhead and event loop blocking
// when instantiating large numbers of bookshelf model objects
const recipientData = recipients.map((memberRow) => {
return {
id: ObjectId.generate(),
email_id: emailModel.id,
member_id: memberRow.id,
batch_id: batchModel.id,
member_uuid: memberRow.uuid,
member_email: memberRow.email,
member_name: memberRow.name
};
});
return await db.knex('email_recipients').insert(recipientData);
};
await Promise.each(_.chunk(memberRows, 1000), storeRecipientBatch);
debug(`pendingEmailHandler: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);

// NOTE: meta contains an array which can be a mix of successful and error responses
// needs filtering and saving objects of {error, batchData} form to separate property
debug('pendingEmailHandler: sending email');
startEmailSend = Date.now();
meta = await sendEmail(postModel, memberRows);
debug(`pendingEmailHandler: sent email (${Date.now() - startEmailSend}ms)`);
} catch (err) {
if (startEmailSend) {
debug(`pendingEmailHandler: send email failed (${Date.now() - startEmailSend}ms)`);
}
logging.error(new errors.GhostError({
err: err,
context: i18n.t('errors.services.mega.requestFailed.error')
}));
error = err.message;
}

const successes = meta.filter(response => (response instanceof bulkEmailService.SuccessfulBatch));
const failures = meta.filter(response => (response instanceof bulkEmailService.FailedBatch));
const batchStatus = successes.length ? 'submitted' : 'failed';

if (!error && failures.length) {
error = failures[0].error.message;
}

if (error && error.length > 2000) {
error = error.substring(0, 2000);
}

try {
// CASE: the batch partially succeeded
await models.Email.edit({
status: batchStatus,
meta: JSON.stringify(successes),
error: error,
error_data: JSON.stringify(failures) // NOTE: need to discuss how we store this
}, {
id: emailModel.id
});
} catch (err) {
logging.error(err);
}
}

async function pendingEmailHandler(emailModel, options) {
// CASE: do not send email if we import a database
// TODO: refactor post.published events to never fire on importing
@@ -303,6 +204,102 @@ async function pendingEmailHandler(emailModel, options) {
return jobService.addJob(sendEmailJob, {emailModel, options});
}

async function sendEmailJob({emailModel, options}) {
let startEmailSend = null;

try {
// Check host limit for allowed member count and throw error if over limit
// - do this even if it's a retry so that there's no way around the limit
await membersService.checkHostLimit();

// Create email batch and recipient rows unless this is a retry and they already exist
const existingBatchCount = await emailModel.related('emailBatches').count();
if (existingBatchCount === 0) {
const newBatchCount = await createEmailBatches({emailModel, options});

if (newBatchCount === 0) {
return;
}
}

debug('sendEmailJob: sending email');
startEmailSend = Date.now();
await bulkEmailService.processEmail({emailId: emailModel.get('id'), options});
debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`);
} catch (err) {
if (startEmailSend) {
debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`);
}
logging.error(new errors.GhostError({
err: err,
context: i18n.t('errors.services.mega.requestFailed.error')
}));
}
}

// Fetch rows of members that should receive an email.
// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
// instantiations and associated processing and event loop blocking
async function getEmailMemberRows({emailModel, options}) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const postModel = await models.Post.findOne({id: emailModel.get('post_id')}, knexOptions);

// TODO: this will clobber a user-assigned filter if/when we allow emails to be sent to filtered member lists
const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true'});

if (postModel.get('visibility') === 'paid') {
filterOptions.paid = true;
}

const startRetrieve = Date.now();
debug('getEmailMemberRows: retrieving members list');
const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions);
debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`);

return memberRows;
}

// Store email_batch and email_recipient records for an email.
// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
// instantiations and associated processing and event loop blocking.
// Returns array of batch ids
async function createEmailBatches({emailModel, options}) {
const memberRows = await getEmailMemberRows({emailModel, options});

if (!memberRows.length) {
return [];
}

const storeRecipientBatch = async function (recipients) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions);

const recipientData = recipients.map((memberRow) => {
return {
id: ObjectID.generate(),
email_id: emailModel.id,
member_id: memberRow.id,
batch_id: batchModel.id,
member_uuid: memberRow.uuid,
member_email: memberRow.email,
member_name: memberRow.name
};
});

await db.knex('email_recipients').insert(recipientData);

return batchModel.id;
};

debug('createEmailBatches: storing recipient list');
const startOfRecipientStorage = Date.now();
const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE);
const batchIds = await Promise.mapSeries(batches, storeRecipientBatch);
debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);

return batchIds;
}

const statusChangedHandler = (emailModel, options) => {
const emailRetried = emailModel.wasChanged()
&& emailModel.get('status') === 'pending'

@@ -1,3 +1,4 @@
const _ = require('lodash');
const juice = require('juice');
const template = require('./template');
const config = require('../../../shared/config');
@@ -10,6 +11,8 @@ const {URL} = require('url');
const mobiledocLib = require('../../lib/mobiledoc');
const htmlToText = require('html-to-text');

const ALLOWED_REPLACEMENTS = ['first_name'];

const getSite = () => {
const publicSettings = settingsCache.getPublic();
return Object.assign({}, publicSettings, {
@@ -55,33 +58,22 @@ const serializePostModel = async (model) => {
return frame.response[docName][0];
};

// parses templates and extracts an array of replacements with desired fallbacks
// removes %% wrappers from unknown replacement strings (modifies emailTmpl in place)
const _parseReplacements = (emailTmpl) => {
const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g;
// the &quot; is necessary here because `juice` will convert "->&quot; for email compatibility
const REPLACEMENT_STRING_REGEX = /\{(?<memberProp>\w*?)(?:,? *(?:"|&quot;)(?<fallback>.*?)(?:"|&quot;))?\}/;
const ALLOWED_REPLACEMENTS = ['first_name'];
// removes %% wrappers from unknown replacement strings in email content
const normalizeReplacementStrings = (email) => {
// we don't want to modify the email object in-place
const emailContent = _.pick(email, ['html', 'plaintext']);

const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g;
const REPLACEMENT_STRING_REGEX = /\{(?<recipientProperty>\w*?)(?:,? *(?:"|&quot;)(?<fallback>.*?)(?:"|&quot;))?\}/;

const replacements = [];
['html', 'plaintext'].forEach((format) => {
emailTmpl[format] = emailTmpl[format].replace(EMAIL_REPLACEMENT_REGEX, (replacementMatch, replacementStr) => {
emailContent[format] = emailContent[format].replace(EMAIL_REPLACEMENT_REGEX, (replacementMatch, replacementStr) => {
const match = replacementStr.match(REPLACEMENT_STRING_REGEX);

if (match) {
const {memberProp, fallback} = match.groups;

if (ALLOWED_REPLACEMENTS.includes(memberProp)) {
const id = `replacement_${replacements.length + 1}`;

replacements.push({
format,
id,
match: replacementMatch,
memberProp,
fallback
});
const {recipientProperty} = match.groups;

if (ALLOWED_REPLACEMENTS.includes(recipientProperty)) {
// keeps wrapping %% for later replacement with real data
return replacementMatch;
}
@@ -92,6 +84,40 @@ const _parseReplacements = (emailTmpl) => {
});
});

return emailContent;
};

// parses email content and extracts an array of replacements with desired fallbacks
const parseReplacements = (email) => {
const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g;
const REPLACEMENT_STRING_REGEX = /\{(?<recipientProperty>\w*?)(?:,? *(?:"|&quot;)(?<fallback>.*?)(?:"|&quot;))?\}/;

const replacements = [];

['html', 'plaintext'].forEach((format) => {
let result;
while ((result = EMAIL_REPLACEMENT_REGEX.exec(email[format])) !== null) {
const [replacementMatch, replacementStr] = result;
const match = replacementStr.match(REPLACEMENT_STRING_REGEX);

if (match) {
const {recipientProperty, fallback} = match.groups;

if (ALLOWED_REPLACEMENTS.includes(recipientProperty)) {
const id = `replacement_${replacements.length + 1}`;

replacements.push({
format,
id,
match: replacementMatch,
recipientProperty: `member_${recipientProperty}`,
fallback
});
}
}
}
});

return replacements;
};

@@ -150,19 +176,21 @@ const serialize = async (postModel, options = {isBrowserPreview: false}) => {
// Fix any unsupported chars in Outlook
juicedHtml = juicedHtml.replace(/&apos;/g, '&#39;');

const emailTmpl = {
subject: post.email_subject || post.title,
// Clean up any unknown replacements strings to get our final content
const {html, plaintext} = normalizeReplacementStrings({
html: juicedHtml,
plaintext: post.plaintext
});

return {
subject: post.email_subject || post.title,
html,
plaintext
};

// Extract known replacements and clean up unknown replacement strings
const replacements = _parseReplacements(emailTmpl);

return {emailTmpl, replacements};
};

module.exports = {
serialize,
createUnsubscribeUrl
createUnsubscribeUrl,
parseReplacements
};