From cada88975a992735afd961263a41e53c3a041741 Mon Sep 17 00:00:00 2001 From: Fabien 'egg' O'Carroll Date: Tue, 18 Aug 2020 13:39:45 +0100 Subject: [PATCH] Updated bulk importer to improve performance (#12128) no-issue * Added bulkAdd method to Member,Customer&Subscription model This allows us to keep the db access in the model layer * Updated @tryghost/members-api to 0.27.2 This includes fixes for rate-limiting of requests, and exposes necessary Stripe methods for creating customers and complimentary subscriptions, without affecting the database. * Refactored importer to parallelise tasks where possible By parallelising our tasks we are able to improve the speed at which the entire import completes. --- core/server/api/canary/members.js | 5 +- core/server/models/member-stripe-customer.js | 27 ++ core/server/models/member.js | 26 + .../models/stripe-customer-subscription.js | 27 ++ .../server/services/members/importer/index.js | 458 ++++++++++++------ 5 files changed, 389 insertions(+), 154 deletions(-) diff --git a/core/server/api/canary/members.js b/core/server/api/canary/members.js index 92c0412b10..9a58ae3aee 100644 --- a/core/server/api/canary/members.js +++ b/core/server/api/canary/members.js @@ -651,7 +651,10 @@ module.exports = { invalid, createdBy }); - }).then(() => { + }).then((result) => { + invalid.errors = invalid.errors.concat(result.invalid.errors); + invalid.count += result.invalid.count; + imported.count += result.imported.count; // NOTE: grouping by context because messages can contain unique data like "customer_id" const groupedErrors = _.groupBy(invalid.errors, 'context'); const uniqueErrors = _.uniqBy(invalid.errors, 'context'); diff --git a/core/server/models/member-stripe-customer.js b/core/server/models/member-stripe-customer.js index 70976b2548..761168dba1 100644 --- a/core/server/models/member-stripe-customer.js +++ b/core/server/models/member-stripe-customer.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const ghostBookshelf = require('./base'); const MemberStripeCustomer = ghostBookshelf.Model.extend({ @@ -28,6 +29,32 @@ const MemberStripeCustomer = ghostBookshelf.Model.extend({ return this.add(data, unfilteredOptions); }, + async bulkAdd(data, unfilteredOptions = {}) { + if (!unfilteredOptions.transacting) { + return ghostBookshelf.transaction((transacting) => { + return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); + }); + } + const result = { + successful: 0, + unsuccessful: 0, + errors: [] + }; + + const CHUNK_SIZE = 100; + + for (const chunk of _.chunk(data, CHUNK_SIZE)) { + try { + await ghostBookshelf.knex(this.prototype.tableName).insert(chunk); + result.successful += chunk.length; + } catch (err) { + result.unsuccessful += chunk.length; + result.errors.push(err); + } + } + return result; + }, + add(data, unfilteredOptions = {}) { if (!unfilteredOptions.transacting) { return ghostBookshelf.transaction((transacting) => { diff --git a/core/server/models/member.js b/core/server/models/member.js index 6650c16f9f..4c777d3bac 100644 --- a/core/server/models/member.js +++ b/core/server/models/member.js @@ -259,6 +259,32 @@ const Member = ghostBookshelf.Model.extend({ return options; }, + async bulkAdd(data, unfilteredOptions = {}) { + if (!unfilteredOptions.transacting) { + return ghostBookshelf.transaction((transacting) => { + return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); + }); + } + const result = { + successful: 0, + unsuccessful: 0, + errors: [] + }; + + const CHUNK_SIZE = 100; + + for (const chunk of _.chunk(data, CHUNK_SIZE)) { + try { + await ghostBookshelf.knex(this.prototype.tableName).insert(chunk); + result.successful += chunk.length; + } catch (err) { + result.unsuccessful += chunk.length; + result.errors.push(err); + } + } + return result; + }, + add(data, unfilteredOptions = {}) { if (!unfilteredOptions.transacting) { return ghostBookshelf.transaction((transacting) => { diff --git a/core/server/models/stripe-customer-subscription.js b/core/server/models/stripe-customer-subscription.js index 6ad94664e2..8bb5e5736b 100644 --- a/core/server/models/stripe-customer-subscription.js +++ b/core/server/models/stripe-customer-subscription.js @@ -1,3 +1,4 @@ +const _ = require('lodash'); const ghostBookshelf = require('./base'); const CURRENCY_SYMBOLS = { @@ -53,6 +54,32 @@ const StripeCustomerSubscription = ghostBookshelf.Model.extend({ })); } return this.add(data, unfilteredOptions); + }, + + async bulkAdd(data, unfilteredOptions = {}) { + if (!unfilteredOptions.transacting) { + return ghostBookshelf.transaction((transacting) => { + return this.bulkAdd(data, Object.assign({transacting}, unfilteredOptions)); + }); + } + const result = { + successful: 0, + unsuccessful: 0, + errors: [] + }; + + const CHUNK_SIZE = 100; + + for (const chunk of _.chunk(data, CHUNK_SIZE)) { + try { + await ghostBookshelf.knex(this.prototype.tableName).insert(chunk); + result.successful += chunk.length; + } catch (err) { + result.unsuccessful += chunk.length; + result.errors.push(err); + } + } + return result; } }); diff --git a/core/server/services/members/importer/index.js b/core/server/services/members/importer/index.js index 10a5518443..65a0725498 100644 --- a/core/server/services/members/importer/index.js +++ b/core/server/services/members/importer/index.js @@ -4,50 +4,175 @@ const ObjectId = require('bson-objectid'); const moment = require('moment-timezone'); const errors = require('@tryghost/errors'); const membersService = require('../index'); +const models = require('../../../models'); const {i18n} = require('../../../lib/common'); const db = require('../../../data/db'); const logging = require('../../../../shared/logging'); -const cleanupUndefined = (obj) => { - for (let key in obj) { - if (obj[key] === 'undefined') { - delete obj[key]; +const doImport = async ({members, allLabelModels, importSetLabels, createdBy}) => { + const createInserter = table => data => insert(db.knex, table, data); + const createDeleter = table => data => del(db.knex, table, data); + + const deleteMembers = createDeleter('members'); + const insertLabelAssociations = createInserter('members_labels'); + + const { + invalidMembers, + membersToInsert, + stripeCustomersToFetch, + stripeCustomersToCreate, + labelAssociationsToInsert + } = getMemberData({members, allLabelModels, importSetLabels, createdBy}); + + const fetchedStripeCustomersPromise = fetchStripeCustomers(stripeCustomersToFetch); + const createdStripeCustomersPromise = createStripeCustomers(stripeCustomersToCreate); + const insertedMembersPromise = models.Member.bulkAdd(membersToInsert); + + const insertedLabelsPromise = insertedMembersPromise + .then(() => insertLabelAssociations(labelAssociationsToInsert)); + + const insertedCustomersPromise = Promise.all([ + fetchedStripeCustomersPromise, + createdStripeCustomersPromise, + insertedMembersPromise + ]).then( + ([fetchedStripeCustomers, createdStripeCustomers]) => models.MemberStripeCustomer.bulkAdd( + fetchedStripeCustomers.customersToInsert.concat(createdStripeCustomers.customersToInsert) + ) + ); + + const insertedSubscriptionsPromise = Promise.all([ + fetchedStripeCustomersPromise, + createdStripeCustomersPromise, + insertedCustomersPromise + ]).then( + ([fetchedStripeCustomers, createdStripeCustomers]) => models.StripeCustomerSubscription.bulkAdd( + fetchedStripeCustomers.subscriptionsToInsert.concat(createdStripeCustomers.subscriptionsToInsert) + ) + ); + + const deletedMembersPromise = Promise.all([ + fetchedStripeCustomersPromise, + createdStripeCustomersPromise, + insertedMembersPromise + ]).then( + ([fetchedStripeCustomers, createdStripeCustomers]) => deleteMembers( + fetchedStripeCustomers.membersToDelete.concat(createdStripeCustomers.membersToDelete) + ) + ); + + // This looks sequential, but at the point insertedCustomersPromise has resolved so have all the others + const insertedSubscriptions = await insertedSubscriptionsPromise; + const insertedCustomers = await insertedCustomersPromise; + const insertedMembers = await insertedMembersPromise; + const deletedMembers = await deletedMembersPromise; + const fetchedCustomers = await fetchedStripeCustomersPromise; + const insertedLabels = await insertedLabelsPromise; + + const result = { + imported: { + count: insertedMembers.successful - deletedMembers.successful + }, + invalid: { + count: insertedMembers.unsuccessful + deletedMembers.unsuccessful + invalidMembers.length, + errors: [ + ...insertedMembers.errors, + ...insertedCustomers.errors, + ...insertedSubscriptions.errors, + ...insertedLabels.errors, + ...fetchedCustomers.errors + ] } - } + }; + + // Allow logging to happen outside of the request cycle + process.nextTick(() => { + // @TODO wrap errors with validation errors (or whatever is reasonable) + result.invalid.errors.forEach(err => logging.error(err)); + }); + + return result; }; -function serializeMemberLabels(labels) { - if (_.isString(labels)) { - if (labels === '') { - return []; - } +const CHUNK_SIZE = 100; - return [{ - name: labels.trim() - }]; - } else if (labels) { - return labels.filter((label) => { - return !!label; - }).map((label) => { - if (_.isString(label)) { - return { - name: label.trim() - }; - } - return label; - }); +async function insert(knex, table, data) { + const result = { + successful: 0, + unsuccessful: 0, + errors: [] + }; + for (const chunk of _.chunk(data, CHUNK_SIZE)) { + try { + await knex(table).insert(chunk); + result.successful += chunk.length; + } catch (error) { + result.unsuccessful += chunk.length; + result.errors.push(error); + } } - return []; + return result; } -const doImport = async ({members, allLabelModels, importSetLabels, imported, invalid, createdBy}) => { - const mappedMemberBatchData = []; - const mappedMembersLabelsBatchAssociations = []; - const membersWithStripeCustomers = []; - const membersWithComplimentaryPlans = []; +async function del(knex, table, ids) { + const result = { + successful: 0, + unsuccessful: 0, + errors: [] + }; + for (const chunk of _.chunk(ids, CHUNK_SIZE)) { + try { + await knex(table).whereIn('id', chunk).del(); + result.successful += chunk.length; + } catch (error) { + result.unsuccessful += chunk.length; + result.errors.push(error); + } + } + return result; +} - members.forEach((member) => { - cleanupUndefined(member); +function serializeMemberLabels(labels) { + return labels.reduce((labelsAcc, label) => { + if (!label) { + return labels; + } + if (typeof label !== 'string') { + return labelsAcc.concat(label.name); + } + return labelsAcc.concat(label); + }); +} + +function getMemberData({members, allLabelModels, importSetLabels, createdBy}) { + const labelIdLookup = allLabelModels.reduce(function (labelIdLookupAcc, labelModel) { + return Object.assign(labelIdLookupAcc, { + [labelModel.get('name')]: labelModel.id + }); + }, {}); + + const importedLabels = importSetLabels.map(label => label.name); + + const stripeIsConnected = membersService.config.isStripeConnected(); + + const invalidMembers = []; + const membersToInsert = []; + const stripeCustomersToFetch = []; + const stripeCustomersToCreate = []; + const labelAssociationsToInsert = []; + + members.forEach(function (member) { + if ((member.stripe_customer_id || member.comped) && !stripeIsConnected) { + invalidMembers.push(member); + return; + } + + // @TODO This is expensive, maybe we can just error if we get shoddy data? + for (let key in member) { + if (member[key] === 'undefined') { + delete member[key]; + } + } let subscribed; if (_.isUndefined(member.subscribed_to_emails)) { @@ -57,10 +182,6 @@ const doImport = async ({members, allLabelModels, importSetLabels, imported, inv subscribed = (String(member.subscribed_to_emails).toLowerCase() !== 'false'); } - member.labels = (member.labels && member.labels.split(',')) || []; - const entryLabels = serializeMemberLabels(member.labels); - const mergedLabels = _.unionBy(entryLabels, importSetLabels, 'name'); - let createdAt = member.created_at === '' ? undefined : member.created_at; if (createdAt) { @@ -80,9 +201,8 @@ const doImport = async ({members, allLabelModels, importSetLabels, imported, inv createdAt = new Date(); } - const memberId = ObjectId.generate(); - mappedMemberBatchData.push({ - id: memberId, + const memberToInsert = { + id: ObjectId.generate(), uuid: uuid.v4(), // member model default email: member.email, name: member.name, @@ -90,140 +210,172 @@ const doImport = async ({members, allLabelModels, importSetLabels, imported, inv subscribed: subscribed, created_at: createdAt, created_by: createdBy + }; + membersToInsert.push(memberToInsert); + + const memberLabels = serializeMemberLabels((member.labels || '').split(',')); + const allLabels = _.union(memberLabels, importedLabels); + + const memberLabelAssociationsToInsert = allLabels.map((label, index) => { + return { + id: ObjectId.generate(), + member_id: memberToInsert.id, + label_id: labelIdLookup[label], + sort_order: index + }; }); - - if (mergedLabels) { - mergedLabels.forEach((label, index) => { - const matchedLabel = allLabelModels.find(labelModel => labelModel.get('name') === label.name); - - mappedMembersLabelsBatchAssociations.push({ - id: ObjectId.generate(), - member_id: memberId, - label_id: matchedLabel.id, - sort_order: index - }); - }); - } + labelAssociationsToInsert.push(...memberLabelAssociationsToInsert); if (member.stripe_customer_id) { - membersWithStripeCustomers.push({ - stripe_customer_id: member.stripe_customer_id, - id: memberId, - email: member.email - }); + const stripeCustomerToFetch = { + customer_id: member.stripe_customer_id, + member_id: memberToInsert.id + }; + stripeCustomersToFetch.push(stripeCustomerToFetch); } - if ((String(member.complimentary_plan).toLocaleLowerCase() === 'true')) { - membersWithComplimentaryPlans.push({ - id: memberId, - email: member.email - }); + if (String(member.complimentary_plan).toLowerCase() === 'true') { + const stripeCustomerToCreate = { + member_id: memberToInsert.id, + name: memberToInsert.name, + email: memberToInsert.email + }; + stripeCustomersToCreate.push(stripeCustomerToCreate); } }); - try { - // TODO: below inserts most likely need to be wrapped into transaction - // to avoid creating orphaned member_labels connections - const CHUNK_SIZE = 5000; - const chunkedMembers = _.chunk(mappedMemberBatchData, CHUNK_SIZE); - for (const data of chunkedMembers) { - await db.knex('members') - .insert(data); - } + return { + invalidMembers, + membersToInsert, + stripeCustomersToFetch, + stripeCustomersToCreate, + labelAssociationsToInsert + }; +} - const chunkedLebelAssociations = _.chunk(mappedMembersLabelsBatchAssociations, CHUNK_SIZE); +async function createStripeCustomers(stripeCustomersToCreate) { + const result = { + errors: [], + customersToInsert: [], + subscriptionsToInsert: [], + membersToDelete: [] + }; - for (const data of chunkedLebelAssociations) { - await db.knex('members_labels') - .insert(data); - } + await Promise.all(stripeCustomersToCreate.map(async function createStripeCustomer(customerToCreate) { + try { + const customer = await membersService.api.members.createStripeCustomer({ + email: customerToCreate.email, + name: customerToCreate.name + }); - imported.count += mappedMemberBatchData.length; - } catch (error) { - logging.error(error); + result.customersToInsert.push({ + id: ObjectId.generate(), + member_id: customerToCreate.member_id, + customer_id: customer.id, + email: customer.email, + name: customer.name, + created_at: new Date(), + updated_at: new Date(), + created_by: 1, + updated_by: 1 + }); - if (error.code && error.message.toLowerCase().indexOf('unique') !== -1) { - invalid.errors.push(new errors.ValidationError({ - message: i18n.t('errors.api.members.memberAlreadyExists.message'), - context: i18n.t('errors.api.members.memberAlreadyExists.context') - })); - } else { - // NOTE: probably need to wrap this error into something more specific e.g. ImportError - invalid.errors.push(error); - } + const subscription = await membersService.api.members.createComplimentarySubscription(customer); - invalid.count += mappedMemberBatchData.length; - } - - if (membersWithStripeCustomers.length || membersWithComplimentaryPlans.length) { - const deleteMemberKnex = async (id) => { - // TODO: cascading wont work on SQLite needs 2 separate deletes - // for members_labels and members wrapped into a transaction - const deletedMembersCount = await db.knex('members') - .where('id', id) - .del(); - - if (deletedMembersCount) { - imported.count -= deletedMembersCount; - invalid.count += deletedMembersCount; + const payment = subscription.default_payment_method; + result.subscriptionsToInsert.push({ + id: ObjectId.generate(), + customer_id: customer.id, + subscription_id: subscription.id, + plan_id: subscription.plan.id, + status: subscription.status, + cancel_at_period_end: subscription.cancel_at_period_end, + current_period_end: new Date(subscription.current_period_end * 1000), + start_date: new Date(subscription.start_date * 1000), + default_payment_card_last4: payment && payment.card && payment.card.last4 || null, + plan_nickname: subscription.plan.nickname || '', + plan_interval: subscription.plan.interval, + plan_amount: subscription.plan.amount, + plan_currency: subscription.plan.currency, + created_at: new Date(), + updated_at: new Date(), + created_by: 1, + updated_by: 1 + }); + } catch (error) { + if (error.message.indexOf('customer') && error.code === 'resource_missing') { + error.message = `Member not imported. ${error.message}`; + error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); + error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); } - }; + result.errors.push(error); + result.membersToDelete.push(customerToCreate.member_id); + } + })); - if (!membersService.config.isStripeConnected()) { - const memberIdsToDestroy = _.uniq([ - ...membersWithStripeCustomers.map(m => m.id), - ...membersWithComplimentaryPlans.map(m => m.id) - ]); + return result; +} - // TODO: cascading wont work on SQLite needs 2 separate deletes - // for members_labels and members wrapped into a transaction - const deleteMembersCount = await db.knex('members') - .whereIn('id', memberIdsToDestroy) - .del(); +async function fetchStripeCustomers(stripeCustomersToInsert) { + const result = { + errors: [], + customersToInsert: [], + subscriptionsToInsert: [], + membersToDelete: [] + }; - imported.count -= deleteMembersCount; - invalid.count += deleteMembersCount; - invalid.errors.push(new errors.ValidationError({ - message: i18n.t('errors.api.members.stripeNotConnected.message'), - context: i18n.t('errors.api.members.stripeNotConnected.context'), - help: i18n.t('errors.api.members.stripeNotConnected.help') - })); - } else { - if (membersWithStripeCustomers.length) { - await Promise.map(membersWithStripeCustomers, async (stripeMember) => { - try { - await membersService.api.members.linkStripeCustomerById(stripeMember.stripe_customer_id, stripeMember.id); - } catch (error) { - if (error.message.indexOf('customer') && error.code === 'resource_missing') { - error.message = `Member not imported. ${error.message}`; - error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); - error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); - } - logging.error(error); - invalid.errors.push(error); + await Promise.all(stripeCustomersToInsert.map(async function fetchStripeCustomer(customer) { + try { + const fetchedCustomer = await membersService.api.members.getStripeCustomer(customer.customer_id, { + expand: ['subscriptions', 'subscriptions.data.default_payment_method'] + }); - await deleteMemberKnex(stripeMember.id); - } - }, { - concurrency: 9 - }); - } - - if (membersWithComplimentaryPlans.length) { - await Promise.map(membersWithComplimentaryPlans, async (complimentaryMember) => { - try { - await membersService.api.members.setComplimentarySubscriptionById(complimentaryMember.id); - } catch (error) { - logging.error(error); - invalid.errors.push(error); - await deleteMemberKnex(complimentaryMember.id); - } - }, { - concurrency: 10 // TODO: check if this concurrency level doesn't fail rate limits + result.customersToInsert.push({ + id: ObjectId.generate(), + member_id: customer.member_id, + customer_id: customer.customer_id, + email: fetchedCustomer.email, + name: fetchedCustomer.name, + created_at: new Date(), + updated_at: new Date(), + created_by: 1, + updated_by: 1 + }); + + fetchedCustomer.subscriptions.data.forEach((subscription) => { + const payment = subscription.default_payment_method; + result.subscriptionsToInsert.push({ + id: ObjectId.generate(), + customer_id: customer.customer_id, + subscription_id: subscription.id, + plan_id: subscription.plan.id, + status: subscription.status, + cancel_at_period_end: subscription.cancel_at_period_end, + current_period_end: new Date(subscription.current_period_end * 1000), + start_date: new Date(subscription.start_date * 1000), + default_payment_card_last4: payment && payment.card && payment.card.last4 || null, + plan_nickname: subscription.plan.nickname || '', + plan_interval: subscription.plan.interval, + plan_amount: subscription.plan.amount, + plan_currency: subscription.plan.currency, + created_at: new Date(), + updated_at: new Date(), + created_by: 1, + updated_by: 1 }); + }); + } catch (error) { + if (error.message.indexOf('customer') && error.code === 'resource_missing') { + error.message = `Member not imported. ${error.message}`; + error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); + error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); } + result.errors.push(error); + result.membersToDelete.push(customer.member_id); } - } -}; + })); + + return result; +} module.exports = doImport;