diff --git a/core/server/models/base/index.js b/core/server/models/base/index.js index d2bbf504a0..744f249138 100644 --- a/core/server/models/base/index.js +++ b/core/server/models/base/index.js @@ -855,6 +855,20 @@ ghostBookshelf.Model = ghostBookshelf.Model.extend({ return filteredCollection; }, + getFilteredCollectionQuery: function getFilteredCollectionQuery(options) { + const filteredCollection = this.getFilteredCollection(options); + const filteredCollectionQuery = filteredCollection.query(); + + if (options.transacting) { + filteredCollectionQuery.transacting(options.transacting); + if (options.forUpdate) { + filteredCollectionQuery.forUpdate(); + } + } + + return filteredCollectionQuery; + }, + /** * ### Find All * Fetches all the data for a particular model diff --git a/core/server/models/email-batch.js b/core/server/models/email-batch.js new file mode 100644 index 0000000000..496fde08d0 --- /dev/null +++ b/core/server/models/email-batch.js @@ -0,0 +1,30 @@ +const ghostBookshelf = require('./base'); + +const EmailBatch = ghostBookshelf.Model.extend({ + tableName: 'email_batches', + + defaults() { + return { + status: 'pending' + }; + }, + + email() { + return this.belongsTo('Email', 'email_id'); + }, + recipients() { + return this.hasMany('EmailRecipient', 'batch_id'); + }, + members() { + return this.belongsToMany('Member', 'email_recipients', 'batch_id', 'member_id'); + } +}); + +const EmailBatches = ghostBookshelf.Collection.extend({ + model: EmailBatch +}); + +module.exports = { + EmailBatch: ghostBookshelf.model('EmailBatch', EmailBatch), + EmailBatches: ghostBookshelf.model('EmailBatches', EmailBatches) +}; diff --git a/core/server/models/email-recipient.js b/core/server/models/email-recipient.js new file mode 100644 index 0000000000..315216e3cc --- /dev/null +++ b/core/server/models/email-recipient.js @@ -0,0 +1,25 @@ +const ghostBookshelf = require('./base'); + +const EmailRecipient = ghostBookshelf.Model.extend({ + tableName: 'email_recipients', + hasTimestamps: false, + + email() { + return this.belongsTo('Email', 'email_id'); + }, + emailBatch() { + return this.belongsTo('EmailBatch', 'batch_id'); + }, + member() { + return this.belongsTo('Member', 'member_id'); + } +}); + +const EmailRecipients = ghostBookshelf.Collection.extend({ + model: EmailRecipient +}); + +module.exports = { + EmailRecipient: ghostBookshelf.model('EmailRecipient', EmailRecipient), + EmailRecipients: ghostBookshelf.model('EmailRecipients', EmailRecipients) +}; diff --git a/core/server/models/email.js b/core/server/models/email.js index 69950f7307..9793a93301 100644 --- a/core/server/models/email.js +++ b/core/server/models/email.js @@ -19,6 +19,16 @@ const Email = ghostBookshelf.Model.extend({ }; }, + post() { + return this.belongsTo('Post', 'post_id'); + }, + emailBatches() { + return this.hasMany('EmailBatch', 'email_id'); + }, + recipients() { + return this.hasMany('EmailRecipient', 'email_id'); + }, + emitChange: function emitChange(event, options) { const eventToTrigger = 'email' + '.' + event; ghostBookshelf.Model.prototype.emitChange.bind(this)(this, eventToTrigger, options); diff --git a/core/server/models/index.js b/core/server/models/index.js index 74b580819b..83e1ae4100 100644 --- a/core/server/models/index.js +++ b/core/server/models/index.js @@ -33,6 +33,8 @@ const models = [ 'member-stripe-customer', 'stripe-customer-subscription', 'email', + 'email-batch', + 'email-recipient', 'label', 'single-use-token' ]; diff --git a/core/server/services/mega/mega.js b/core/server/services/mega/mega.js index f5bddc7253..0554a1d413 100644 --- a/core/server/services/mega/mega.js +++ b/core/server/services/mega/mega.js @@ -2,6 +2,7 @@ const _ = require('lodash'); const debug = require('ghost-ignition').debug('mega'); const url = require('url'); const moment = require('moment'); +const ObjectId = require('bson-objectid'); const errors = require('@tryghost/errors'); const {events, i18n} = require('../../lib/common'); const logging = require('../../../shared/logging'); @@ -9,11 +10,12 @@ const membersService = require('../members'); const bulkEmailService = require('../bulk-email'); const jobService = require('../jobs'); const models = require('../../models'); +const db = require('../../data/db'); const postEmailSerializer = require('./post-email-serializer'); -const getEmailData = async (postModel, memberModels = []) => { +const getEmailData = async (postModel, memberRows = []) => { const startTime = Date.now(); - debug(`getEmailData: starting for ${memberModels.length} members`); + debug(`getEmailData: starting for ${memberRows.length} members`); const {emailTmpl, replacements} = await postEmailSerializer.serialize(postModel); emailTmpl.from = membersService.config.getEmailFromAddress(); @@ -29,33 +31,33 @@ const getEmailData = async (postModel, memberModels = []) => { const emails = []; const emailData = {}; - memberModels.forEach((memberModel) => { - emails.push(memberModel.get('email')); + memberRows.forEach((memberRow) => { + emails.push(memberRow.email); // first_name is a computed property only used here for now // TODO: move into model computed property or output serializer? - memberModel.first_name = (memberModel.get('name') || '').split(' ')[0]; + memberRow.first_name = (memberRow.name || '').split(' ')[0]; // add static data to mailgun template variables const data = { - unique_id: memberModel.uuid, - unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(memberModel.get('uuid')) + unique_id: memberRow.uuid, + unsubscribe_url: postEmailSerializer.createUnsubscribeUrl(memberRow.uuid) }; // add replacement data/requested fallback to mailgun template variables replacements.forEach(({id, memberProp, fallback}) => { - data[id] = memberModel[memberProp] || fallback || ''; + data[id] = memberRow[memberProp] || fallback || ''; }); - emailData[memberModel.get('email')] = data; + emailData[memberRow.email] = data; }); debug(`getEmailData: done (${Date.now() - startTime}ms)`); return {emailTmpl, emails, emailData}; }; -const sendEmail = async (postModel, memberModels) => { - const {emailTmpl, emails, emailData} = await getEmailData(postModel, memberModels); +const sendEmail = async (postModel, memberRows) => { + const {emailTmpl, emails, emailData} = await getEmailData(postModel, memberRows); return bulkEmailService.send(emailTmpl, emails, emailData); }; @@ -197,20 +199,20 @@ async function sendEmailJob({emailModel, options}) { // Check host limit for allowed member count and throw error if over limit await membersService.checkHostLimit(); - // No need to fetch list until after we've passed the check const knexOptions = _.pick(options, ['transacting', 'forUpdate']); - const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true', limit: 'all'}); + // TODO: this will clobber a user-assigned filter if/when we allow emails to be sent to filtered member lists + const filterOptions = Object.assign({}, knexOptions, {filter: 'subscribed:true'}); if (postModel.get('visibility') === 'paid') { filterOptions.paid = true; } const startRetrieve = Date.now(); - debug('pendingEmailHandler: retrieving members list'); - const {data: members} = await membersService.api.members.list(Object.assign({}, knexOptions, filterOptions)); - debug(`pendingEmailHandler: retrieved members list - ${members.length} members (${Date.now() - startRetrieve}ms)`); + debug('getEmailMemberRows: retrieving members list'); + const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions); + debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`); - if (!members.length) { + if (!memberRows.length) { return; } @@ -220,11 +222,34 @@ async function sendEmailJob({emailModel, options}) { id: emailModel.id }); - // NOTE: meta can contains an array which can be a mix of successful and error responses + debug('pendingEmailHandler: storing recipient list'); + const startOfRecipientStorage = Date.now(); + const storeRecipientBatch = async function (recipients) { + let batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions); + + // use knex rather than bookshelf to avoid overhead and event loop blocking + // when instantiating large numbers of bookshelf model objects + const recipientData = recipients.map((memberRow) => { + return { + id: ObjectId.generate(), + email_id: emailModel.id, + member_id: memberRow.id, + batch_id: batchModel.id, + member_uuid: memberRow.uuid, + member_email: memberRow.email, + member_name: memberRow.name + }; + }); + return await db.knex('email_recipients').insert(recipientData); + }; + await Promise.each(_.chunk(memberRows, 1000), storeRecipientBatch); + debug(`pendingEmailHandler: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`); + + // NOTE: meta contains an array which can be a mix of successful and error responses // needs filtering and saving objects of {error, batchData} form to separate property debug('pendingEmailHandler: sending email'); startEmailSend = Date.now(); - meta = await sendEmail(postModel, members); + meta = await sendEmail(postModel, memberRows); debug(`pendingEmailHandler: sent email (${Date.now() - startEmailSend}ms)`); } catch (err) { if (startEmailSend) {