0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-02-10 23:36:14 -05:00

Added metrics for job queue and email analytics (#21626)

no ref

Added Prometheus metrics for job queue throughput and email analytics throughput. We'll likely keep these around as good metrics to keep an eye on, though for the moment their primary function is to establish a baseline for users without the job queue enabled, so we can observe the full impact once it is switched on.
This commit is contained in:
Steve Larson 2024-11-15 12:00:53 -06:00
parent e079ebb3de
commit 842adcdc80
9 changed files with 79 additions and 16 deletions

View file

@ -22,6 +22,7 @@ class EmailAnalyticsServiceWrapper {
const membersService = require('../members'); const membersService = require('../members');
const membersRepository = membersService.api.members; const membersRepository = membersService.api.members;
const emailSuppressionList = require('../email-suppression-list'); const emailSuppressionList = require('../email-suppression-list');
const prometheusClient = require('../../../shared/prometheus-client');
this.eventStorage = new EmailEventStorage({ this.eventStorage = new EmailEventStorage({
db, db,
@ -50,7 +51,8 @@ class EmailAnalyticsServiceWrapper {
new MailgunProvider({config, settings}) new MailgunProvider({config, settings})
], ],
queries, queries,
domainEvents domainEvents,
prometheusClient
}); });
// We currently cannot trigger a non-offloaded job from the job manager // We currently cannot trigger a non-offloaded job from the job manager
@ -65,6 +67,7 @@ class EmailAnalyticsServiceWrapper {
name: `update-member-email-analytics-${memberId}`, name: `update-member-email-analytics-${memberId}`,
metadata: { metadata: {
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'), job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
name: 'update-member-email-analytics',
data: { data: {
memberId memberId
} }

View file

@ -9,7 +9,7 @@ const models = require('../../models');
const sentry = require('../../../shared/sentry'); const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events'); const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config'); const config = require('../../../shared/config');
const prometheusClient = require('../../../shared/prometheus-client');
const errorHandler = (error, workerMeta) => { const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
logging.error(error); logging.error(error);
@ -43,7 +43,7 @@ const initTestMode = () => {
}, 5000); }, 5000);
}; };
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config}); const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient});
module.exports = jobManager; module.exports = jobManager;
module.exports.initTestMode = initTestMode; module.exports.initTestMode = initTestMode;

View file

@ -28,6 +28,7 @@ describe('Job Queue', function () {
afterEach(async function () { afterEach(async function () {
await configUtils.restore(); await configUtils.restore();
}); });
afterEach(testUtils.teardownDb);
describe('enabled by config', function () { describe('enabled by config', function () {
beforeEach(async function () { beforeEach(async function () {

View file

@ -75,14 +75,21 @@ module.exports = class EmailAnalyticsService {
* @param {EmailEventProcessor} dependencies.eventProcessor * @param {EmailEventProcessor} dependencies.eventProcessor
* @param {object} dependencies.providers * @param {object} dependencies.providers
* @param {import('@tryghost/domain-events')} dependencies.domainEvents * @param {import('@tryghost/domain-events')} dependencies.domainEvents
* @param {import('@tryghost/prometheus-metrics')} dependencies.prometheusClient
*/ */
constructor({config, settings, queries, eventProcessor, providers, domainEvents}) { constructor({config, settings, queries, eventProcessor, providers, domainEvents, prometheusClient}) {
this.config = config; this.config = config;
this.settings = settings; this.settings = settings;
this.queries = queries; this.queries = queries;
this.eventProcessor = eventProcessor; this.eventProcessor = eventProcessor;
this.providers = providers; this.providers = providers;
this.domainEvents = domainEvents; this.domainEvents = domainEvents;
this.prometheusClient = prometheusClient;
if (prometheusClient) {
// @ts-expect-error
prometheusClient.registerCounter({name: 'email_analytics_aggregate_member_stats_count', help: 'Count of member stats aggregations'});
}
} }
getStatus() { getStatus() {
@ -505,6 +512,7 @@ module.exports = class EmailAnalyticsService {
async aggregateStats({emailIds = [], memberIds = []}, includeOpenedEvents = true) { async aggregateStats({emailIds = [], memberIds = []}, includeOpenedEvents = true) {
let startTime = Date.now(); let startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`); logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`);
for (const emailId of emailIds) { for (const emailId of emailIds) {
await this.aggregateEmailStats(emailId, includeOpenedEvents); await this.aggregateEmailStats(emailId, includeOpenedEvents);
} }
@ -513,12 +521,17 @@ module.exports = class EmailAnalyticsService {
startTime = Date.now(); startTime = Date.now();
logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`); logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`);
// @ts-expect-error
const memberMetric = this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count');
for (const memberId of memberIds) { for (const memberId of memberIds) {
if (this.config?.get('services:jobs:queue:enabled')) { if (this.config?.get('services:jobs:queue:enabled')) {
// With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :)) // With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :))
// job manager has its own metrics
await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId})); await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId}));
} else { } else {
await this.aggregateMemberStats(memberId); await this.aggregateMemberStats(memberId);
memberMetric?.inc();
} }
} }
endTime = Date.now() - startTime; endTime = Date.now() - startTime;

View file

@ -52,8 +52,9 @@ class JobManager {
* @param {Object} [options.config] - config * @param {Object} [options.config] - config
* @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
* @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing) * @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing)
* @param {Object} [options.prometheusClient] - prometheus client instance; forwarded to the JobQueueManager to record job queue metrics
*/ */
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null}) { constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null}) {
this.inlineQueue = fastq(this, worker, 3); this.inlineQueue = fastq(this, worker, 3);
this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this);
@ -91,6 +92,8 @@ class JobManager {
if (JobModel) { if (JobModel) {
this._jobsRepository = new JobsRepository({JobModel}); this._jobsRepository = new JobsRepository({JobModel});
} }
this.prometheusClient = prometheusClient;
if (jobQueueManager) { if (jobQueueManager) {
this.#jobQueueManager = jobQueueManager; this.#jobQueueManager = jobQueueManager;
@ -101,7 +104,7 @@ class JobManager {
#initializeJobQueueManager() { #initializeJobQueueManager() {
if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) { if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) {
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config}); this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient});
this.#jobQueueManager.init(); this.#jobQueueManager.init();
} }
} }
@ -129,6 +132,7 @@ class JobManager {
* @property {string} name - The name or identifier of the job. * @property {string} name - The name or identifier of the job.
* @property {Object} metadata - Metadata associated with the job. * @property {Object} metadata - Metadata associated with the job.
* @property {string} metadata.job - The absolute path to the job to execute. * @property {string} metadata.job - The absolute path to the job to execute.
* @property {string} metadata.name - The name of the job. Used for metrics.
* @property {Object} metadata.data - The data associated with the job. * @property {Object} metadata.data - The data associated with the job.
*/ */

View file

@ -5,13 +5,23 @@ const debug = require('@tryghost/debug')('job-manager:JobQueueManager');
const logging = require('@tryghost/logging'); const logging = require('@tryghost/logging');
class JobQueueManager { class JobQueueManager {
constructor({JobModel, config, logger = logging, WorkerPool = workerpool}) { constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient}) {
this.jobsRepository = new JobsRepository({JobModel}); this.jobsRepository = new JobsRepository({JobModel});
this.config = this.initializeConfig(config?.get('services:jobs:queue') || {}); this.config = this.initializeConfig(config?.get('services:jobs:queue') || {});
this.logger = this.createLogger(logger, this.config.logLevel); this.logger = this.createLogger(logger, this.config.logLevel);
this.WorkerPool = WorkerPool; this.WorkerPool = WorkerPool;
this.pool = this.createWorkerPool(); this.pool = this.createWorkerPool();
this.state = this.initializeState(); this.state = this.initializeState();
this.prometheusClient = prometheusClient;
if (prometheusClient) {
this.prometheusClient.registerCounter({
name: 'job_manager_queue_job_completion_count',
help: 'The number of jobs completed by the job manager queue',
labelNames: ['jobName']
});
this.prometheusClient.registerGauge({name: 'job_manager_queue_depth', help: 'The number of jobs in the job manager queue'});
}
} }
createLogger(logger, logLevel) { createLogger(logger, logLevel) {
@ -97,9 +107,9 @@ class JobQueueManager {
const stats = await this.getStats(); const stats = await this.getStats();
if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) { if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) {
const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks); const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks);
this.logger.info(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}`); const {data: jobs, total} = await this.jobsRepository.getQueuedJobs(entriesToAdd);
this.prometheusClient?.getMetric('job_manager_queue_depth')?.set(total || 0);
const jobs = await this.jobsRepository.getQueuedJobs(entriesToAdd); this.logger.info(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}. Current depth: ${total}.`);
this.updatePollInterval(jobs); this.updatePollInterval(jobs);
await this.processJobs(jobs); await this.processJobs(jobs);
} }
@ -133,6 +143,10 @@ class JobQueueManager {
try { try {
await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]); await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]);
await this.jobsRepository.delete(job.id); await this.jobsRepository.delete(job.id);
this.prometheusClient?.getMetric('job_manager_queue_job_completion_count')?.inc({jobName});
if (jobName === 'update-member-email-analytics') {
this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count')?.inc();
}
} catch (error) { } catch (error) {
await this.handleJobError(job, jobMetadata, error); await this.handleJobError(job, jobMetadata, error);
} finally { } finally {

View file

@ -71,14 +71,14 @@ class JobsRepository {
* @async * @async
* @description Retrieves a list of queued jobs from the database. * @description Retrieves a list of queued jobs from the database.
* @param {number} [limit=50] - The maximum number of jobs to retrieve. * @param {number} [limit=50] - The maximum number of jobs to retrieve.
* @returns {Promise<Array>} An array of queued job objects. * @returns {Promise<Object>} An object containing the queued job data and total count.
*/ */
async getQueuedJobs(limit = 50) { async getQueuedJobs(limit = 50) {
const jobs = await this._JobModel.findPage({ const jobs = await this._JobModel.findPage({
filter: 'queue_entry:1', filter: 'queue_entry:1',
limit limit
}); });
return jobs.data; return {data: jobs.data, total: jobs.meta.pagination.total};
} }
/** /**

View file

@ -8,8 +8,11 @@ describe('JobQueueManager', function () {
let mockConfig; let mockConfig;
let mockLogger; let mockLogger;
let mockWorkerPool; let mockWorkerPool;
let mockPrometheusClient;
let metricIncStub;
beforeEach(function () { beforeEach(function () {
metricIncStub = sinon.stub();
mockJobModel = {}; mockJobModel = {};
mockConfig = { mockConfig = {
get: sinon.stub().returns({}) get: sinon.stub().returns({})
@ -18,6 +21,14 @@ describe('JobQueueManager', function () {
info: sinon.stub(), info: sinon.stub(),
error: sinon.stub() error: sinon.stub()
}; };
mockPrometheusClient = {
getMetric: sinon.stub().returns({
set: sinon.stub(),
inc: metricIncStub
}),
registerCounter: sinon.stub(),
registerGauge: sinon.stub()
};
mockWorkerPool = { mockWorkerPool = {
pool: sinon.stub().returns({ pool: sinon.stub().returns({
exec: sinon.stub(), exec: sinon.stub(),
@ -30,7 +41,8 @@ describe('JobQueueManager', function () {
JobModel: mockJobModel, JobModel: mockJobModel,
config: mockConfig, config: mockConfig,
logger: mockLogger, logger: mockLogger,
WorkerPool: mockWorkerPool WorkerPool: mockWorkerPool,
prometheusClient: mockPrometheusClient
}); });
}); });
@ -116,7 +128,7 @@ describe('JobQueueManager', function () {
const mockJobs = [{get: sinon.stub().returns('{}')}]; const mockJobs = [{get: sinon.stub().returns('{}')}];
sinon.stub(jobQueueManager, 'getStats').resolves(mockStats); sinon.stub(jobQueueManager, 'getStats').resolves(mockStats);
sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves(mockJobs); sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves({data: mockJobs, total: mockJobs.length});
sinon.stub(jobQueueManager, 'updatePollInterval'); sinon.stub(jobQueueManager, 'updatePollInterval');
sinon.stub(jobQueueManager, 'processJobs'); sinon.stub(jobQueueManager, 'processJobs');
@ -279,6 +291,20 @@ describe('JobQueueManager', function () {
expect(handleJobErrorStub.calledWith(job, {job: 'testJob', data: {}}, error)).to.be.true; expect(handleJobErrorStub.calledWith(job, {job: 'testJob', data: {}}, error)).to.be.true;
expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false;
}); });
it('should increment the job_manager_queue_job_completion_count metric', async function () {
const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')};
sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves();
await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}});
expect(metricIncStub.calledOnce).to.be.true;
});
it('should increment the email_analytics_aggregate_member_stats_count metric', async function () {
const job = {id: '1', get: sinon.stub().returns('{"job": "update-member-email-analytics", "data": {}}')};
sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves();
await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}});
expect(metricIncStub.calledTwice).to.be.true;
});
}); });
describe('handleJobError', function () { describe('handleJobError', function () {

View file

@ -180,12 +180,14 @@ export class PrometheusClient {
* Registers a counter metric * Registers a counter metric
* @param name - The name of the metric * @param name - The name of the metric
* @param help - The help text for the metric * @param help - The help text for the metric
* @param labelNames - The names of the labels for the metric
* @returns The counter metric * @returns The counter metric
*/ */
registerCounter({name, help}: {name: string, help: string}): client.Counter { registerCounter({name, help, labelNames = []}: {name: string, help: string, labelNames?: string[]}): client.Counter {
return new this.client.Counter({ return new this.client.Counter({
name: `${this.prefix}${name}`, name: `${this.prefix}${name}`,
help help,
labelNames
}); });
} }