From bbcc0f5178327e6cee83a8f7814e815d2449431d Mon Sep 17 00:00:00 2001 From: Nazar Gargol Date: Thu, 6 Aug 2020 13:58:32 +1200 Subject: [PATCH] Added batched members import API method no issue - New Member API batched import is meant to be a substitution to current import with improved performance while keeping same behaviore. Current import processes 1 record at a time using internal API calls and times out consistently when large number of members has to be imported (~10k records without Stripe). - New import's aim is to improve performance and process >50K records without timing out both with and without Stripe connected members - Batched import can be conceptually devided into 3 stages which have their own ways to improve performance: 1. labels - can be at current performance as number of labels is usually small, but could also be improved through batching 2. member records + member<->labels relations - these could be performed as batched inserts into the database 3. Stripe connections - most challanging bottleneck to solve because API request are slow by it's nature and have to deal with rate limits of Stripe's API itself - It's a heavy WIP, with lots of known pitfalls which are marked with TODOs. Will be solved iteratively through time untill the method can be declared stable - The new batched import method will be hidden behind 'enableDeveloperExperiments' flag to allow early testing --- core/server/api/canary/members.js | 287 ++++++++++++++++++++++++++++++ 1 file changed, 287 insertions(+) diff --git a/core/server/api/canary/members.js b/core/server/api/canary/members.js index 691463a4dc..d810c03ef1 100644 --- a/core/server/api/canary/members.js +++ b/core/server/api/canary/members.js @@ -1,7 +1,9 @@ // NOTE: We must not cache references to membersService.api // as it is a getter and may change during runtime. const Promise = require('bluebird'); +const ObjectId = require('bson-objectid'); const moment = require('moment-timezone'); +const uuid = require('uuid'); const errors = require('@tryghost/errors'); const config = require('../../../shared/config'); const models = require('../../models'); @@ -67,6 +69,10 @@ const sanitizeInput = (members) => { function serializeMemberLabels(labels) { if (_.isString(labels)) { + if (labels === '') { + return []; + } + return [{ name: labels.trim() }]; @@ -560,6 +566,287 @@ const members = { } }, + importCSVBatched: { + statusCode: 201, + permissions: { + method: 'add' + }, + async query(frame) { + let imported = { + count: 0 + }; + let invalid = { + count: 0, + errors: [] + }; + let duplicateStripeCustomerIdCount = 0; + + // NOTE: custom labels have to be created in advance otherwise there are conflicts + // when processing member creation in parallel later on in import process + const importSetLabels = serializeMemberLabels(frame.data.labels); + + // NOTE: adding an import label allows for imports to be "undone" via bulk delete + let importLabel; + if (frame.data.members.length) { + const siteTimezone = settingsCache.get('timezone'); + const name = `Import ${moment().tz(siteTimezone).format('YYYY-MM-DD HH:mm')}`; + const result = await findOrCreateLabels([{name}], frame.options); + importLabel = result[0] && result[0].toJSON(); + + importSetLabels.push(importLabel); + } + + const importSetLabelModels = await findOrCreateLabels(importSetLabels, frame.options); + + // NOTE: member-specific labels have to be pre-created as they cause conflicts when processed + // in parallel + const memberLabels = serializeMemberLabels(getUniqueMemberLabels(frame.data.members)); + const memberLabelModels = await findOrCreateLabels(memberLabels, frame.options); + + const allLabelModels = [...importSetLabelModels, ...memberLabelModels].filter(model => model !== undefined); + + return Promise.resolve().then(async () => { + const sanitized = sanitizeInput(frame.data.members); + duplicateStripeCustomerIdCount = frame.data.members.length - sanitized.length; + invalid.count += duplicateStripeCustomerIdCount; + + if (duplicateStripeCustomerIdCount) { + invalid.errors.push(new errors.ValidationError({ + message: i18n.t('errors.api.members.duplicateStripeCustomerIds.message'), + context: i18n.t('errors.api.members.duplicateStripeCustomerIds.context'), + help: i18n.t('errors.api.members.duplicateStripeCustomerIds.help') + })); + } + + const CHUNK_SIZE = 100; + const memberBatches = _.chunk(sanitized, CHUNK_SIZE); + + return Promise.map(memberBatches, async (membersBatch) => { + const mappedMemberBatchData = []; + const mappedMembersLabelsBatchAssociations = []; + const membersWithStripeCustomers = []; + const membersWithComplimentaryPlans = []; + + membersBatch.forEach((entry) => { + cleanupUndefined(entry); + + let subscribed; + if (_.isUndefined(entry.subscribed_to_emails)) { + // model default + subscribed = 'true'; + } else { + subscribed = (String(entry.subscribed_to_emails).toLowerCase() !== 'false'); + } + + entry.labels = (entry.labels && entry.labels.split(',')) || []; + const entryLabels = serializeMemberLabels(entry.labels); + const mergedLabels = _.unionBy(entryLabels, importSetLabels, 'name'); + + let createdAt = entry.created_at === '' ? undefined : entry.created_at; + + if (createdAt) { + const date = new Date(createdAt); + + // CASE: client sends `0000-00-00 00:00:00` + if (isNaN(date)) { + // TODO: throw in validation stage for single record, not whole batch! + throw new errors.ValidationError({ + message: i18n.t('errors.models.base.invalidDate', {key: 'created_at'}), + code: 'DATE_INVALID' + }); + } + + createdAt = moment(createdAt).toDate(); + } else { + createdAt = new Date(); + } + + // NOTE: redacted copy from models.Base module + const contextUser = (options) => { + options = options || {}; + options.context = options.context || {}; + + if (options.context.user || models.Base.Model.isExternalUser(options.context.user)) { + return options.context.user; + } else if (options.context.integration) { + return models.Base.Model.internalUser; + } + }; + + const memberId = ObjectId.generate(); + mappedMemberBatchData.push({ + id: memberId, + uuid: uuid.v4(), // member model default + email: entry.email, + name: entry.name, + note: entry.note, + subscribed: subscribed, + created_at: createdAt, + created_by: String(contextUser(frame.options)) + }); + + if (mergedLabels) { + mergedLabels.forEach((label) => { + const matchedLabel = allLabelModels.find(labelModel => labelModel.get('name') === label.name); + + mappedMembersLabelsBatchAssociations.push({ + id: ObjectId.generate(), + member_id: memberId, + label_id: matchedLabel.id, + sort_order: 0 //TODO: implementme + }); + }); + } + + if (entry.stripe_customer_id) { + membersWithStripeCustomers.push({ + stripe_customer_id: entry.stripe_customer_id, + id: memberId, + email: entry.email + }); + } + + if ((String(entry.complimentary_plan).toLocaleLowerCase() === 'true')) { + membersWithComplimentaryPlans.push({ + id: memberId, + email: entry.email + }); + } + }); + + try { + // TODO: below inserts most likely need to be wrapped into transaction + // to avoid creating orphaned member_labels connections + await db.knex('members') + .insert(mappedMemberBatchData); + + await db.knex('members_labels') + .insert(mappedMembersLabelsBatchAssociations); + + imported.count += mappedMemberBatchData.length; + } catch (error) { + logging.error(error); + + if (error.code && error.message.toLowerCase().indexOf('unique') !== -1) { + invalid.errors.push(new errors.ValidationError({ + message: i18n.t('errors.api.members.memberAlreadyExists.message'), + context: i18n.t('errors.api.members.memberAlreadyExists.context') + })); + } else { + // NOTE: probably need to wrap this error into something more specific e.g. ImportError + invalid.errors.push(error); + } + + invalid.count += mappedMemberBatchData.length; + } + + if (membersWithStripeCustomers.length || membersWithComplimentaryPlans.length) { + const deleteMemberKnex = async (id) => { + // TODO: cascading wont work on SQLite needs 2 separate deletes + // for members_labels and members wrapped into a transaction + const deletedMembersCount = await db.knex('members') + .where('id', id) + .del(); + + if (deletedMembersCount) { + imported.count -= deletedMembersCount; + invalid.count += deletedMembersCount; + } + }; + + if (!membersService.config.isStripeConnected()) { + const memberIdsToDestroy = _.uniq([ + ...membersWithStripeCustomers.map(m => m.id), + ...membersWithComplimentaryPlans.map(m => m.id) + ]); + + // TODO: cascading wont work on SQLite needs 2 separate deletes + // for members_labels and members wrapped into a transaction + const deleteMembersCount = await db.knex('members') + .whereIn('id', memberIdsToDestroy) + .del(); + + imported.count -= deleteMembersCount; + invalid.count += deleteMembersCount; + invalid.errors.push(new errors.ValidationError({ + message: i18n.t('errors.api.members.stripeNotConnected.message'), + context: i18n.t('errors.api.members.stripeNotConnected.context'), + help: i18n.t('errors.api.members.stripeNotConnected.help') + })); + } else { + if (membersWithStripeCustomers.length) { + await Promise.map(membersWithStripeCustomers, async (stripeMember) => { + try { + await membersService.api.members.linkStripeCustomer(stripeMember.stripe_customer_id, stripeMember); + } catch (error) { + if (error.message.indexOf('customer') && error.code === 'resource_missing') { + error.message = `Member not imported. ${error.message}`; + error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context'); + error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help'); + } + logging.error(error); + invalid.errors.push(error); + + await deleteMemberKnex(stripeMember.id); + } + }, { + concurrency: 10 + }); + } + + if (membersWithComplimentaryPlans.length) { + await Promise.map(membersWithComplimentaryPlans, async (complimentaryMember) => { + try { + await membersService.api.members.setComplimentarySubscription(complimentaryMember); + } catch (error) { + logging.error(error); + invalid.errors.push(error); + await deleteMemberKnex(complimentaryMember.id); + } + }, { + concurrency: 10 // TODO: check if this concurrency level doesn't fail rate limits + }); + } + } + } + }); + }).then(() => { + // NOTE: grouping by context because messages can contain unique data like "customer_id" + const groupedErrors = _.groupBy(invalid.errors, 'context'); + const uniqueErrors = _.uniqBy(invalid.errors, 'context'); + + const outputErrors = uniqueErrors.map((error) => { + let errorGroup = groupedErrors[error.context]; + let errorCount = errorGroup.length; + + if (error.message === i18n.t('errors.api.members.duplicateStripeCustomerIds.message')) { + errorCount = duplicateStripeCustomerIdCount; + } + + // NOTE: filtering only essential error information, so API doesn't leak more error details than it should + return { + message: error.message, + context: error.context, + help: error.help, + count: errorCount + }; + }); + + invalid.errors = outputErrors; + + return { + meta: { + stats: { + imported, + invalid + }, + import_label: importLabel + } + }; + }); + } + }, + stats: { options: [ 'days'