mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-01-06 22:40:14 -05:00
Extracted batched member import into separate module
no issue - The code in controller was becoming hard to reason about. - Having a single module shows exactly how many dependencies are there to do an import for single batch. - Having a separate module would make it easier to extract into it's own package in Members monorepo
This commit is contained in:
parent
597bc12088
commit
c696d715c1
2 changed files with 243 additions and 188 deletions
|
@ -1,13 +1,12 @@
|
|||
// NOTE: We must not cache references to membersService.api
|
||||
// as it is a getter and may change during runtime.
|
||||
const Promise = require('bluebird');
|
||||
const ObjectId = require('bson-objectid');
|
||||
const moment = require('moment-timezone');
|
||||
const uuid = require('uuid');
|
||||
const errors = require('@tryghost/errors');
|
||||
const config = require('../../../shared/config');
|
||||
const models = require('../../models');
|
||||
const membersService = require('../../services/members');
|
||||
const doImport = require('../../services/members/importer');
|
||||
const settingsCache = require('../../services/settings/cache');
|
||||
const {i18n} = require('../../lib/common');
|
||||
const logging = require('../../../shared/logging');
|
||||
|
@ -581,6 +580,20 @@ const members = {
|
|||
};
|
||||
let duplicateStripeCustomerIdCount = 0;
|
||||
|
||||
// NOTE: redacted copy from models.Base module
|
||||
const contextUser = (options) => {
|
||||
options = options || {};
|
||||
options.context = options.context || {};
|
||||
|
||||
if (options.context.user || models.Base.Model.isExternalUser(options.context.user)) {
|
||||
return options.context.user;
|
||||
} else if (options.context.integration) {
|
||||
return models.Base.Model.internalUser;
|
||||
}
|
||||
};
|
||||
|
||||
const createdBy = contextUser(frame.options);
|
||||
|
||||
// NOTE: custom labels have to be created in advance otherwise there are conflicts
|
||||
// when processing member creation in parallel later on in import process
|
||||
const importSetLabels = serializeMemberLabels(frame.data.labels);
|
||||
|
@ -622,193 +635,14 @@ const members = {
|
|||
const memberBatches = _.chunk(sanitized, CHUNK_SIZE);
|
||||
|
||||
return Promise.map(memberBatches, async (membersBatch) => {
|
||||
const mappedMemberBatchData = [];
|
||||
const mappedMembersLabelsBatchAssociations = [];
|
||||
const membersWithStripeCustomers = [];
|
||||
const membersWithComplimentaryPlans = [];
|
||||
|
||||
membersBatch.forEach((entry) => {
|
||||
cleanupUndefined(entry);
|
||||
|
||||
let subscribed;
|
||||
if (_.isUndefined(entry.subscribed_to_emails)) {
|
||||
// model default
|
||||
subscribed = 'true';
|
||||
} else {
|
||||
subscribed = (String(entry.subscribed_to_emails).toLowerCase() !== 'false');
|
||||
}
|
||||
|
||||
entry.labels = (entry.labels && entry.labels.split(',')) || [];
|
||||
const entryLabels = serializeMemberLabels(entry.labels);
|
||||
const mergedLabels = _.unionBy(entryLabels, importSetLabels, 'name');
|
||||
|
||||
let createdAt = entry.created_at === '' ? undefined : entry.created_at;
|
||||
|
||||
if (createdAt) {
|
||||
const date = new Date(createdAt);
|
||||
|
||||
// CASE: client sends `0000-00-00 00:00:00`
|
||||
if (isNaN(date)) {
|
||||
// TODO: throw in validation stage for single record, not whole batch!
|
||||
throw new errors.ValidationError({
|
||||
message: i18n.t('errors.models.base.invalidDate', {key: 'created_at'}),
|
||||
code: 'DATE_INVALID'
|
||||
});
|
||||
}
|
||||
|
||||
createdAt = moment(createdAt).toDate();
|
||||
} else {
|
||||
createdAt = new Date();
|
||||
}
|
||||
|
||||
// NOTE: redacted copy from models.Base module
|
||||
const contextUser = (options) => {
|
||||
options = options || {};
|
||||
options.context = options.context || {};
|
||||
|
||||
if (options.context.user || models.Base.Model.isExternalUser(options.context.user)) {
|
||||
return options.context.user;
|
||||
} else if (options.context.integration) {
|
||||
return models.Base.Model.internalUser;
|
||||
}
|
||||
};
|
||||
|
||||
const memberId = ObjectId.generate();
|
||||
mappedMemberBatchData.push({
|
||||
id: memberId,
|
||||
uuid: uuid.v4(), // member model default
|
||||
email: entry.email,
|
||||
name: entry.name,
|
||||
note: entry.note,
|
||||
subscribed: subscribed,
|
||||
created_at: createdAt,
|
||||
created_by: String(contextUser(frame.options))
|
||||
});
|
||||
|
||||
if (mergedLabels) {
|
||||
mergedLabels.forEach((label, index) => {
|
||||
const matchedLabel = allLabelModels.find(labelModel => labelModel.get('name') === label.name);
|
||||
|
||||
mappedMembersLabelsBatchAssociations.push({
|
||||
id: ObjectId.generate(),
|
||||
member_id: memberId,
|
||||
label_id: matchedLabel.id,
|
||||
sort_order: index
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (entry.stripe_customer_id) {
|
||||
membersWithStripeCustomers.push({
|
||||
stripe_customer_id: entry.stripe_customer_id,
|
||||
id: memberId,
|
||||
email: entry.email
|
||||
});
|
||||
}
|
||||
|
||||
if ((String(entry.complimentary_plan).toLocaleLowerCase() === 'true')) {
|
||||
membersWithComplimentaryPlans.push({
|
||||
id: memberId,
|
||||
email: entry.email
|
||||
});
|
||||
}
|
||||
return doImport({
|
||||
membersBatch,
|
||||
allLabelModels,
|
||||
importSetLabels,
|
||||
imported,
|
||||
invalid,
|
||||
createdBy
|
||||
});
|
||||
|
||||
try {
|
||||
// TODO: below inserts most likely need to be wrapped into transaction
|
||||
// to avoid creating orphaned member_labels connections
|
||||
await db.knex('members')
|
||||
.insert(mappedMemberBatchData);
|
||||
|
||||
await db.knex('members_labels')
|
||||
.insert(mappedMembersLabelsBatchAssociations);
|
||||
|
||||
imported.count += mappedMemberBatchData.length;
|
||||
} catch (error) {
|
||||
logging.error(error);
|
||||
|
||||
if (error.code && error.message.toLowerCase().indexOf('unique') !== -1) {
|
||||
invalid.errors.push(new errors.ValidationError({
|
||||
message: i18n.t('errors.api.members.memberAlreadyExists.message'),
|
||||
context: i18n.t('errors.api.members.memberAlreadyExists.context')
|
||||
}));
|
||||
} else {
|
||||
// NOTE: probably need to wrap this error into something more specific e.g. ImportError
|
||||
invalid.errors.push(error);
|
||||
}
|
||||
|
||||
invalid.count += mappedMemberBatchData.length;
|
||||
}
|
||||
|
||||
if (membersWithStripeCustomers.length || membersWithComplimentaryPlans.length) {
|
||||
const deleteMemberKnex = async (id) => {
|
||||
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||
// for members_labels and members wrapped into a transaction
|
||||
const deletedMembersCount = await db.knex('members')
|
||||
.where('id', id)
|
||||
.del();
|
||||
|
||||
if (deletedMembersCount) {
|
||||
imported.count -= deletedMembersCount;
|
||||
invalid.count += deletedMembersCount;
|
||||
}
|
||||
};
|
||||
|
||||
if (!membersService.config.isStripeConnected()) {
|
||||
const memberIdsToDestroy = _.uniq([
|
||||
...membersWithStripeCustomers.map(m => m.id),
|
||||
...membersWithComplimentaryPlans.map(m => m.id)
|
||||
]);
|
||||
|
||||
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||
// for members_labels and members wrapped into a transaction
|
||||
const deleteMembersCount = await db.knex('members')
|
||||
.whereIn('id', memberIdsToDestroy)
|
||||
.del();
|
||||
|
||||
imported.count -= deleteMembersCount;
|
||||
invalid.count += deleteMembersCount;
|
||||
invalid.errors.push(new errors.ValidationError({
|
||||
message: i18n.t('errors.api.members.stripeNotConnected.message'),
|
||||
context: i18n.t('errors.api.members.stripeNotConnected.context'),
|
||||
help: i18n.t('errors.api.members.stripeNotConnected.help')
|
||||
}));
|
||||
} else {
|
||||
if (membersWithStripeCustomers.length) {
|
||||
await Promise.map(membersWithStripeCustomers, async (stripeMember) => {
|
||||
try {
|
||||
await membersService.api.members.linkStripeCustomer(stripeMember.stripe_customer_id, stripeMember);
|
||||
} catch (error) {
|
||||
if (error.message.indexOf('customer') && error.code === 'resource_missing') {
|
||||
error.message = `Member not imported. ${error.message}`;
|
||||
error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context');
|
||||
error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help');
|
||||
}
|
||||
logging.error(error);
|
||||
invalid.errors.push(error);
|
||||
|
||||
await deleteMemberKnex(stripeMember.id);
|
||||
}
|
||||
}, {
|
||||
concurrency: 10
|
||||
});
|
||||
}
|
||||
|
||||
if (membersWithComplimentaryPlans.length) {
|
||||
await Promise.map(membersWithComplimentaryPlans, async (complimentaryMember) => {
|
||||
try {
|
||||
await membersService.api.members.setComplimentarySubscription(complimentaryMember);
|
||||
} catch (error) {
|
||||
logging.error(error);
|
||||
invalid.errors.push(error);
|
||||
await deleteMemberKnex(complimentaryMember.id);
|
||||
}
|
||||
}, {
|
||||
concurrency: 10 // TODO: check if this concurrency level doesn't fail rate limits
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}).then(() => {
|
||||
// NOTE: grouping by context because messages can contain unique data like "customer_id"
|
||||
|
|
221
core/server/services/members/importer/index.js
Normal file
221
core/server/services/members/importer/index.js
Normal file
|
@ -0,0 +1,221 @@
|
|||
const _ = require('lodash');
|
||||
const uuid = require('uuid');
|
||||
const ObjectId = require('bson-objectid');
|
||||
const moment = require('moment-timezone');
|
||||
const errors = require('@tryghost/errors');
|
||||
const membersService = require('../index');
|
||||
const {i18n} = require('../../../lib/common');
|
||||
const db = require('../../../data/db');
|
||||
const logging = require('../../../../shared/logging');
|
||||
|
||||
const cleanupUndefined = (obj) => {
|
||||
for (let key in obj) {
|
||||
if (obj[key] === 'undefined') {
|
||||
delete obj[key];
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
function serializeMemberLabels(labels) {
|
||||
if (_.isString(labels)) {
|
||||
if (labels === '') {
|
||||
return [];
|
||||
}
|
||||
|
||||
return [{
|
||||
name: labels.trim()
|
||||
}];
|
||||
} else if (labels) {
|
||||
return labels.filter((label) => {
|
||||
return !!label;
|
||||
}).map((label) => {
|
||||
if (_.isString(label)) {
|
||||
return {
|
||||
name: label.trim()
|
||||
};
|
||||
}
|
||||
return label;
|
||||
});
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
const doImport = async ({membersBatch: members, allLabelModels, importSetLabels, imported, invalid, createdBy}) => {
|
||||
const mappedMemberBatchData = [];
|
||||
const mappedMembersLabelsBatchAssociations = [];
|
||||
const membersWithStripeCustomers = [];
|
||||
const membersWithComplimentaryPlans = [];
|
||||
|
||||
members.forEach((member) => {
|
||||
cleanupUndefined(member);
|
||||
|
||||
let subscribed;
|
||||
if (_.isUndefined(member.subscribed_to_emails)) {
|
||||
// model default
|
||||
subscribed = 'true';
|
||||
} else {
|
||||
subscribed = (String(member.subscribed_to_emails).toLowerCase() !== 'false');
|
||||
}
|
||||
|
||||
member.labels = (member.labels && member.labels.split(',')) || [];
|
||||
const entryLabels = serializeMemberLabels(member.labels);
|
||||
const mergedLabels = _.unionBy(entryLabels, importSetLabels, 'name');
|
||||
|
||||
let createdAt = member.created_at === '' ? undefined : member.created_at;
|
||||
|
||||
if (createdAt) {
|
||||
const date = new Date(createdAt);
|
||||
|
||||
// CASE: client sends `0000-00-00 00:00:00`
|
||||
if (isNaN(date)) {
|
||||
// TODO: throw in validation stage for single record, not whole batch!
|
||||
throw new errors.ValidationError({
|
||||
message: i18n.t('errors.models.base.invalidDate', {key: 'created_at'}),
|
||||
code: 'DATE_INVALID'
|
||||
});
|
||||
}
|
||||
|
||||
createdAt = moment(createdAt).toDate();
|
||||
} else {
|
||||
createdAt = new Date();
|
||||
}
|
||||
|
||||
const memberId = ObjectId.generate();
|
||||
mappedMemberBatchData.push({
|
||||
id: memberId,
|
||||
uuid: uuid.v4(), // member model default
|
||||
email: member.email,
|
||||
name: member.name,
|
||||
note: member.note,
|
||||
subscribed: subscribed,
|
||||
created_at: createdAt,
|
||||
created_by: createdBy
|
||||
});
|
||||
|
||||
if (mergedLabels) {
|
||||
mergedLabels.forEach((label, index) => {
|
||||
const matchedLabel = allLabelModels.find(labelModel => labelModel.get('name') === label.name);
|
||||
|
||||
mappedMembersLabelsBatchAssociations.push({
|
||||
id: ObjectId.generate(),
|
||||
member_id: memberId,
|
||||
label_id: matchedLabel.id,
|
||||
sort_order: index
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (member.stripe_customer_id) {
|
||||
membersWithStripeCustomers.push({
|
||||
stripe_customer_id: member.stripe_customer_id,
|
||||
id: memberId,
|
||||
email: member.email
|
||||
});
|
||||
}
|
||||
|
||||
if ((String(member.complimentary_plan).toLocaleLowerCase() === 'true')) {
|
||||
membersWithComplimentaryPlans.push({
|
||||
id: memberId,
|
||||
email: member.email
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
// TODO: below inserts most likely need to be wrapped into transaction
|
||||
// to avoid creating orphaned member_labels connections
|
||||
await db.knex('members')
|
||||
.insert(mappedMemberBatchData);
|
||||
|
||||
await db.knex('members_labels')
|
||||
.insert(mappedMembersLabelsBatchAssociations);
|
||||
|
||||
imported.count += mappedMemberBatchData.length;
|
||||
} catch (error) {
|
||||
logging.error(error);
|
||||
|
||||
if (error.code && error.message.toLowerCase().indexOf('unique') !== -1) {
|
||||
invalid.errors.push(new errors.ValidationError({
|
||||
message: i18n.t('errors.api.members.memberAlreadyExists.message'),
|
||||
context: i18n.t('errors.api.members.memberAlreadyExists.context')
|
||||
}));
|
||||
} else {
|
||||
// NOTE: probably need to wrap this error into something more specific e.g. ImportError
|
||||
invalid.errors.push(error);
|
||||
}
|
||||
|
||||
invalid.count += mappedMemberBatchData.length;
|
||||
}
|
||||
|
||||
if (membersWithStripeCustomers.length || membersWithComplimentaryPlans.length) {
|
||||
const deleteMemberKnex = async (id) => {
|
||||
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||
// for members_labels and members wrapped into a transaction
|
||||
const deletedMembersCount = await db.knex('members')
|
||||
.where('id', id)
|
||||
.del();
|
||||
|
||||
if (deletedMembersCount) {
|
||||
imported.count -= deletedMembersCount;
|
||||
invalid.count += deletedMembersCount;
|
||||
}
|
||||
};
|
||||
|
||||
if (!membersService.config.isStripeConnected()) {
|
||||
const memberIdsToDestroy = _.uniq([
|
||||
...membersWithStripeCustomers.map(m => m.id),
|
||||
...membersWithComplimentaryPlans.map(m => m.id)
|
||||
]);
|
||||
|
||||
// TODO: cascading wont work on SQLite needs 2 separate deletes
|
||||
// for members_labels and members wrapped into a transaction
|
||||
const deleteMembersCount = await db.knex('members')
|
||||
.whereIn('id', memberIdsToDestroy)
|
||||
.del();
|
||||
|
||||
imported.count -= deleteMembersCount;
|
||||
invalid.count += deleteMembersCount;
|
||||
invalid.errors.push(new errors.ValidationError({
|
||||
message: i18n.t('errors.api.members.stripeNotConnected.message'),
|
||||
context: i18n.t('errors.api.members.stripeNotConnected.context'),
|
||||
help: i18n.t('errors.api.members.stripeNotConnected.help')
|
||||
}));
|
||||
} else {
|
||||
if (membersWithStripeCustomers.length) {
|
||||
await Promise.map(membersWithStripeCustomers, async (stripeMember) => {
|
||||
try {
|
||||
await membersService.api.members.linkStripeCustomer(stripeMember.stripe_customer_id, stripeMember);
|
||||
} catch (error) {
|
||||
if (error.message.indexOf('customer') && error.code === 'resource_missing') {
|
||||
error.message = `Member not imported. ${error.message}`;
|
||||
error.context = i18n.t('errors.api.members.stripeCustomerNotFound.context');
|
||||
error.help = i18n.t('errors.api.members.stripeCustomerNotFound.help');
|
||||
}
|
||||
logging.error(error);
|
||||
invalid.errors.push(error);
|
||||
|
||||
await deleteMemberKnex(stripeMember.id);
|
||||
}
|
||||
}, {
|
||||
concurrency: 10
|
||||
});
|
||||
}
|
||||
|
||||
if (membersWithComplimentaryPlans.length) {
|
||||
await Promise.map(membersWithComplimentaryPlans, async (complimentaryMember) => {
|
||||
try {
|
||||
await membersService.api.members.setComplimentarySubscription(complimentaryMember);
|
||||
} catch (error) {
|
||||
logging.error(error);
|
||||
invalid.errors.push(error);
|
||||
await deleteMemberKnex(complimentaryMember.id);
|
||||
}
|
||||
}, {
|
||||
concurrency: 10 // TODO: check if this concurrency level doesn't fail rate limits
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
module.exports = doImport;
|
Loading…
Reference in a new issue