mirror of
https://github.com/TryGhost/Ghost.git
synced 2025-03-11 02:12:21 -05:00
Updated fetchMissing to use db-persisted values - this helps tremendously with handles reboots to ensure we re-fetch every event to capture missing events
This commit is contained in:
parent
719316df77
commit
d797848ed4
2 changed files with 49 additions and 20 deletions
|
@ -9,6 +9,21 @@ const MIN_EMAIL_COUNT_FOR_OPEN_RATE = 5;
|
|||
/** @typedef {'email-analytics-latest-opened'|'email-analytics-latest-others'|'email-analytics-missing'|'email-analytics-scheduled'} EmailAnalyticsJobName */
|
||||
/** @typedef {'delivered'|'opened'|'failed'} EmailAnalyticsEvent */
|
||||
|
||||
/**
|
||||
* Creates a job in the jobs table if it does not already exist.
|
||||
* @param {EmailAnalyticsJobName} jobName - The name of the job to create.
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async function createJobIfNotExists(jobName) {
|
||||
await db.knex('jobs').insert({
|
||||
id: new ObjectID().toHexString(),
|
||||
name: jobName,
|
||||
started_at: new Date(),
|
||||
created_at: new Date(),
|
||||
status: 'started'
|
||||
}).onConflict('name').ignore();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
async shouldFetchStats() {
|
||||
// don't fetch stats from Mailgun if we haven't sent any emails
|
||||
|
@ -28,15 +43,13 @@ module.exports = {
|
|||
let maxOpenedAt;
|
||||
let maxDeliveredAt;
|
||||
let maxFailedAt;
|
||||
const lastJobRunTimestamp = await this.getLastJobRunTimestamp(jobName);
|
||||
|
||||
const jobData = await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();
|
||||
|
||||
if (jobData) {
|
||||
if (lastJobRunTimestamp) {
|
||||
debug(`Using job data for ${jobName}`);
|
||||
const lastJobTimestamp = jobData.finished_at || jobData.started_at;
|
||||
maxOpenedAt = events.includes('opened') ? lastJobTimestamp : null;
|
||||
maxDeliveredAt = events.includes('delivered') ? lastJobTimestamp : null;
|
||||
maxFailedAt = events.includes('failed') ? lastJobTimestamp : null;
|
||||
maxOpenedAt = events.includes('opened') ? lastJobRunTimestamp : null;
|
||||
maxDeliveredAt = events.includes('delivered') ? lastJobRunTimestamp : null;
|
||||
maxFailedAt = events.includes('failed') ? lastJobRunTimestamp : null;
|
||||
} else {
|
||||
debug(`Job data not found for ${jobName}, using email_recipients data`);
|
||||
logging.info(`Job data not found for ${jobName}, using email_recipients data`);
|
||||
|
@ -50,14 +63,7 @@ module.exports = {
|
|||
maxFailedAt = (await db.knex('email_recipients').select(db.knex.raw('MAX(failed_at) as maxFailedAt')).first()).maxFailedAt;
|
||||
}
|
||||
|
||||
// Insert a new job row if it doesn't exist
|
||||
await db.knex('jobs').insert({
|
||||
id: new ObjectID().toHexString(),
|
||||
name: jobName,
|
||||
started_at: new Date(),
|
||||
created_at: new Date(),
|
||||
status: 'started'
|
||||
}).onConflict('name').ignore();
|
||||
await createJobIfNotExists(jobName);
|
||||
}
|
||||
|
||||
// Convert string dates to Date objects for SQLite compatibility
|
||||
|
@ -66,11 +72,30 @@ module.exports = {
|
|||
));
|
||||
|
||||
const lastSeenEventTimestamp = _.max([maxOpenedAt, maxDeliveredAt, maxFailedAt]);
|
||||
debug(`getLastSeenEventTimestamp: finished in ${Date.now() - startDate}ms`);
|
||||
debug(`getLastEventTimestamp: finished in ${Date.now() - startDate}ms`);
|
||||
|
||||
return lastSeenEventTimestamp;
|
||||
},
|
||||
|
||||
/**
|
||||
* Retrieves the job data for the specified job name.
|
||||
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve data for.
|
||||
* @returns {Promise<Object|null>} The job data, or null if no job data is found.
|
||||
*/
|
||||
async getJobData(jobName) {
|
||||
return await db.knex('jobs').select('finished_at', 'started_at').where('name', jobName).first();
|
||||
},
|
||||
|
||||
/**
|
||||
* Retrieves the timestamp of the last job run for the specified job name.
|
||||
* @param {EmailAnalyticsJobName} jobName - The name of the job to retrieve the last run timestamp for.
|
||||
* @returns {Promise<Date|null>} The timestamp of the last job run, or null if no job data is found.
|
||||
*/
|
||||
async getLastJobRunTimestamp(jobName) {
|
||||
const jobData = await this.getJobData(jobName);
|
||||
return jobData ? jobData.finished_at || jobData.started_at : null;
|
||||
},
|
||||
|
||||
/**
|
||||
* Sets the timestamp of the last seen event for the specified email analytics events.
|
||||
* @param {EmailAnalyticsJobName} jobName - The name of the job to update.
|
||||
|
|
|
@ -105,6 +105,13 @@ module.exports = class EmailAnalyticsService {
|
|||
return this.#fetchLatestOpenedData?.lastEventTimestamp ?? (await this.queries.getLastEventTimestamp(this.#fetchLatestOpenedData.jobName,['opened'])) ?? new Date(Date.now() - TRUST_THRESHOLD_MS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the timestamp of the last missing event we processed. Defaults to now minus 2h if we have no data yet.
|
||||
*/
|
||||
async getLastMissingEventTimestamp() {
|
||||
return this.#fetchMissingData?.lastEventTimestamp ?? (await this.queries.getLastJobRunTimestamp(this.#fetchMissingData.jobName)) ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 4);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches the latest opened events.
|
||||
* @param {Object} options - The options for fetching events.
|
||||
|
@ -112,7 +119,6 @@ module.exports = class EmailAnalyticsService {
|
|||
* @returns {Promise<number>} The total number of events fetched.
|
||||
*/
|
||||
async fetchLatestOpenedEvents({maxEvents = Infinity} = {}) {
|
||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||
const begin = await this.getLastOpenedEventTimestamp();
|
||||
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage
|
||||
|
||||
|
@ -132,7 +138,6 @@ module.exports = class EmailAnalyticsService {
|
|||
* @returns {Promise<number>} The total number of events fetched.
|
||||
*/
|
||||
async fetchLatestNonOpenedEvents({maxEvents = Infinity} = {}) {
|
||||
// Start where we left of, or the last stored event in the database, or start 30 minutes ago if we have nothing available
|
||||
const begin = await this.getLastNonOpenedEventTimestamp();
|
||||
const end = new Date(Date.now() - FETCH_LATEST_END_MARGIN_MS); // Always stop at x minutes ago to give Mailgun a bit more time to stabilize storage
|
||||
|
||||
|
@ -151,8 +156,7 @@ module.exports = class EmailAnalyticsService {
|
|||
* @param {number} [options.maxEvents] Not a strict maximum. We stop fetching after we reached the maximum AND received at least one event after begin (not equal) to prevent deadlocks.
|
||||
*/
|
||||
async fetchMissing({maxEvents = Infinity} = {}) {
|
||||
// We start where we left of, or 1,5h ago after a server restart
|
||||
const begin = this.#fetchMissingData?.lastEventTimestamp ?? this.#fetchMissingData?.lastBegin ?? new Date(Date.now() - TRUST_THRESHOLD_MS * 3);
|
||||
const begin = await this.getLastMissingEventTimestamp();
|
||||
|
||||
// Always stop at the earlier of the time the fetchLatest started fetching on or 30 minutes ago
|
||||
const end = new Date(
|
||||
|
|
Loading…
Add table
Reference in a new issue