From 842adcdc80e35469ccdb9509bd4053adff39ed93 Mon Sep 17 00:00:00 2001 From: Steve Larson <9larsons@gmail.com> Date: Fri, 15 Nov 2024 12:00:53 -0600 Subject: [PATCH] Added metrics for job queue and email analytics (#21626) no ref Added Prometheus metrics for the job queue throughput and email analytics throughput. We'll likely keep these around as good metrics to keep an eye on, though for the moment their primary function is to establish a baseline for users w/o the job queue enabled so we can observe the full impact once switching it on. --- .../EmailAnalyticsServiceWrapper.js | 5 +++- .../core/server/services/jobs/job-service.js | 4 +-- .../test/integration/jobs/job-queue.test.js | 1 + .../lib/EmailAnalyticsService.js | 15 +++++++++- ghost/job-manager/lib/JobManager.js | 8 +++-- ghost/job-manager/lib/JobQueueManager.js | 22 +++++++++++--- ghost/job-manager/lib/JobsRepository.js | 4 +-- .../test/job-queue-manager.test.js | 30 +++++++++++++++++-- .../src/PrometheusClient.ts | 6 ++-- 9 files changed, 79 insertions(+), 16 deletions(-) diff --git a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js index ac2ef2877b..5cd381aa66 100644 --- a/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js +++ b/ghost/core/core/server/services/email-analytics/EmailAnalyticsServiceWrapper.js @@ -22,6 +22,7 @@ class EmailAnalyticsServiceWrapper { const membersService = require('../members'); const membersRepository = membersService.api.members; const emailSuppressionList = require('../email-suppression-list'); + const prometheusClient = require('../../../shared/prometheus-client'); this.eventStorage = new EmailEventStorage({ db, @@ -50,7 +51,8 @@ class EmailAnalyticsServiceWrapper { new MailgunProvider({config, settings}) ], queries, - domainEvents + domainEvents, + prometheusClient }); // We currently cannot trigger a non-offloaded job from the job manager @@ -65,6 +67,7 @@ class EmailAnalyticsServiceWrapper { name: `update-member-email-analytics-${memberId}`, metadata: { job: path.resolve(__dirname, 'jobs/update-member-email-analytics'), + name: 'update-member-email-analytics', data: { memberId } diff --git a/ghost/core/core/server/services/jobs/job-service.js b/ghost/core/core/server/services/jobs/job-service.js index f96684feb9..81f51be878 100644 --- a/ghost/core/core/server/services/jobs/job-service.js +++ b/ghost/core/core/server/services/jobs/job-service.js @@ -9,7 +9,7 @@ const models = require('../../models'); const sentry = require('../../../shared/sentry'); const domainEvents = require('@tryghost/domain-events'); const config = require('../../../shared/config'); - +const prometheusClient = require('../../../shared/prometheus-client'); const errorHandler = (error, workerMeta) => { logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); logging.error(error); @@ -43,7 +43,7 @@ const initTestMode = () => { }, 5000); }; -const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config}); +const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient}); module.exports = jobManager; module.exports.initTestMode = initTestMode; diff --git a/ghost/core/test/integration/jobs/job-queue.test.js b/ghost/core/test/integration/jobs/job-queue.test.js index e2187a2053..865370dcc2 100644 --- a/ghost/core/test/integration/jobs/job-queue.test.js +++ b/ghost/core/test/integration/jobs/job-queue.test.js @@ -28,6 +28,7 @@ describe('Job Queue', function () { afterEach(async function () { await configUtils.restore(); }); + afterEach(testUtils.teardownDb); describe('enabled by config', function () { beforeEach(async function () { diff --git a/ghost/email-analytics-service/lib/EmailAnalyticsService.js b/ghost/email-analytics-service/lib/EmailAnalyticsService.js index 90fa93f797..91518eb9f6 100644 --- a/ghost/email-analytics-service/lib/EmailAnalyticsService.js +++ b/ghost/email-analytics-service/lib/EmailAnalyticsService.js @@ -75,14 +75,21 @@ module.exports = class EmailAnalyticsService { * @param {EmailEventProcessor} dependencies.eventProcessor * @param {object} dependencies.providers * @param {import('@tryghost/domain-events')} dependencies.domainEvents + * @param {import('@tryghost/prometheus-metrics')} dependencies.prometheusClient */ - constructor({config, settings, queries, eventProcessor, providers, domainEvents}) { + constructor({config, settings, queries, eventProcessor, providers, domainEvents, prometheusClient}) { this.config = config; this.settings = settings; this.queries = queries; this.eventProcessor = eventProcessor; this.providers = providers; this.domainEvents = domainEvents; + this.prometheusClient = prometheusClient; + + if (prometheusClient) { + // @ts-expect-error + prometheusClient.registerCounter({name: 'email_analytics_aggregate_member_stats_count', help: 'Count of member stats aggregations'}); + } } getStatus() { @@ -505,6 +512,7 @@ module.exports = class EmailAnalyticsService { async aggregateStats({emailIds = [], memberIds = []}, includeOpenedEvents = true) { let startTime = Date.now(); logging.info(`[EmailAnalytics] Aggregating for ${emailIds.length} emails`); + for (const emailId of emailIds) { await this.aggregateEmailStats(emailId, includeOpenedEvents); } @@ -513,12 +521,17 @@ module.exports = class EmailAnalyticsService { startTime = Date.now(); logging.info(`[EmailAnalytics] Aggregating for ${memberIds.length} members`); + + // @ts-expect-error + const memberMetric = this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count'); for (const memberId of memberIds) { if (this.config?.get('services:jobs:queue:enabled')) { // With the queue enabled we will dispatch an event to update the member email analytics on the background queue (multithreaded :)) + // job manager has its own metrics await this.domainEvents.dispatch(MemberEmailAnalyticsUpdateEvent.create({memberId})); } else { await this.aggregateMemberStats(memberId); + memberMetric?.inc(); } } endTime = Date.now() - startTime; diff --git a/ghost/job-manager/lib/JobManager.js b/ghost/job-manager/lib/JobManager.js index 0572fa1e22..8761a772b8 100644 --- a/ghost/job-manager/lib/JobManager.js +++ b/ghost/job-manager/lib/JobManager.js @@ -52,8 +52,9 @@ class JobManager { * @param {Object} [options.config] - config * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue * @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing) + * @param {Object} [options.prometheusClient] - prometheus client instance (for testing) */ - constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null}) { + constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null}) { this.inlineQueue = fastq(this, worker, 3); this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this); @@ -91,6 +92,8 @@ class JobManager { if (JobModel) { this._jobsRepository = new JobsRepository({JobModel}); } + + this.prometheusClient = prometheusClient; if (jobQueueManager) { this.#jobQueueManager = jobQueueManager; @@ -101,7 +104,7 @@ class JobManager { #initializeJobQueueManager() { if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) { - this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config}); + this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient}); this.#jobQueueManager.init(); } } @@ -129,6 +132,7 @@ class JobManager { * @property {string} name - The name or identifier of the job. * @property {Object} metadata - Metadata associated with the job. * @property {string} metadata.job - The absolute path to the job to execute. + * @property {string} metadata.name - The name of the job. Used for metrics. * @property {Object} metadata.data - The data associated with the job. */ diff --git a/ghost/job-manager/lib/JobQueueManager.js b/ghost/job-manager/lib/JobQueueManager.js index 14f3275fa5..cd3bee0f00 100644 --- a/ghost/job-manager/lib/JobQueueManager.js +++ b/ghost/job-manager/lib/JobQueueManager.js @@ -5,13 +5,23 @@ const debug = require('@tryghost/debug')('job-manager:JobQueueManager'); const logging = require('@tryghost/logging'); class JobQueueManager { - constructor({JobModel, config, logger = logging, WorkerPool = workerpool}) { + constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient}) { this.jobsRepository = new JobsRepository({JobModel}); this.config = this.initializeConfig(config?.get('services:jobs:queue') || {}); this.logger = this.createLogger(logger, this.config.logLevel); this.WorkerPool = WorkerPool; this.pool = this.createWorkerPool(); this.state = this.initializeState(); + this.prometheusClient = prometheusClient; + + if (prometheusClient) { + this.prometheusClient.registerCounter({ + name: 'job_manager_queue_job_completion_count', + help: 'The number of jobs completed by the job manager queue', + labelNames: ['jobName'] + }); + this.prometheusClient.registerGauge({name: 'job_manager_queue_depth', help: 'The number of jobs in the job manager queue'}); + } } createLogger(logger, logLevel) { @@ -97,9 +107,9 @@ class JobQueueManager { const stats = await this.getStats(); if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) { const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks); - this.logger.info(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}`); - - const jobs = await this.jobsRepository.getQueuedJobs(entriesToAdd); + const {data: jobs, total} = await this.jobsRepository.getQueuedJobs(entriesToAdd); + this.prometheusClient?.getMetric('job_manager_queue_depth')?.set(total || 0); + this.logger.info(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}. Current depth: ${total}.`); this.updatePollInterval(jobs); await this.processJobs(jobs); } @@ -133,6 +143,10 @@ class JobQueueManager { try { await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]); await this.jobsRepository.delete(job.id); + this.prometheusClient?.getMetric('job_manager_queue_job_completion_count')?.inc({jobName}); + if (jobName === 'update-member-email-analytics') { + this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count')?.inc(); + } } catch (error) { await this.handleJobError(job, jobMetadata, error); } finally { diff --git a/ghost/job-manager/lib/JobsRepository.js b/ghost/job-manager/lib/JobsRepository.js index e57cecce58..e2c4f50348 100644 --- a/ghost/job-manager/lib/JobsRepository.js +++ b/ghost/job-manager/lib/JobsRepository.js @@ -71,14 +71,14 @@ class JobsRepository { * @async * @description Retrieves a list of queued jobs from the database. * @param {number} [limit=50] - The maximum number of jobs to retrieve. - * @returns {Promise} An array of queued job objects. + * @returns {Promise} An object containing the queued job data and total count. */ async getQueuedJobs(limit = 50) { const jobs = await this._JobModel.findPage({ filter: 'queue_entry:1', limit }); - return jobs.data; + return {data: jobs.data, total: jobs.meta.pagination.total}; } /** diff --git a/ghost/job-manager/test/job-queue-manager.test.js b/ghost/job-manager/test/job-queue-manager.test.js index d178a4729c..d394484f8a 100644 --- a/ghost/job-manager/test/job-queue-manager.test.js +++ b/ghost/job-manager/test/job-queue-manager.test.js @@ -8,8 +8,11 @@ describe('JobQueueManager', function () { let mockConfig; let mockLogger; let mockWorkerPool; + let mockPrometheusClient; + let metricIncStub; beforeEach(function () { + metricIncStub = sinon.stub(); mockJobModel = {}; mockConfig = { get: sinon.stub().returns({}) @@ -18,6 +21,14 @@ describe('JobQueueManager', function () { info: sinon.stub(), error: sinon.stub() }; + mockPrometheusClient = { + getMetric: sinon.stub().returns({ + set: sinon.stub(), + inc: metricIncStub + }), + registerCounter: sinon.stub(), + registerGauge: sinon.stub() + }; mockWorkerPool = { pool: sinon.stub().returns({ exec: sinon.stub(), @@ -30,7 +41,8 @@ describe('JobQueueManager', function () { JobModel: mockJobModel, config: mockConfig, logger: mockLogger, - WorkerPool: mockWorkerPool + WorkerPool: mockWorkerPool, + prometheusClient: mockPrometheusClient }); }); @@ -116,7 +128,7 @@ describe('JobQueueManager', function () { const mockJobs = [{get: sinon.stub().returns('{}')}]; sinon.stub(jobQueueManager, 'getStats').resolves(mockStats); - sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves(mockJobs); + sinon.stub(jobQueueManager.jobsRepository, 'getQueuedJobs').resolves({data: mockJobs, total: mockJobs.length}); sinon.stub(jobQueueManager, 'updatePollInterval'); sinon.stub(jobQueueManager, 'processJobs'); @@ -279,6 +291,20 @@ describe('JobQueueManager', function () { expect(handleJobErrorStub.calledWith(job, {job: 'testJob', data: {}}, error)).to.be.true; expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; }); + + it('should increment the job_manager_queue_job_completion_count metric', async function () { + const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; + sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); + await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); + expect(metricIncStub.calledOnce).to.be.true; + }); + + it('should increment the email_analytics_aggregate_member_stats_count metric', async function () { + const job = {id: '1', get: sinon.stub().returns('{"job": "update-member-email-analytics", "data": {}}')}; + sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); + await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}}); + expect(metricIncStub.calledTwice).to.be.true; + }); }); describe('handleJobError', function () { diff --git a/ghost/prometheus-metrics/src/PrometheusClient.ts b/ghost/prometheus-metrics/src/PrometheusClient.ts index f783e8a55c..948d1d84ee 100644 --- a/ghost/prometheus-metrics/src/PrometheusClient.ts +++ b/ghost/prometheus-metrics/src/PrometheusClient.ts @@ -180,12 +180,14 @@ export class PrometheusClient { * Registers a counter metric * @param name - The name of the metric * @param help - The help text for the metric + * @param labelNames - The names of the labels for the metric * @returns The counter metric */ - registerCounter({name, help}: {name: string, help: string}): client.Counter { + registerCounter({name, help, labelNames = []}: {name: string, help: string, labelNames?: string[]}): client.Counter { return new this.client.Counter({ name: `${this.prefix}${name}`, - help + help, + labelNames }); }