diff --git a/ghost/core/core/server/services/jobs/job-service.js b/ghost/core/core/server/services/jobs/job-service.js index 5f1b5edb77..376d1b23a8 100644 --- a/ghost/core/core/server/services/jobs/job-service.js +++ b/ghost/core/core/server/services/jobs/job-service.js @@ -9,7 +9,6 @@ const models = require('../../models'); const sentry = require('../../../shared/sentry'); const domainEvents = require('@tryghost/domain-events'); const config = require('../../../shared/config'); -const prometheusClient = require('../../../shared/prometheus-client'); const errorHandler = (error, workerMeta) => { logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`); logging.error(error); @@ -44,7 +43,7 @@ const initTestMode = () => { }, 5000); }; -const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient, events}); +const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, events}); module.exports = jobManager; module.exports.initTestMode = initTestMode; diff --git a/ghost/job-manager/lib/JobManager.js b/ghost/job-manager/lib/JobManager.js index cd365b00e8..5ba0a20044 100644 --- a/ghost/job-manager/lib/JobManager.js +++ b/ghost/job-manager/lib/JobManager.js @@ -7,6 +7,7 @@ const Bree = require('bree'); const pWaitFor = require('p-wait-for'); const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors'); const logging = require('@tryghost/logging'); +const metrics = require('@tryghost/metrics'); const isCronExpression = require('./is-cron-expression'); const assembleBreeJob = require('./assemble-bree-job'); const JobsRepository = require('./JobsRepository'); @@ -53,10 +54,9 @@ class JobManager { * @param {Object} [options.config] - config * @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue * @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing) - * @param {Object} [options.prometheusClient] - prometheus client instance (for testing) * @param {Object} [options.events] - events instance (for testing) */ - constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null, events = null}) { + constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, events = null}) { this.inlineQueue = fastq(this, worker, 3); this._jobMessageHandler = this._jobMessageHandler.bind(this); this._jobErrorHandler = this._jobErrorHandler.bind(this); @@ -95,8 +95,6 @@ class JobManager { if (JobModel) { this._jobsRepository = new JobsRepository({JobModel}); } - - this.prometheusClient = prometheusClient; if (jobQueueManager) { this.#jobQueueManager = jobQueueManager; @@ -107,7 +105,7 @@ class JobManager { #initializeJobQueueManager() { if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) { - this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient, eventEmitter: this.#events}); + this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, eventEmitter: this.#events, metricLogger: metrics}); this.#jobQueueManager.init(); } } diff --git a/ghost/job-manager/lib/JobQueueManager.js b/ghost/job-manager/lib/JobQueueManager.js index fab6b836a3..0baa32a867 100644 --- a/ghost/job-manager/lib/JobQueueManager.js +++ b/ghost/job-manager/lib/JobQueueManager.js @@ -3,26 +3,23 @@ const path = require('path'); const JobsRepository = require('./JobsRepository'); const debug = require('@tryghost/debug')('job-manager:JobQueueManager'); const logging = require('@tryghost/logging'); +const metrics = require('@tryghost/metrics'); class JobQueueManager { - constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient, eventEmitter}) { + constructor({JobModel, config, logger = logging, metricLogger = metrics, WorkerPool = workerpool, eventEmitter}) { this.jobsRepository = new JobsRepository({JobModel}); this.config = this.initializeConfig(config?.get('services:jobs:queue') || {}); this.logger = this.createLogger(logger, this.config.logLevel); + this.metricLogger = metricLogger; this.WorkerPool = WorkerPool; this.pool = this.createWorkerPool(); this.state = this.initializeState(); this.eventEmitter = eventEmitter; - - this.prometheusClient = prometheusClient; - if (prometheusClient) { - this.prometheusClient.registerCounter({ - name: 'job_manager_queue_job_completion_count', - help: 'The number of jobs completed by the job manager queue', - labelNames: ['jobName'] - }); - this.prometheusClient.registerGauge({name: 'job_manager_queue_depth', help: 'The number of jobs in the job manager queue'}); - } + this.metricCache = { + jobCompletionCount: 0, + queueDepth: 0, + emailAnalyticsAggregateMemberStatsCount: 0 + }; } createLogger(logger, logLevel) { @@ -114,7 +111,7 @@ class JobQueueManager { if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) { const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks); const {data: jobs, total} = await this.jobsRepository.getQueuedJobs(entriesToAdd); - this.prometheusClient?.getMetric('job_manager_queue_depth')?.set(total || 0); + this.metricCache.queueDepth = total || 0; this.logger.debug(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}. Current depth: ${total}.`); this.updatePollInterval(jobs); await this.processJobs(jobs); @@ -164,9 +161,9 @@ class JobQueueManager { */ const result = await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]); await this.jobsRepository.delete(job.id); - this.prometheusClient?.getMetric('job_manager_queue_job_completion_count')?.inc({jobName}); + this.metricCache.jobCompletionCount += 1; if (jobName === 'update-member-email-analytics') { - this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count')?.inc(); + this.metricCache.emailAnalyticsAggregateMemberStatsCount += 1; } if (result && result.eventData) { this.emitEvents(result.eventData.events); // this is nested within eventData because we may want to support DomainEvents emission as well @@ -212,11 +209,21 @@ class JobQueueManager { reportStats() { setInterval(() => { - this.logger.info('-- job queue stats --'); - this.logger.info(JSON.stringify(this.pool.stats(), null, 2)); + this._doReportStats(); }, this.config.reportInterval); } + _doReportStats() { + const poolStats = this.pool.stats(); + const stats = { + ...poolStats, + ...this.metricCache + }; + const statsString = JSON.stringify(stats, null, 2); + this.logger.info(`Job Queue Stats: ${statsString}`); + this.metricLogger.metric('job_manager_queue', stats); + } + async shutdown() { try { await this.pool.terminate(); diff --git a/ghost/job-manager/test/job-queue-manager.test.js b/ghost/job-manager/test/job-queue-manager.test.js index 3374a29330..30b02adc20 100644 --- a/ghost/job-manager/test/job-queue-manager.test.js +++ b/ghost/job-manager/test/job-queue-manager.test.js @@ -7,12 +7,10 @@ describe('JobQueueManager', function () { let mockJobModel; let mockConfig; let mockLogger; + let mockMetricLogger; let mockWorkerPool; - let mockPrometheusClient; - let metricIncStub; let mockEventEmitter; beforeEach(function () { - metricIncStub = sinon.stub(); mockJobModel = {}; mockConfig = { get: sinon.stub().returns({}) @@ -22,18 +20,18 @@ describe('JobQueueManager', function () { info: sinon.stub(), error: sinon.stub() }; - mockPrometheusClient = { - getMetric: sinon.stub().returns({ - set: sinon.stub(), - inc: metricIncStub - }), - registerCounter: sinon.stub(), - registerGauge: sinon.stub() + mockMetricLogger = { + metric: sinon.stub() }; mockWorkerPool = { pool: sinon.stub().returns({ exec: sinon.stub(), - stats: sinon.stub().returns({}), + stats: sinon.stub().returns({ + totalWorkers: 1, + busyWorkers: 0, + idleWorkers: 1, + activeTasks: 0 + }), terminate: sinon.stub() }) }; @@ -45,8 +43,8 @@ describe('JobQueueManager', function () { JobModel: mockJobModel, config: mockConfig, logger: mockLogger, + metricLogger: mockMetricLogger, WorkerPool: mockWorkerPool, - prometheusClient: mockPrometheusClient, eventEmitter: mockEventEmitter }); }); @@ -142,6 +140,7 @@ describe('JobQueueManager', function () { expect(jobQueueManager.jobsRepository.getQueuedJobs.calledOnce).to.be.true; expect(jobQueueManager.updatePollInterval.calledOnceWith(mockJobs)).to.be.true; expect(jobQueueManager.processJobs.calledOnceWith(mockJobs)).to.be.true; + expect(jobQueueManager.metricCache.queueDepth).to.equal(mockJobs.length); }); }); @@ -309,18 +308,18 @@ describe('JobQueueManager', function () { expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false; }); - it('should increment the job_manager_queue_job_completion_count metric', async function () { + it('should increment the metricCache.jobCompletionCount metric', async function () { const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')}; sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}}); - expect(metricIncStub.calledOnce).to.be.true; + expect(jobQueueManager.metricCache.jobCompletionCount).to.equal(1); }); - it('should increment the email_analytics_aggregate_member_stats_count metric', async function () { + it('should increment the metricCache.emailAnalyticsAggregateMemberStatsCount metric', async function () { const job = {id: '1', get: sinon.stub().returns('{"job": "update-member-email-analytics", "data": {}}')}; sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves(); await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}}); - expect(metricIncStub.calledTwice).to.be.true; + expect(jobQueueManager.metricCache.emailAnalyticsAggregateMemberStatsCount).to.equal(1); }); it('should emit events if present in result', async function () { @@ -379,4 +378,62 @@ describe('JobQueueManager', function () { expect(calledUpdateData.finished_at).to.be.instanceOf(Date); }); }); + + describe('reportStats', function () { + it('should log the stats every reportInterval', async function () { + const clock = sinon.useFakeTimers(); + const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats'); + jobQueueManager.config.reportInterval = 1000; + jobQueueManager.reportStats(); + await clock.tickAsync(1000); + expect(reportStatsStub.calledOnce).to.be.true; + clock.restore(); + }); + + it('should not log the stats if reportStats is false', async function () { + const clock = sinon.useFakeTimers(); + jobQueueManager.config.reportStats = false; + const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats'); + jobQueueManager.reportStats(); + await clock.tickAsync(2000); + expect(reportStatsStub.called).to.be.false; + clock.restore(); + }); + }); + + describe('_doReportStats', function () { + it('should log the stats using the logger', function () { + const loggerInfoStub = sinon.stub(jobQueueManager.logger, 'info'); + jobQueueManager._doReportStats(); + const expectedStats = { + totalWorkers: 1, + busyWorkers: 0, + idleWorkers: 1, + activeTasks: 0, + jobCompletionCount: 0, + queueDepth: 0, + emailAnalyticsAggregateMemberStatsCount: 0 + }; + const expectedLog = `Job Queue Stats: ${JSON.stringify(expectedStats, null, 2)}`; + expect(loggerInfoStub.calledOnce).to.be.true; + expect(loggerInfoStub.calledWith(expectedLog)).to.be.true; + }); + + it('should log the stats using the metricLogger', function () { + jobQueueManager._doReportStats(); + const expectedStats = { + totalWorkers: 1, + busyWorkers: 0, + idleWorkers: 1, + activeTasks: 0, + jobCompletionCount: 0, + queueDepth: 0, + emailAnalyticsAggregateMemberStatsCount: 0 + }; + expect(mockMetricLogger.metric.calledOnce).to.be.true; + const args = mockMetricLogger.metric.args[0]; + expect(args[0]).to.equal('job_manager_queue'); + expect(args[1]).to.deep.equal(expectedStats); + }); + }); }); \ No newline at end of file