mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-01-06 22:40:14 -05:00
Added batched members import API method
no issue - The new Member API batched import is meant to be a substitute for the current import, with improved performance while keeping the same behavior. The current import processes 1 record at a time using internal API calls and consistently times out when a large number of members has to be imported (~10k records without Stripe). - The new import's aim is to improve performance and process >50K records without timing out, both with and without Stripe-connected members. - Batched import can be conceptually divided into 3 stages, each of which has its own ways to improve performance: 1. labels - can stay at current performance as the number of labels is usually small, but could also be improved through batching 2. member records + member<->labels relations - these could be performed as batched inserts into the database 3. Stripe connections - the most challenging bottleneck to solve because API requests are slow by their nature and have to deal with the rate limits of Stripe's API itself. - It's a heavy WIP, with lots of known pitfalls which are marked with TODOs. These will be solved iteratively over time until the method can be declared stable. - The new batched import method will be hidden behind the 'enableDeveloperExperiments' flag to allow early testing.
This commit is contained in:
parent
b61ccf0889
commit
bbcc0f5178
1 changed files with 287 additions and 0 deletions
|
@ -1,7 +1,9 @@
|
||||||
// NOTE: We must not cache references to membersService.api
|
// NOTE: We must not cache references to membersService.api
|
||||||
// as it is a getter and may change during runtime.
|
// as it is a getter and may change during runtime.
|
||||||
const Promise = require('bluebird');
|
const Promise = require('bluebird');
|
||||||
|
const ObjectId = require('bson-objectid');
|
||||||
const moment = require('moment-timezone');
|
const moment = require('moment-timezone');
|
||||||
|
const uuid = require('uuid');
|
||||||
const errors = require('@tryghost/errors');
|
const errors = require('@tryghost/errors');
|
||||||
const config = require('../../../shared/config');
|
const config = require('../../../shared/config');
|
||||||
const models = require('../../models');
|
const models = require('../../models');
|
||||||
|
@ -67,6 +69,10 @@ const sanitizeInput = (members) => {
|
||||||
|
|
||||||
function serializeMemberLabels(labels) {
|
function serializeMemberLabels(labels) {
|
||||||
if (_.isString(labels)) {
|
if (_.isString(labels)) {
|
||||||
|
if (labels === '') {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
return [{
|
return [{
|
||||||
name: labels.trim()
|
name: labels.trim()
|
||||||
}];
|
}];
|
||||||
|
@ -560,6 +566,287 @@ const members = {
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
|
importCSVBatched: {
|
||||||
|
statusCode: 201,
|
||||||
|
permissions: {
|
||||||
|
method: 'add'
|
||||||
|
},
|
||||||
|
async query(frame) {
|
||||||
|
let imported = {
|
||||||
|
count: 0
|
||||||
|
};
|
||||||
|
let invalid = {
|
||||||
|
count: 0,
|
||||||
|
errors: []
|
||||||
|
};
|
||||||
|
let duplicateStripeCustomerIdCount = 0;
|
||||||
|
|
||||||
|
// NOTE: custom labels have to be created in advance otherwise there are conflicts
|
||||||
|
// when processing member creation in parallel later on in import process
|
||||||
|
const importSetLabels = serializeMemberLabels(frame.data.labels);
|
||||||
|
|
||||||
|
// NOTE: adding an import label allows for imports to be "undone" via bulk delete
|
||||||
|
let importLabel;
|
||||||
|
if (frame.data.members.length) {
|
||||||
|
const siteTimezone = settingsCache.get('timezone');
|
||||||
|
const name = `Import ${moment().tz(siteTimezone).format('YYYY-MM-DD HH:mm')}`;
|
||||||
|
const result = await findOrCreateLabels([{name}], frame.options);
|
||||||
|
importLabel = result[0] && result[0].toJSON();
|
||||||
|
|
||||||
|
importSetLabels.push(importLabel);
|
||||||
|
}
|
||||||
|
|
||||||
|
const importSetLabelModels = await findOrCreateLabels(importSetLabels, frame.options);
|
||||||
|
|
||||||
|
// NOTE: member-specific labels have to be pre-created as they cause conflicts when processed
|
||||||
|
// in parallel
|
||||||
|
const memberLabels = serializeMemberLabels(getUniqueMemberLabels(frame.data.members));
|
||||||
|
const memberLabelModels = await findOrCreateLabels(memberLabels, frame.options);
|
||||||
|
|
||||||
|
const allLabelModels = [...importSetLabelModels, ...memberLabelModels].filter(model => model !== undefined);
|
||||||
|
|
||||||
|
return Promise.resolve().then(async () => {
|
||||||
|
const sanitized = sanitizeInput(frame.data.members);
|
||||||
|
duplicateStripeCustomerIdCount = frame.data.members.length - sanitized.length;
|
||||||
|
invalid.count += duplicateStripeCustomerIdCount;
|
||||||
|
|
||||||
|
if (duplicateStripeCustomerIdCount) {
|
||||||
|
invalid.errors.push(new errors.ValidationError({
|
||||||
|
message: i18n.t('errors.api.members.duplicateStripeCustomerIds.message'),
|
||||||
|
context: i18n.t('errors.api.members.duplicateStripeCustomerIds.context'),
|
||||||
|
help: i18n.t('errors.api.members.duplicateStripeCustomerIds.help')
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
const CHUNK_SIZE = 100;
|
||||||
|
const memberBatches = _.chunk(sanitized, CHUNK_SIZE);
|
||||||
|
|
||||||
|
return Promise.map(memberBatches, async (membersBatch) => {
|
||||||
|
const mappedMemberBatchData = [];
|
||||||
|
const mappedMembersLabelsBatchAssociations = [];
|
||||||
|
const membersWithStripeCustomers = [];
|
||||||
|
const membersWithComplimentaryPlans = [];
|
||||||
|
|
||||||
|
membersBatch.forEach((entry) => {
|
||||||
|
cleanupUndefined(entry);
|
||||||
|
|
||||||
|
let subscribed;
|
||||||
|
if (_.isUndefined(entry.subscribed_to_emails)) {
|
||||||
|
// model default
|
||||||
|
subscribed = 'true';
|
||||||
|
} else {
|
||||||
|
subscribed = (String(entry.subscribed_to_emails).toLowerCase() !== 'false');
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.labels = (entry.labels && entry.labels.split(',')) || [];
|
||||||
|
const entryLabels = serializeMemberLabels(entry.labels);
|
||||||
|
const mergedLabels = _.unionBy(entryLabels, importSetLabels, 'name');
|
||||||
|
|
||||||
|
let createdAt = entry.created_at === '' ? undefined : entry.created_at;
|
||||||
|
|
||||||
|
if (createdAt) {
|
||||||
|
const date = new Date(createdAt);
|
||||||
|
|
||||||
|
// CASE: client sends `0000-00-00 00:00:00`
|
||||||
|
if (isNaN(date)) {
|
||||||
|
// TODO: throw in validation stage for single record, not whole batch!
|
||||||
|
throw new errors.ValidationError({
|
||||||
|
message: i18n.t('errors.models.base.invalidDate', {key: 'created_at'}),
|
||||||
|
code: 'DATE_INVALID'
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
createdAt = moment(createdAt).toDate();
|
||||||
|
} else {
|
||||||
|
createdAt = new Date();
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: redacted copy from models.Base module
|
||||||
|
const contextUser = (options) => {
|
||||||
|
options = options || {};
|
||||||
|
options.context = options.context || {};
|
||||||
|
|
||||||
|
if (options.context.user || models.Base.Model.isExternalUser(options.context.user)) {
|
||||||
|
return options.context.user;
|
||||||
|
} else if (options.context.integration) {
|
||||||
|
return models.Base.Model.internalUser;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const memberId = ObjectId.generate();
|
||||||
|
mappedMemberBatchData.push({
|
||||||
|
id: memberId,
|
||||||
|
uuid: uuid.v4(), // member model default
|
||||||
|
email: entry.email,
|
||||||
|
name: entry.name,
|
||||||
|
note: entry.note,
|
||||||
|
subscribed: subscribed,
|
||||||
|
created_at: createdAt,
|
||||||
|
created_by: String(contextUser(frame.options))
|
||||||
|
});
|
||||||
|
|
||||||
|
if (mergedLabels) {
|
||||||
|
mergedLabels.forEach((label) => {
|
||||||
|
const matchedLabel = allLabelModels.find(labelModel => labelModel.get('name') === label.name);
|
||||||
|
|
||||||
|
mappedMembersLabelsBatchAssociations.push({
|
||||||
|
id: ObjectId.generate(),
|
||||||
|
member_id: memberId,
|
||||||
|
label_id: matchedLabel.id,
|
||||||
|
sort_order: 0 //TODO: implementme
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (entry.stripe_customer_id) {
|
||||||
|
membersWithStripeCustomers.push({
|
||||||
|
stripe_customer_id: entry.stripe_customer_id,
|
||||||
|
id: memberId,
|
||||||
|
email: entry.email
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((String(entry.complimentary_plan).toLocaleLowerCase() === 'true')) {
|
||||||
|
membersWithComplimentaryPlans.push({
|
||||||
|
id: memberId,
|
||||||
|
email: entry.email
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
// TODO: below inserts most likely need to be wrapped into transaction
|
||||||
|
// to avoid creating orphaned member_labels connections
|
||||||
|
await db.knex('members')
|
||||||
|
.insert(mappedMemberBatchData);
|
||||||
|
|
||||||
|
await db.knex('members_labels')
|
||||||
|
.insert(mappedMembersLabelsBatchAssociations);
|
||||||
|
|
||||||
|
imported.count += mappedMemberBatchData.length;
|
||||||
|
} catch (error) {
|
||||||
|
logging.error(error);
|
||||||
|
|
||||||
|
if (error.code && error.message.toLowerCase().indexOf('unique') !== -1) {
|
||||||
|
invalid.errors.push(new errors.ValidationError({
|
||||||
|
message: i18n.t('errors.api.members.memberAlreadyExists.message'),
|
||||||
|
context: i18n.t('errors.api.members.memberAlreadyExists.context')
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
// NOTE: probably need to wrap this error into something more specific e.g. ImportError
|
||||||
|
invalid.errors.push(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
invalid.count += mappedMemberBatchData.length;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (membersWithStripeCustomers.length || membersWithComplimentaryPlans.length) {
|
||||||
|
const deleteMemberKnex = async (id) => {
|
||||||
|
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||||
|
// for members_labels and members wrapped into a transaction
|
||||||
|
const deletedMembersCount = await db.knex('members')
|
||||||
|
.where('id', id)
|
||||||
|
.del();
|
||||||
|
|
||||||
|
if (deletedMembersCount) {
|
||||||
|
imported.count -= deletedMembersCount;
|
||||||
|
invalid.count += deletedMembersCount;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!membersService.config.isStripeConnected()) {
|
||||||
|
const memberIdsToDestroy = _.uniq([
|
||||||
|
...membersWithStripeCustomers.map(m => m.id),
|
||||||
|
...membersWithComplimentaryPlans.map(m => m.id)
|
||||||
|
]);
|
||||||
|
|
||||||
|
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||||
|
// for members_labels and members wrapped into a transaction
|
||||||
|
const deleteMembersCount = await db.knex('members')
|
||||||
|
.whereIn('id', memberIdsToDestroy)
|
||||||
|
.del();
|
||||||
|
|
||||||
|
imported.count -= deleteMembersCount;
|
||||||
|
invalid.count += deleteMembersCount;
|
||||||
|
invalid.errors.push(new errors.ValidationError({
|
||||||
|
message: i18n.t('errors.api.members.stripeNotConnected.message'),
|
||||||
|
context: i18n.t('errors.api.members.stripeNotConnected.context'),
|
||||||
|
help: i18n.t('errors.api.members.stripeNotConnected.help')
|
||||||
|
}));
|
||||||
|
} else {
|
||||||
|
if (membersWithStripeCustomers.length) {
|
||||||
|
await Promise.map(membersWithStripeCustomers, async (stripeMember) => {
|
||||||
|
try {
|
||||||
|
await membersService.api.members.linkStripeCustomer(stripeMember.stripe_customer_id, stripeMember);
|
||||||
|
} catch (error) {
|
||||||
|
if (error.message.indexOf('customer') && error.code === 'resource_missing') {
|
||||||
|
error.message = `Member not imported. ${error.message}`;
|
||||||
|
error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context');
|
||||||
|
error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help');
|
||||||
|
}
|
||||||
|
logging.error(error);
|
||||||
|
invalid.errors.push(error);
|
||||||
|
|
||||||
|
await deleteMemberKnex(stripeMember.id);
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
concurrency: 10
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
if (membersWithComplimentaryPlans.length) {
|
||||||
|
await Promise.map(membersWithComplimentaryPlans, async (complimentaryMember) => {
|
||||||
|
try {
|
||||||
|
await membersService.api.members.setComplimentarySubscription(complimentaryMember);
|
||||||
|
} catch (error) {
|
||||||
|
logging.error(error);
|
||||||
|
invalid.errors.push(error);
|
||||||
|
await deleteMemberKnex(complimentaryMember.id);
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
concurrency: 10 // TODO: check if this concurrency level doesn't fail rate limits
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).then(() => {
|
||||||
|
// NOTE: grouping by context because messages can contain unique data like "customer_id"
|
||||||
|
const groupedErrors = _.groupBy(invalid.errors, 'context');
|
||||||
|
const uniqueErrors = _.uniqBy(invalid.errors, 'context');
|
||||||
|
|
||||||
|
const outputErrors = uniqueErrors.map((error) => {
|
||||||
|
let errorGroup = groupedErrors[error.context];
|
||||||
|
let errorCount = errorGroup.length;
|
||||||
|
|
||||||
|
if (error.message === i18n.t('errors.api.members.duplicateStripeCustomerIds.message')) {
|
||||||
|
errorCount = duplicateStripeCustomerIdCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: filtering only essential error information, so API doesn't leak more error details than it should
|
||||||
|
return {
|
||||||
|
message: error.message,
|
||||||
|
context: error.context,
|
||||||
|
help: error.help,
|
||||||
|
count: errorCount
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
invalid.errors = outputErrors;
|
||||||
|
|
||||||
|
return {
|
||||||
|
meta: {
|
||||||
|
stats: {
|
||||||
|
imported,
|
||||||
|
invalid
|
||||||
|
},
|
||||||
|
import_label: importLabel
|
||||||
|
}
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
stats: {
|
stats: {
|
||||||
options: [
|
options: [
|
||||||
'days'
|
'days'
|
||||||
|
|
Loading…
Reference in a new issue