diff --git a/core/server/api/canary/email-preview.js b/core/server/api/canary/email-preview.js index 40d53598e2..12ca89d3cd 100644 --- a/core/server/api/canary/email-preview.js +++ b/core/server/api/canary/email-preview.js @@ -31,16 +31,17 @@ module.exports = { }); } - return mega.postEmailSerializer.serialize(model, {isBrowserPreview: true}).then(({emailTmpl, replacements}) => { - // perform replacements using no member data + return mega.postEmailSerializer.serialize(model, {isBrowserPreview: true}).then((emailContent) => { + const replacements = mega.postEmailSerializer.parseReplacements(emailContent); + replacements.forEach((replacement) => { - emailTmpl[replacement.format] = emailTmpl[replacement.format].replace( + emailContent[replacement.format] = emailContent[replacement.format].replace( replacement.match, replacement.fallback || '' ); }); - return emailTmpl; + return emailContent; }); }); } diff --git a/core/server/services/bulk-email/index.js b/core/server/services/bulk-email/index.js index bb60bd14c4..e7cfe969df 100644 --- a/core/server/services/bulk-email/index.js +++ b/core/server/services/bulk-email/index.js @@ -1,31 +1,33 @@ const _ = require('lodash'); +const moment = require('moment-timezone'); const errors = require('@tryghost/errors'); const {i18n} = require('../../lib/common'); const logging = require('../../../shared/logging'); +const models = require('../../models'); const mailgunProvider = require('./mailgun'); -const configService = require('../../../shared/config'); -const settingsCache = require('../settings/cache'); const sentry = require('../../../shared/sentry'); const debug = require('ghost-ignition').debug('mega'); +const postEmailSerializer = require('../mega/post-email-serializer'); + +const BATCH_SIZE = mailgunProvider.BATCH_SIZE; /** * An object representing batch request result * @typedef { Object } BatchResultBase * @property { string } data - data that is returned from Mailgun or one which Mailgun was called with */ -class BatchResultBase { -} +class BatchResultBase {} class SuccessfulBatch extends BatchResultBase { - constructor(data) { - super(); - this.data = data; + constructor(id) { + super(...arguments); + this.id = id; } } class FailedBatch extends BatchResultBase { - constructor(error, data) { - super(); + constructor(id, error) { + super(...arguments); error.originalMessage = error.message; if (error.statusCode >= 500) { @@ -37,11 +39,10 @@ class FailedBatch extends BatchResultBase { } else if (error.message.includes(`'to' parameter is not a valid address`)) { error.message = 'Recipient is not a valid address'; } else { - error.message = 'Email failed to send - please verify your email settings'; + error.message = `Email failed to send "${error.originalMessage}" - please verify your email settings`; } this.error = error; - this.data = data; } } @@ -58,95 +59,196 @@ class FailedBatch extends BatchResultBase { */ module.exports = { + BATCH_SIZE, SuccessfulBatch, FailedBatch, + + // accepts an ID rather than an Email model to better support running via a job queue + async processEmail({emailId, options}) { + const knexOptions = _.pick(options, ['transacting', 'forUpdate']); + const emailModel = await models.Email.findOne({id: emailId}, knexOptions); + + if (!emailModel) { + throw new errors.IncorrectUsageError({ + message: 'Provided email id does not match a known email record', + context: { + id: emailId + } + }); + } + + if (emailModel.get('status') !== 'pending') { + throw new errors.IncorrectUsageError({ + message: 'Emails can only be processed when in the "pending" state', + context: `Email "${emailId}" has state "${emailModel.get('status')}"`, + code: 'EMAIL_NOT_PENDING' + }); + } + + await emailModel.save({status: 'submitting'}, Object.assign({}, knexOptions, {patch: true})); + + // get batch IDs via knex to avoid model instantiation + // only fetch pending or failed batches to avoid re-sending previously sent emails + const batchIds = await models.EmailBatch + .getFilteredCollectionQuery({filter: `email_id:${emailId}+status:[pending,failed]`}, knexOptions) + .select('id'); + + const batchResults = Promise.map(batchIds, async ({id: emailBatchId}) => { + try { + await this.processEmailBatch({emailBatchId, options}); + return new SuccessfulBatch(emailBatchId); + } catch (error) { + return new FailedBatch(emailBatchId, error); + } + }, {concurrency: 10}); + + const successes = batchResults.filter(response => (response instanceof SuccessfulBatch)); + const failures = batchResults.filter(response => (response instanceof FailedBatch)); + const batchStatus = successes.length ? 'submitted' : 'failed'; + + let error; + + if (failures.length) { + error = failures[0].error.message; + } + + if (error && error.length > 2000) { + error = error.substring(0, 2000); + } + + try { + await models.Email.edit({ + status: batchStatus, + results: JSON.stringify(successes), + error: error, + error_data: JSON.stringify(failures) // NOTE: need to discuss how we store this + }, { + id: emailModel.id + }); + } catch (err) { + logging.error(err); + } + + await emailModel.save({status: 'submitted'}, Object.assign({}, knexOptions, {patch: true})); + + return batchResults; + }, + + // accepts an ID rather than an EmailBatch model to better support running via a job queue + async processEmailBatch({emailBatchId, options}) { + const knexOptions = _.pick(options, ['transacting', 'forUpdate']); + + const emailBatchModel = await models.EmailBatch + .findOne({id: emailBatchId}, Object.assign({}, knexOptions, {withRelated: 'email'})); + + if (!emailBatchModel) { + throw new errors.IncorrectUsageError({ + message: 'Provided email_batch id does not match a known email_batch record', + context: { + id: emailBatchId + } + }); + } + + if (!['pending','failed'].includes(emailBatchModel.get('status'))) { + throw new errors.IncorrectUsageError({ + message: 'Email batches can only be processed when in the "pending" or "failed" state', + context: `Email batch "${emailBatchId}" has state "${emailBatchModel.get('status')}"` + }); + } + + // get recipient rows via knex to avoid costly bookshelf model instantiation + const recipientRows = await models.EmailRecipient + .getFilteredCollectionQuery({filter: `batch_id:${emailBatchId}`}); + + await emailBatchModel.save({status: 'submitting'}, knexOptions); + + let result; + + try { + // send the email + const sendResponse = await this.send(emailBatchModel.relations.email.toJSON(), recipientRows); + + // update batch success status + result = await emailBatchModel.save({ + status: 'submitted', + provider_id: sendResponse.id + }, Object.assign({}, knexOptions, {patch: true})); + } catch (error) { + // update batch failed status + await emailBatchModel.save({status: 'failed'}, knexOptions); + + // log any error that didn't come from the provider which would have already logged it + if (!error.code || error.code !== 'BULK_EMAIL_SEND_FAILED') { + let ghostError = new errors.InternalServerError({ + err: error + }); + sentry.captureException(ghostError); + logging.error(ghostError); + throw ghostError; + } + } finally { + // update all email recipients with a processed_at + await models.EmailRecipient + .where({batch_id: emailBatchId}) + .save({processed_at: moment()}, Object.assign({}, knexOptions, {patch: true})); + } + + return result; + }, + /** - * @param {Email} message - The message to send - * @param {[EmailAddress]} recipients - the recipients to send the email to - * @param {[object]} recipientData - list of data keyed by email to inject into the email - * @returns {Promise>} An array of promises representing the success of the batch email sending + * @param {Email-like} emailData - The email to send, must be a POJO so emailModel.toJSON() before calling if needed + * @param {[EmailRecipient]} recipients - The recipients to send the email to with their associated data + * @returns {Object} - {providerId: 'xxx'} */ - async send(message, recipients, recipientData = {}) { - let BATCH_SIZE = 1000; + send(emailData, recipients) { const mailgunInstance = mailgunProvider.getInstance(); if (!mailgunInstance) { return; } - let fromAddress = message.from; - if (/@localhost$/.test(message.from) || /@ghost.local$/.test(message.from)) { - fromAddress = 'localhost@example.com'; - logging.warn(`Rewriting bulk email from address ${message.from} to ${fromAddress}`); - BATCH_SIZE = 2; - } + const startTime = Date.now(); + debug(`sending message to ${recipients.length} recipients`); - const blogTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : ''; - let supportAddress = message.supportAddress; - delete message.supportAddress; - const replyAddressOption = settingsCache.get('members_reply_address'); - const replyToAddress = (replyAddressOption === 'support') ? supportAddress : fromAddress; - fromAddress = blogTitle ? `"${blogTitle}"<${fromAddress}>` : fromAddress; + const replacements = postEmailSerializer.parseReplacements(emailData); - const chunkedRecipients = _.chunk(recipients, BATCH_SIZE); - - return Promise.map(chunkedRecipients, (toAddresses, chunkIndex) => { - const recipientVariables = {}; - toAddresses.forEach((email) => { - recipientVariables[email] = recipientData[email]; - }); - - const batchData = { - to: toAddresses, - from: fromAddress, - 'h:Reply-To': replyToAddress || fromAddress, - 'recipient-variables': recipientVariables + // collate static and dynamic data for each recipient ready for provider + const recipientData = {}; + recipients.forEach((recipient) => { + // static data for every recipient + const data = { + unique_id: recipient.member_uuid, + unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(recipient.member_uuid) }; - const bulkEmailConfig = configService.get('bulkEmail'); + // computed properties on recipients - TODO: better way of handling these + recipient.member_first_name = (recipient.member_name || '').split(' ')[0]; - if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.tag) { - Object.assign(batchData, { - 'o:tag': [bulkEmailConfig.mailgun.tag, 'bulk-email'] - }); - } - - if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.testmode) { - Object.assign(batchData, { - 'o:testmode': true - }); - } - - const messageData = Object.assign({}, message, batchData); - - // Rename plaintext field to text for Mailgun - messageData.text = messageData.plaintext; - delete messageData.plaintext; - - return new Promise((resolve) => { - const batchStartTime = Date.now(); - debug(`sending message batch ${chunkIndex + 1} to ${toAddresses.length}`); - mailgunInstance.messages().send(messageData, (error, body) => { - if (error) { - // NOTE: logging an error here only but actual handling should happen in more sophisticated batch retry handler - // REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#errors - let ghostError = new errors.EmailError({ - err: error, - context: i18n.t('errors.services.mega.requestFailed.error') - }); - - sentry.captureException(ghostError); - logging.warn(ghostError); - - // NOTE: these are generated variables, so can be regenerated when retry is done - const data = _.omit(batchData, ['recipient-variables']); - debug(`failed message batch ${chunkIndex + 1} (${Date.now() - batchStartTime}ms)`); - resolve(new FailedBatch(error, data)); - } else { - debug(`sent message batch ${chunkIndex + 1} (${Date.now() - batchStartTime}ms)`); - resolve(new SuccessfulBatch(body)); - } - }); + // dynamic data from replacements + replacements.forEach((id, recipientProp, fallback) => { + data[id] = recipient[recipientProp] || fallback || ''; }); - }, {concurrency: 10}); + + recipientData[recipient.member_email] = data; + }); + + return mailgunProvider.send(emailData, recipientData, replacements).then((response) => { + debug(`sent message (${Date.now() - startTime}ms)`); + return response; + }).catch((error) => { + // REF: possible mailgun errors https://documentation.mailgun.com/en/latest/api-intro.html#errors + let ghostError = new errors.EmailError({ + err: error, + context: i18n.t('errors.services.mega.requestFailed.error'), + code: 'BULK_EMAIL_SEND_FAILED' + }); + + sentry.captureException(ghostError); + logging.warn(ghostError); + + debug(`failed to send message (${Date.now() - startTime}ms)`); + throw ghostError; + }); } }; diff --git a/core/server/services/bulk-email/mailgun.js b/core/server/services/bulk-email/mailgun.js index 710487b2d1..75d90cd35f 100644 --- a/core/server/services/bulk-email/mailgun.js +++ b/core/server/services/bulk-email/mailgun.js @@ -1,9 +1,12 @@ +const _ = require('lodash'); const {URL} = require('url'); const mailgun = require('mailgun-js'); const logging = require('../../../shared/logging'); const configService = require('../../../shared/config'); const settingsCache = require('../settings/cache'); +const BATCH_SIZE = 1000; + function createMailgun(config) { const baseUrl = new URL(config.baseUrl); @@ -31,16 +34,76 @@ function getInstance() { if (!hasMailgunConfig && !hasMailgunSetting) { logging.warn(`Bulk email service is not configured`); } else { - try { - let mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting; - return createMailgun(mailgunConfig); - } catch (err) { - logging.warn(`Bulk email service is not configured`); - } + let mailgunConfig = hasMailgunConfig ? bulkEmailConfig.mailgun : bulkEmailSetting; + return createMailgun(mailgunConfig); } return null; } +// recipients format: +// { +// 'test@example.com': { +// name: 'Test User', +// unique_id: '12345abcde', +// unsubscribe_url: 'https://example.com/unsub/me' +// } +// } +function send(message, recipientData, replacements) { + if (recipientData.length > BATCH_SIZE) { + // err - too many recipients + } + + let messageData = {}; + + try { + const bulkEmailConfig = configService.get('bulkEmail'); + const mailgunInstance = getInstance(); + + const messageContent = _.pick(message, 'html', 'plaintext'); + + // update content to use Mailgun variable syntax for replacements + replacements.forEach((replacement) => { + messageContent[replacement.format] = messageContent[replacement.format].replace( + replacement.match, + `%recipient.${replacement.id}%` + ); + }); + + messageData = { + toAddresses: Object.keys(recipientData), + from: message.from, + 'h:Reply-To': message.replyTo, + 'recipient-variables': recipientData, + html: messageContent.html, + text: messageContent.plaintext + }; + + if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.tag) { + messageData['o:tag'] = [bulkEmailConfig.mailgun.tag, 'bulk-email']; + } + + if (bulkEmailConfig && bulkEmailConfig.mailgun && bulkEmailConfig.mailgun.testmode) { + messageData['o:testmode'] = true; + } + + return new Promise((resolve, reject) => { + mailgunInstance.messages().send(messageData, (error, body) => { + if (error) { + return reject(error); + } + + return resolve({ + id: body.id + }); + }); + }); + } catch (error) { + return Promise.reject({error, messageData}); + } +} + module.exports = { - getInstance: getInstance + BATCH_SIZE, + getInstance, + send }; diff --git a/core/server/services/mega/mega.js b/core/server/services/mega/mega.js index 0554a1d413..f82c0d7742 100644 --- a/core/server/services/mega/mega.js +++ b/core/server/services/mega/mega.js @@ -2,74 +2,79 @@ const _ = require('lodash'); const debug = require('ghost-ignition').debug('mega'); const url = require('url'); const moment = require('moment'); -const ObjectId = require('bson-objectid'); +const ObjectID = require('bson-objectid'); const errors = require('@tryghost/errors'); const {events, i18n} = require('../../lib/common'); const logging = require('../../../shared/logging'); +const settingsCache = require('../settings/cache'); const membersService = require('../members'); const bulkEmailService = require('../bulk-email'); const jobService = require('../jobs'); -const models = require('../../models'); const db = require('../../data/db'); +const models = require('../../models'); const postEmailSerializer = require('./post-email-serializer'); -const getEmailData = async (postModel, memberRows = []) => { - const startTime = Date.now(); - debug(`getEmailData: starting for ${memberRows.length} members`); - const {emailTmpl, replacements} = await postEmailSerializer.serialize(postModel); +const getFromAddress = () => { + let fromAddress = membersService.config.getEmailFromAddress(); - emailTmpl.from = membersService.config.getEmailFromAddress(); - emailTmpl.supportAddress = membersService.config.getEmailSupportAddress(); + if (/@localhost$/.test(fromAddress) || /@ghost.local$/.test(fromAddress)) { + const localAddress = 'localhost@example.com'; + logging.warn(`Rewriting bulk email from address ${fromAddress} to ${localAddress}`); + fromAddress = localAddress; + } - // update templates to use Mailgun variable syntax for replacements - replacements.forEach((replacement) => { - emailTmpl[replacement.format] = emailTmpl[replacement.format].replace( - replacement.match, - `%recipient.${replacement.id}%` - ); - }); + const siteTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : ''; - const emails = []; - const emailData = {}; - memberRows.forEach((memberRow) => { - emails.push(memberRow.email); - - // first_name is a computed property only used here for now - // TODO: move into model computed property or output serializer? - memberRow.first_name = (memberRow.name || '').split(' ')[0]; - - // add static data to mailgun template variables - const data = { - unique_id: memberRow.uuid, - unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(memberRow.uuid) - }; - - // add replacement data/requested fallback to mailgun template variables - replacements.forEach(({id, memberProp, fallback}) => { - data[id] = memberRow[memberProp] || fallback || ''; - }); - - emailData[memberRow.email] = data; - }); - - debug(`getEmailData: done (${Date.now() - startTime}ms)`); - return {emailTmpl, emails, emailData}; + return siteTitle ? `"${siteTitle}"<${fromAddress}>` : fromAddress; }; -const sendEmail = async (postModel, memberRows) => { - const {emailTmpl, emails, emailData} = await getEmailData(postModel, memberRows); +const getReplyToAddress = () => { + const fromAddress = membersService.config.getEmailFromAddress(); + const supportAddress = membersService.config.getEmailSupportAddress(); + const replyAddressOption = settingsCache.get('members_reply_address'); - return bulkEmailService.send(emailTmpl, emails, emailData); + return (replyAddressOption === 'support') ? supportAddress : fromAddress; +}; + +const getEmailData = async (postModel, options) => { + const {subject, html, plaintext} = await postEmailSerializer.serialize(postModel, options); + + return { + subject, + html, + plaintext, + from: getFromAddress(), + replyTo: getReplyToAddress() + }; }; const sendTestEmail = async (postModel, toEmails) => { + const emailData = await getEmailData(postModel); + emailData.subject = `[Test] ${emailData.subject}`; + + // fetch any matching members so that replacements use expected values const recipients = await Promise.all(toEmails.map(async (email) => { const member = await membersService.api.members.get({email}); - return member || new models.Member({email}); + if (member) { + return { + member_uuid: member.get('id'), + member_email: member.get('email'), + member_name: member.get('name') + }; + } + + return { + member_email: email + }; })); - const {emailTmpl, emails, emailData} = await getEmailData(postModel, recipients); - emailTmpl.subject = `[Test] ${emailTmpl.subject}`; - return bulkEmailService.send(emailTmpl, emails, emailData); + + const response = await bulkEmailService.send(emailData, recipients); + + if (response instanceof bulkEmailService.FailedBatch) { + return Promise.reject(response.error); + } + + return response; }; /** @@ -105,21 +110,17 @@ const addEmail = async (postModel, options) => { if (!existing) { // get email contents and perform replacements using no member data so // we have a decent snapshot of email content for later display - const {emailTmpl, replacements} = await postEmailSerializer.serialize(postModel, {isBrowserPreview: true}); - replacements.forEach((replacement) => { - emailTmpl[replacement.format] = emailTmpl[replacement.format].replace( - replacement.match, - replacement.fallback || '' - ); - }); + const emailData = await getEmailData(postModel); return models.Email.add({ post_id: postId, status: 'pending', email_count: membersCount, - subject: emailTmpl.subject, - html: emailTmpl.html, - plaintext: emailTmpl.plaintext, + subject: emailData.subject, + from: emailData.from, + reply_to: emailData.replyTo, + html: emailData.html, + plaintext: emailData.plaintext, submitted_at: moment().toDate() }, knexOptions); } else { @@ -132,13 +133,13 @@ const addEmail = async (postModel, options) => { * * Accepts an Email model and resets it's fields to trigger retry listeners * - * @param {object} model Email model + * @param {Email} emailModel Email model */ -const retryFailedEmail = async (model) => { +const retryFailedEmail = async (emailModel) => { return await models.Email.edit({ status: 'pending' }, { - id: model.get('id') + id: emailModel.get('id') }); }; @@ -189,106 +190,6 @@ async function handleUnsubscribeRequest(req) { } } -async function sendEmailJob({emailModel, options}) { - const postModel = await models.Post.findOne({id: emailModel.get('post_id')}, {withRelated: ['authors']}); - let meta = []; - let error = null; - let startEmailSend = null; - - try { - // Check host limit for allowed member count and throw error if over limit - await membersService.checkHostLimit(); - - const knexOptions = _.pick(options, ['transacting', 'forUpdate']); - // TODO: this will clobber a user-assigned filter if/when we allow emails to be sent to filtered member lists - const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true'}); - - if (postModel.get('visibility') === 'paid') { - filterOptions.paid = true; - } - - const startRetrieve = Date.now(); - debug('getEmailMemberRows: retrieving members list'); - const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions); - debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`); - - if (!memberRows.length) { - return; - } - - await models.Email.edit({ - status: 'submitting' - }, { - id: emailModel.id - }); - - debug('pendingEmailHandler: storing recipient list'); - const startOfRecipientStorage = Date.now(); - const storeRecipientBatch = async function (recipients) { - let batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions); - - // use knex rather than bookshelf to avoid overhead and event loop blocking - // when instantiating large numbers of bookshelf model objects - const recipientData = recipients.map((memberRow) => { - return { - id: ObjectId.generate(), - email_id: emailModel.id, - member_id: memberRow.id, - batch_id: batchModel.id, - member_uuid: memberRow.uuid, - member_email: memberRow.email, - member_name: memberRow.name - }; - }); - return await db.knex('email_recipients').insert(recipientData); - }; - await Promise.each(_.chunk(memberRows, 1000), storeRecipientBatch); - debug(`pendingEmailHandler: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); - - // NOTE: meta contains an array which can be a mix of successful and error responses - // needs filtering and saving objects of {error, batchData} form to separate property - debug('pendingEmailHandler: sending email'); - startEmailSend = Date.now(); - meta = await sendEmail(postModel, memberRows); - debug(`pendingEmailHandler: sent email (${Date.now() - startEmailSend}ms)`); - } catch (err) { - if (startEmailSend) { - debug(`pendingEmailHandler: send email failed (${Date.now() - startEmailSend}ms)`); - } - logging.error(new errors.GhostError({ - err: err, - context: i18n.t('errors.services.mega.requestFailed.error') - })); - error = err.message; - } - - const successes = meta.filter(response => (response instanceof bulkEmailService.SuccessfulBatch)); - const failures = meta.filter(response => (response instanceof bulkEmailService.FailedBatch)); - const batchStatus = successes.length ? 'submitted' : 'failed'; - - if (!error && failures.length) { - error = failures[0].error.message; - } - - if (error && error.length > 2000) { - error = error.substring(0, 2000); - } - - try { - // CASE: the batch partially succeeded - await models.Email.edit({ - status: batchStatus, - meta: JSON.stringify(successes), - error: error, - error_data: JSON.stringify(failures) // NOTE: need to discuss how we store this - }, { - id: emailModel.id - }); - } catch (err) { - logging.error(err); - } -} - async function pendingEmailHandler(emailModel, options) { // CASE: do not send email if we import a database // TODO: refactor post.published events to never fire on importing @@ -303,6 +204,102 @@ async function pendingEmailHandler(emailModel, options) { return jobService.addJob(sendEmailJob, {emailModel, options}); } +async function sendEmailJob({emailModel, options}) { + let startEmailSend = null; + + try { + // Check host limit for allowed member count and throw error if over limit + // - do this even if it's a retry so that there's no way around the limit + await membersService.checkHostLimit(); + + // Create email batch and recipient rows unless this is a retry and they already exist + const existingBatchCount = await emailModel.related('emailBatches').count(); + if (existingBatchCount === 0) { + const newBatchCount = await createEmailBatches({emailModel, options}); + + if (newBatchCount === 0) { + return; + } + } + + debug('sendEmailJob: sending email'); + startEmailSend = Date.now(); + await bulkEmailService.processEmail({emailId: emailModel.get('id'), options}); + debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`); + } catch (err) { + if (startEmailSend) { + debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`); + } + logging.error(new errors.GhostError({ + err: err, + context: i18n.t('errors.services.mega.requestFailed.error') + })); + } +} + +// Fetch rows of members that should receive an email. +// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model +// instantiations and associated processing and event loop blocking +async function getEmailMemberRows({emailModel, options}) { + const knexOptions = _.pick(options, ['transacting', 'forUpdate']); + const postModel = await models.Post.findOne({id: emailModel.get('post_id')}, knexOptions); + + // TODO: this will clobber a user-assigned filter if/when we allow emails to be sent to filtered member lists + const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true'}); + + if (postModel.get('visibility') === 'paid') { + filterOptions.paid = true; + } + + const startRetrieve = Date.now(); + debug('getEmailMemberRows: retrieving members list'); + const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions); + debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`); + + return memberRows; +} + +// Store email_batch and email_recipient records for an email. +// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model +// instantiations and associated processing and event loop blocking. +// Returns array of batch ids +async function createEmailBatches({emailModel, options}) { + const memberRows = await getEmailMemberRows({emailModel, options}); + + if (!memberRows.length) { + return []; + } + + const storeRecipientBatch = async function (recipients) { + const knexOptions = _.pick(options, ['transacting', 'forUpdate']); + const batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions); + + const recipientData = recipients.map((memberRow) => { + return { + id: ObjectID.generate(), + email_id: emailModel.id, + member_id: memberRow.id, + batch_id: batchModel.id, + member_uuid: memberRow.uuid, + member_email: memberRow.email, + member_name: memberRow.name + }; + }); + + await db.knex('email_recipients').insert(recipientData); + + return batchModel.id; + }; + + debug('createEmailBatches: storing recipient list'); + const startOfRecipientStorage = Date.now(); + const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE); + const batchIds = await Promise.mapSeries(batches, storeRecipientBatch); + debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); + + return batchIds; +} + const statusChangedHandler = (emailModel, options) => { const emailRetried = emailModel.wasChanged() && emailModel.get('status') === 'pending' diff --git a/core/server/services/mega/post-email-serializer.js b/core/server/services/mega/post-email-serializer.js index a47e85cb7b..69c93be32d 100644 --- a/core/server/services/mega/post-email-serializer.js +++ b/core/server/services/mega/post-email-serializer.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const juice = require('juice'); const template = require('./template'); const config = require('../../../shared/config'); @@ -10,6 +11,8 @@ const {URL} = require('url'); const mobiledocLib = require('../../lib/mobiledoc'); const htmlToText = require('html-to-text'); +const ALLOWED_REPLACEMENTS = ['first_name']; + const getSite = () => { const publicSettings = settingsCache.getPublic(); return Object.assign({}, publicSettings, { @@ -55,33 +58,22 @@ const serializePostModel = async (model) => { return frame.response[docName][0]; }; -// parses templates and extracts an array of replacements with desired fallbacks -// removes %% wrappers from unknown replacement strings (modifies emailTmpl in place) -const _parseReplacements = (emailTmpl) => { - const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g; - // the " is necessary here because `juice` will convert "->" for email compatibility - const REPLACEMENT_STRING_REGEX = /\{(?\w*?)(?:,? *(?:"|")(?.*?)(?:"|"))?\}/; - const ALLOWED_REPLACEMENTS = ['first_name']; +// removes %% wrappers from unknown replacement strings in email content +const normalizeReplacementStrings = (email) => { + // we don't want to modify the email object in-place + const emailContent = _.pick(email, ['html', 'plaintext']); + + const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g; + const REPLACEMENT_STRING_REGEX = /\{(?\w*?)(?:,? *(?:"|")(?.*?)(?:"|"))?\}/; - const replacements = []; ['html', 'plaintext'].forEach((format) => { - emailTmpl[format] = emailTmpl[format].replace(EMAIL_REPLACEMENT_REGEX, (replacementMatch, replacementStr) => { + emailContent[format] = emailContent[format].replace(EMAIL_REPLACEMENT_REGEX, (replacementMatch, replacementStr) => { const match = replacementStr.match(REPLACEMENT_STRING_REGEX); if (match) { - const {memberProp, fallback} = match.groups; - - if (ALLOWED_REPLACEMENTS.includes(memberProp)) { - const id = `replacement_${replacements.length + 1}`; - - replacements.push({ - format, - id, - match: replacementMatch, - memberProp, - fallback - }); + const {recipientProperty} = match.groups; + if (ALLOWED_REPLACEMENTS.includes(recipientProperty)) { // keeps wrapping %% for later replacement with real data return replacementMatch; } @@ -92,6 +84,40 @@ const _parseReplacements = (emailTmpl) => { }); }); + return emailContent; +}; + +// parses email content and extracts an array of replacements with desired fallbacks +const parseReplacements = (email) => { + const EMAIL_REPLACEMENT_REGEX = /%%(\{.*?\})%%/g; + const REPLACEMENT_STRING_REGEX = /\{(?\w*?)(?:,? *(?:"|")(?.*?)(?:"|"))?\}/; + + const replacements = []; + + ['html', 'plaintext'].forEach((format) => { + let result; + while ((result = EMAIL_REPLACEMENT_REGEX.exec(email[format])) !== null) { + const [replacementMatch, replacementStr] = result; + const match = replacementStr.match(REPLACEMENT_STRING_REGEX); + + if (match) { + const {recipientProperty, fallback} = match.groups; + + if (ALLOWED_REPLACEMENTS.includes(recipientProperty)) { + const id = `replacement_${replacements.length + 1}`; + + replacements.push({ + format, + id, + match: replacementMatch, + recipientProperty: `member_${recipientProperty}`, + fallback + }); + } + } + } + }); + return replacements; }; @@ -150,19 +176,21 @@ const serialize = async (postModel, options = {isBrowserPreview: false}) => { // Fix any unsupported chars in Outlook juicedHtml = juicedHtml.replace(/'/g, '''); - const emailTmpl = { - subject: post.email_subject || post.title, + // Clean up any unknown replacements strings to get our final content + const {html, plaintext} = normalizeReplacementStrings({ html: juicedHtml, plaintext: post.plaintext + }); + + return { + subject: post.email_subject || post.title, + html, + plaintext }; - - // Extract known replacements and clean up unknown replacement strings - const replacements = _parseReplacements(emailTmpl); - - return {emailTmpl, replacements}; }; module.exports = { serialize, - createUnsubscribeUrl + createUnsubscribeUrl, + parseReplacements };