0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-01-20 22:42:53 -05:00
ghost/core/server/services/mega/mega.js
Naz fc458efa90 Added periodic "emails" limit check when (re)sending a newsletter
refs https://github.com/TryGhost/Team/issues/588
refs d72ba77aba

- When limit is in place we don't want to allow sending out a new batch of emails if it would go over limit
- See referenced commit for example configuration
2021-05-07 18:45:14 +04:00

427 lines
14 KiB
JavaScript

const _ = require('lodash');
const Promise = require('bluebird');
const debug = require('ghost-ignition').debug('mega');
const url = require('url');
const moment = require('moment');
const ObjectID = require('bson-objectid');
const errors = require('@tryghost/errors');
const events = require('../../lib/common/events');
const i18n = require('../../../shared/i18n');
const logging = require('../../../shared/logging');
const settingsCache = require('../settings/cache');
const membersService = require('../members');
const limitService = require('../limits');
const bulkEmailService = require('../bulk-email');
const jobsService = require('../jobs');
const db = require('../../data/db');
const models = require('../../models');
const postEmailSerializer = require('./post-email-serializer');
const getFromAddress = () => {
let fromAddress = membersService.config.getEmailFromAddress();
if (/@localhost$/.test(fromAddress) || /@ghost.local$/.test(fromAddress)) {
const localAddress = 'localhost@example.com';
logging.warn(`Rewriting bulk email from address ${fromAddress} to ${localAddress}`);
fromAddress = localAddress;
}
const siteTitle = settingsCache.get('title') ? settingsCache.get('title').replace(/"/g, '\\"') : '';
return siteTitle ? `"${siteTitle}"<${fromAddress}>` : fromAddress;
};
const getReplyToAddress = () => {
const fromAddress = membersService.config.getEmailFromAddress();
const supportAddress = membersService.config.getEmailSupportAddress();
const replyAddressOption = settingsCache.get('members_reply_address');
return (replyAddressOption === 'support') ? supportAddress : fromAddress;
};
/**
*
* @param {Object} postModel - post model instance
* @param {Object} options
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
*/
const getEmailData = async (postModel, options) => {
const {subject, html, plaintext} = await postEmailSerializer.serialize(postModel, options);
return {
subject,
html,
plaintext,
from: getFromAddress(),
replyTo: getReplyToAddress()
};
};
/**
*
* @param {Object} postModel - post model instance
* @param {[string]} toEmails - member email addresses to send email to
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
*/
const sendTestEmail = async (postModel, toEmails, apiVersion) => {
const emailData = await getEmailData(postModel, {apiVersion});
emailData.subject = `[Test] ${emailData.subject}`;
// fetch any matching members so that replacements use expected values
const recipients = await Promise.all(toEmails.map(async (email) => {
const member = await membersService.api.members.get({email});
if (member) {
return {
member_uuid: member.get('id'),
member_email: member.get('email'),
member_name: member.get('name')
};
}
return {
member_email: email
};
}));
// enable tracking for previews to match real-world behaviour
emailData.track_opens = !!settingsCache.get('email_track_opens');
const response = await bulkEmailService.send(emailData, recipients);
if (response instanceof bulkEmailService.FailedBatch) {
return Promise.reject(response.error);
}
return response;
};
/**
* addEmail
*
* Accepts a post model and creates an email record based on it. Only creates one
* record per post
*
* @param {object} postModel Post Model Object
* @param {object} options
* @param {ValidAPIVersion} options.apiVersion - api version to be used when serializing email data
*/
const addEmail = async (postModel, options) => {
if (limitService.isLimited('emails')) {
await limitService.errorIfWouldGoOverLimit('emails');
}
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const filterOptions = Object.assign({}, knexOptions, {limit: 1});
const emailRecipientFilter = postModel.get('email_recipient_filter');
switch (emailRecipientFilter) {
// `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now
case 'paid':
case 'free':
throw new Error(`Unexpected email_recipient_filter value "${emailRecipientFilter}", expected an NQL equivalent`);
case 'all':
filterOptions.filter = 'subscribed:true';
break;
case 'none':
throw new Error('Cannot sent email to "none" email_recipient_filter');
default:
filterOptions.filter = `subscribed:true+${emailRecipientFilter}`;
}
const startRetrieve = Date.now();
debug('addEmail: retrieving members count');
const {meta: {pagination: {total: membersCount}}} = await membersService.api.members.list(Object.assign({}, knexOptions, filterOptions));
debug(`addEmail: retrieved members count - ${membersCount} members (${Date.now() - startRetrieve}ms)`);
// NOTE: don't create email object when there's nobody to send the email to
if (membersCount === 0) {
return null;
}
if (limitService.isLimited('emails')) {
await limitService.errorIfWouldGoOverLimit('emails', {addedCount: membersCount});
}
const postId = postModel.get('id');
const existing = await models.Email.findOne({post_id: postId}, knexOptions);
if (!existing) {
// get email contents and perform replacements using no member data so
// we have a decent snapshot of email content for later display
const emailData = await getEmailData(postModel, options);
return models.Email.add({
post_id: postId,
status: 'pending',
email_count: membersCount,
subject: emailData.subject,
from: emailData.from,
reply_to: emailData.replyTo,
html: emailData.html,
plaintext: emailData.plaintext,
submitted_at: moment().toDate(),
track_opens: !!settingsCache.get('email_track_opens'),
recipient_filter: emailRecipientFilter
}, knexOptions);
} else {
return existing;
}
};
/**
* retryFailedEmail
*
* Accepts an Email model and resets it's fields to trigger retry listeners
*
* @param {Email} emailModel Email model
*/
const retryFailedEmail = async (emailModel) => {
return await models.Email.edit({
status: 'pending'
}, {
id: emailModel.get('id')
});
};
/**
* handleUnsubscribeRequest
*
* Takes a request/response pair and reads the `unsubscribe` query parameter,
* using the content to update the members service to set the `subscribed` flag
* to false on the member
*
* If any operation fails, or the request is invalid the function will error - so using
* as middleware should consider wrapping with `try/catch`
*
* @param {Request} req
* @returns {Promise<void>}
*/
async function handleUnsubscribeRequest(req) {
if (!req.url) {
throw new errors.BadRequestError({
message: 'Unsubscribe failed! Could not find member'
});
}
const {query} = url.parse(req.url, true);
if (!query || !query.uuid) {
throw new errors.BadRequestError({
message: (query.preview ? 'Unsubscribe preview' : 'Unsubscribe failed! Could not find member')
});
}
const member = await membersService.api.members.get({
uuid: query.uuid
});
if (!member) {
throw new errors.BadRequestError({
message: 'Unsubscribe failed! Could not find member'
});
}
try {
const memberModel = await membersService.api.members.update({subscribed: false}, {id: member.id});
return memberModel.toJSON();
} catch (err) {
throw new errors.InternalServerError({
err,
message: 'Failed to unsubscribe member'
});
}
}
async function pendingEmailHandler(emailModel, options) {
// CASE: do not send email if we import a database
// TODO: refactor post.published events to never fire on importing
if (options && options.importing) {
return;
}
if (emailModel.get('status') !== 'pending') {
return;
}
// make sure recurring background analytics jobs are running once we have emails
const emailAnalyticsJobs = require('../email-analytics/jobs');
emailAnalyticsJobs.scheduleRecurringJobs();
return jobsService.addJob({
job: sendEmailJob,
data: {emailModel},
offloaded: false
});
}
async function sendEmailJob({emailModel, options}) {
let startEmailSend = null;
try {
// Check host limit for allowed member count and throw error if over limit
// - do this even if it's a retry so that there's no way around the limit
if (limitService.isLimited('members')) {
await limitService.errorIfIsOverLimit('members');
}
// Check host limit for disabled emails or going over emails limit
if (limitService.isLimited('emails')) {
await limitService.errorIfWouldGoOverLimit('emails');
}
// Create email batch and recipient rows unless this is a retry and they already exist
const existingBatchCount = await emailModel.related('emailBatches').count('id');
if (existingBatchCount === 0) {
let newBatchCount;
await models.Base.transaction(async (transacting) => {
newBatchCount = await createEmailBatches({emailModel, options: {transacting}});
});
if (newBatchCount === 0) {
return;
}
}
debug('sendEmailJob: sending email');
startEmailSend = Date.now();
await bulkEmailService.processEmail({emailId: emailModel.get('id'), options});
debug(`sendEmailJob: sent email (${Date.now() - startEmailSend}ms)`);
} catch (error) {
if (startEmailSend) {
debug(`sendEmailJob: send email failed (${Date.now() - startEmailSend}ms)`);
}
let errorMessage = error.message;
if (errorMessage.length > 2000) {
errorMessage = errorMessage.substring(0, 2000);
}
await emailModel.save({
status: 'failed',
error: errorMessage
}, {patch: true});
throw new errors.GhostError({
err: error,
context: i18n.t('errors.services.mega.requestFailed.error')
});
}
}
// Fetch rows of members that should receive an email.
// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
// instantiations and associated processing and event loop blocking
async function getEmailMemberRows({emailModel, options}) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const filterOptions = Object.assign({}, knexOptions);
const recipientFilter = emailModel.get('recipient_filter');
switch (recipientFilter) {
// `paid` and `free` were swapped out for NQL filters in 4.5.0, we shouldn't see them here now
case 'paid':
case 'free':
throw new Error(`Unexpected recipient_filter value "${recipientFilter}", expected an NQL equivalent`);
case 'all':
filterOptions.filter = 'subscribed:true';
break;
case 'none':
throw new Error('Cannot sent email to "none" recipient_filter');
default:
filterOptions.filter = `subscribed:true+${recipientFilter}`;
}
const startRetrieve = Date.now();
debug('getEmailMemberRows: retrieving members list');
// select('members.*') is necessary here to avoid duplicate `email` columns in the result set
// without it we do `select *` which pulls in the Stripe customer email too which overrides the member email
const memberRows = await models.Member.getFilteredCollectionQuery(filterOptions).select('members.*').distinct();
debug(`getEmailMemberRows: retrieved members list - ${memberRows.length} members (${Date.now() - startRetrieve}ms)`);
return memberRows;
}
// Store email_batch and email_recipient records for an email.
// Uses knex directly rather than bookshelf to avoid thousands of bookshelf model
// instantiations and associated processing and event loop blocking.
// Returns array of batch ids
async function createEmailBatches({emailModel, options}) {
const memberRows = await getEmailMemberRows({emailModel, options});
if (!memberRows.length) {
return [];
}
const storeRecipientBatch = async function (recipients) {
const knexOptions = _.pick(options, ['transacting', 'forUpdate']);
const batchModel = await models.EmailBatch.add({email_id: emailModel.id}, knexOptions);
const recipientData = [];
recipients.forEach((memberRow) => {
if (!memberRow.id || !memberRow.uuid || !memberRow.email) {
logging.warn(`Member row not included as email recipient due to missing data - id: ${memberRow.id}, uuid: ${memberRow.uuid}, email: ${memberRow.email}`);
return;
}
recipientData.push({
id: ObjectID().toHexString(),
email_id: emailModel.id,
member_id: memberRow.id,
batch_id: batchModel.id,
member_uuid: memberRow.uuid,
member_email: memberRow.email,
member_name: memberRow.name
});
});
const insertQuery = db.knex('email_recipients').insert(recipientData);
if (knexOptions.transacting) {
insertQuery.transacting(knexOptions.transacting);
}
await insertQuery;
return batchModel.id;
};
debug('createEmailBatches: storing recipient list');
const startOfRecipientStorage = Date.now();
const batches = _.chunk(memberRows, bulkEmailService.BATCH_SIZE);
const batchIds = await Promise.mapSeries(batches, storeRecipientBatch);
debug(`createEmailBatches: stored recipient list (${Date.now() - startOfRecipientStorage}ms)`);
return batchIds;
}
const statusChangedHandler = (emailModel, options) => {
const emailRetried = emailModel.wasChanged()
&& emailModel.get('status') === 'pending'
&& emailModel.previous('status') === 'failed';
if (emailRetried) {
pendingEmailHandler(emailModel, options);
}
};
function listen() {
events.on('email.added', pendingEmailHandler);
events.on('email.edited', statusChangedHandler);
}
// Public API
module.exports = {
listen,
addEmail,
retryFailedEmail,
sendTestEmail,
handleUnsubscribeRequest
};
/**
* @typedef {'v2' | 'v3' | 'v4' | 'canary' } ValidAPIVersion
*/