0
Fork 0
mirror of https://github.com/TryGhost/Ghost.git synced 2025-02-17 23:44:39 -05:00

Added metric logging for the JobQueueManager (#22157)

refs
https://linear.app/ghost/issue/ENG-1987/monitor-email-analytics-via-ghost-logging-in-some-form

- Since we decided not to fully roll out Prometheus for metrics
collection, we need an alternative method to collect metrics from the
`JobQueueManager`.
- This commit removes the Prometheus-based metrics from the
`JobQueueManager` in favor of `@tryghost/metrics`.
This commit is contained in:
Chris Raible 2025-02-10 16:17:09 -08:00
parent 734ad0c7a1
commit 31e41054f3
4 changed files with 100 additions and 39 deletions

View file

@ -9,7 +9,6 @@ const models = require('../../models');
const sentry = require('../../../shared/sentry');
const domainEvents = require('@tryghost/domain-events');
const config = require('../../../shared/config');
const prometheusClient = require('../../../shared/prometheus-client');
const errorHandler = (error, workerMeta) => {
logging.info(`Capturing error for worker during execution of job: ${workerMeta.name}`);
logging.error(error);
@ -44,7 +43,7 @@ const initTestMode = () => {
}, 5000);
};
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, prometheusClient, events});
const jobManager = new JobManager({errorHandler, workerMessageHandler, JobModel: models.Job, domainEvents, config, events});
module.exports = jobManager;
module.exports.initTestMode = initTestMode;

View file

@ -7,6 +7,7 @@ const Bree = require('bree');
const pWaitFor = require('p-wait-for');
const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors');
const logging = require('@tryghost/logging');
const metrics = require('@tryghost/metrics');
const isCronExpression = require('./is-cron-expression');
const assembleBreeJob = require('./assemble-bree-job');
const JobsRepository = require('./JobsRepository');
@ -53,10 +54,9 @@ class JobManager {
* @param {Object} [options.config] - config
* @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
* @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing)
* @param {Object} [options.prometheusClient] - prometheus client instance (for testing)
* @param {Object} [options.events] - events instance (for testing)
*/
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, prometheusClient = null, events = null}) {
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, events = null}) {
this.inlineQueue = fastq(this, worker, 3);
this._jobMessageHandler = this._jobMessageHandler.bind(this);
this._jobErrorHandler = this._jobErrorHandler.bind(this);
@ -95,8 +95,6 @@ class JobManager {
if (JobModel) {
this._jobsRepository = new JobsRepository({JobModel});
}
this.prometheusClient = prometheusClient;
if (jobQueueManager) {
this.#jobQueueManager = jobQueueManager;
@ -107,7 +105,7 @@ class JobManager {
#initializeJobQueueManager() {
if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) {
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, prometheusClient: this.prometheusClient, eventEmitter: this.#events});
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, eventEmitter: this.#events, metricLogger: metrics});
this.#jobQueueManager.init();
}
}

View file

@ -3,26 +3,23 @@ const path = require('path');
const JobsRepository = require('./JobsRepository');
const debug = require('@tryghost/debug')('job-manager:JobQueueManager');
const logging = require('@tryghost/logging');
const metrics = require('@tryghost/metrics');
class JobQueueManager {
constructor({JobModel, config, logger = logging, WorkerPool = workerpool, prometheusClient, eventEmitter}) {
constructor({JobModel, config, logger = logging, metricLogger = metrics, WorkerPool = workerpool, eventEmitter}) {
this.jobsRepository = new JobsRepository({JobModel});
this.config = this.initializeConfig(config?.get('services:jobs:queue') || {});
this.logger = this.createLogger(logger, this.config.logLevel);
this.metricLogger = metricLogger;
this.WorkerPool = WorkerPool;
this.pool = this.createWorkerPool();
this.state = this.initializeState();
this.eventEmitter = eventEmitter;
this.prometheusClient = prometheusClient;
if (prometheusClient) {
this.prometheusClient.registerCounter({
name: 'job_manager_queue_job_completion_count',
help: 'The number of jobs completed by the job manager queue',
labelNames: ['jobName']
});
this.prometheusClient.registerGauge({name: 'job_manager_queue_depth', help: 'The number of jobs in the job manager queue'});
}
this.metricCache = {
jobCompletionCount: 0,
queueDepth: 0,
emailAnalyticsAggregateMemberStatsCount: 0
};
}
createLogger(logger, logLevel) {
@ -114,7 +111,7 @@ class JobQueueManager {
if (stats.pendingTasks <= this.config.QUEUE_CAPACITY) {
const entriesToAdd = Math.min(this.config.FETCH_COUNT, this.config.FETCH_COUNT - stats.pendingTasks);
const {data: jobs, total} = await this.jobsRepository.getQueuedJobs(entriesToAdd);
this.prometheusClient?.getMetric('job_manager_queue_depth')?.set(total || 0);
this.metricCache.queueDepth = total || 0;
this.logger.debug(`Adding up to ${entriesToAdd} queue entries. Current pending tasks: ${stats.pendingTasks}. Current worker count: ${stats.totalWorkers}. Current depth: ${total}.`);
this.updatePollInterval(jobs);
await this.processJobs(jobs);
@ -164,9 +161,9 @@ class JobQueueManager {
*/
const result = await this.pool.exec('executeJob', [jobMetadata.job, jobMetadata.data]);
await this.jobsRepository.delete(job.id);
this.prometheusClient?.getMetric('job_manager_queue_job_completion_count')?.inc({jobName});
this.metricCache.jobCompletionCount += 1;
if (jobName === 'update-member-email-analytics') {
this.prometheusClient?.getMetric('email_analytics_aggregate_member_stats_count')?.inc();
this.metricCache.emailAnalyticsAggregateMemberStatsCount += 1;
}
if (result && result.eventData) {
this.emitEvents(result.eventData.events); // this is nested within eventData because we may want to support DomainEvents emission as well
@ -212,11 +209,21 @@ class JobQueueManager {
reportStats() {
setInterval(() => {
this.logger.info('-- job queue stats --');
this.logger.info(JSON.stringify(this.pool.stats(), null, 2));
this._doReportStats();
}, this.config.reportInterval);
}
_doReportStats() {
const poolStats = this.pool.stats();
const stats = {
...poolStats,
...this.metricCache
};
const statsString = JSON.stringify(stats, null, 2);
this.logger.info(`Job Queue Stats: ${statsString}`);
this.metricLogger.metric('job_manager_queue', stats);
}
async shutdown() {
try {
await this.pool.terminate();

View file

@ -7,12 +7,10 @@ describe('JobQueueManager', function () {
let mockJobModel;
let mockConfig;
let mockLogger;
let mockMetricLogger;
let mockWorkerPool;
let mockPrometheusClient;
let metricIncStub;
let mockEventEmitter;
beforeEach(function () {
metricIncStub = sinon.stub();
mockJobModel = {};
mockConfig = {
get: sinon.stub().returns({})
@ -22,18 +20,18 @@ describe('JobQueueManager', function () {
info: sinon.stub(),
error: sinon.stub()
};
mockPrometheusClient = {
getMetric: sinon.stub().returns({
set: sinon.stub(),
inc: metricIncStub
}),
registerCounter: sinon.stub(),
registerGauge: sinon.stub()
mockMetricLogger = {
metric: sinon.stub()
};
mockWorkerPool = {
pool: sinon.stub().returns({
exec: sinon.stub(),
stats: sinon.stub().returns({}),
stats: sinon.stub().returns({
totalWorkers: 1,
busyWorkers: 0,
idleWorkers: 1,
activeTasks: 0
}),
terminate: sinon.stub()
})
};
@ -45,8 +43,8 @@ describe('JobQueueManager', function () {
JobModel: mockJobModel,
config: mockConfig,
logger: mockLogger,
metricLogger: mockMetricLogger,
WorkerPool: mockWorkerPool,
prometheusClient: mockPrometheusClient,
eventEmitter: mockEventEmitter
});
});
@ -142,6 +140,7 @@ describe('JobQueueManager', function () {
expect(jobQueueManager.jobsRepository.getQueuedJobs.calledOnce).to.be.true;
expect(jobQueueManager.updatePollInterval.calledOnceWith(mockJobs)).to.be.true;
expect(jobQueueManager.processJobs.calledOnceWith(mockJobs)).to.be.true;
expect(jobQueueManager.metricCache.queueDepth).to.equal(mockJobs.length);
});
});
@ -309,18 +308,18 @@ describe('JobQueueManager', function () {
expect(jobQueueManager.state.queuedJobs.has('testJob')).to.be.false;
});
it('should increment the job_manager_queue_job_completion_count metric', async function () {
it('should increment the metricCache.jobCompletionCount metric', async function () {
const job = {id: '1', get: sinon.stub().returns('{"job": "testJob", "data": {}}')};
sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves();
await jobQueueManager.executeJob(job, 'testJob', {job: 'testJob', data: {}});
expect(metricIncStub.calledOnce).to.be.true;
expect(jobQueueManager.metricCache.jobCompletionCount).to.equal(1);
});
it('should increment the email_analytics_aggregate_member_stats_count metric', async function () {
it('should increment the metricCache.emailAnalyticsAggregateMemberStatsCount metric', async function () {
const job = {id: '1', get: sinon.stub().returns('{"job": "update-member-email-analytics", "data": {}}')};
sinon.stub(jobQueueManager.jobsRepository, 'delete').resolves();
await jobQueueManager.executeJob(job, 'update-member-email-analytics', {job: 'update-member-email-analytics', data: {}});
expect(metricIncStub.calledTwice).to.be.true;
expect(jobQueueManager.metricCache.emailAnalyticsAggregateMemberStatsCount).to.equal(1);
});
it('should emit events if present in result', async function () {
@ -379,4 +378,62 @@ describe('JobQueueManager', function () {
expect(calledUpdateData.finished_at).to.be.instanceOf(Date);
});
});
describe('reportStats', function () {
    let clock;

    beforeEach(function () {
        clock = sinon.useFakeTimers();
    });

    afterEach(function () {
        // Restore real timers in a hook rather than at the end of each test,
        // so a failing assertion cannot leave fake timers installed for the
        // tests that run afterwards.
        clock.restore();
    });

    it('should log the stats every reportInterval', async function () {
        const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats');
        jobQueueManager.config.reportInterval = 1000;

        jobQueueManager.reportStats();
        await clock.tickAsync(1000);

        expect(reportStatsStub.calledOnce).to.be.true;
    });

    it('should not log the stats if reportStats is false', async function () {
        // NOTE(review): this test does not set config.reportInterval and only
        // advances the clock by 2000ms. The reportStats() method in this
        // change schedules its interval without reading config.reportStats,
        // so this may pass only because the interval is longer than 2000ms.
        // Confirm it would fail if the flag were ignored, e.g. by driving it
        // through whichever entry point checks config.reportStats.
        jobQueueManager.config.reportStats = false;
        const reportStatsStub = sinon.stub(jobQueueManager, '_doReportStats');

        jobQueueManager.reportStats();
        await clock.tickAsync(2000);

        expect(reportStatsStub.called).to.be.false;
    });
});
describe('_doReportStats', function () {
    // Stats returned by the mocked worker pool merged with a freshly
    // initialised metricCache. Built per call so each test gets its own copy;
    // key order matters because the logger test compares the JSON string.
    const buildExpectedStats = () => ({
        totalWorkers: 1,
        busyWorkers: 0,
        idleWorkers: 1,
        activeTasks: 0,
        jobCompletionCount: 0,
        queueDepth: 0,
        emailAnalyticsAggregateMemberStatsCount: 0
    });

    it('should log the stats using the logger', function () {
        const infoStub = sinon.stub(jobQueueManager.logger, 'info');

        jobQueueManager._doReportStats();

        const expectedLog = `Job Queue Stats: ${JSON.stringify(buildExpectedStats(), null, 2)}`;
        expect(infoStub.calledOnce).to.be.true;
        expect(infoStub.calledWith(expectedLog)).to.be.true;
    });

    it('should log the stats using the metricLogger', function () {
        jobQueueManager._doReportStats();

        expect(mockMetricLogger.metric.calledOnce).to.be.true;
        const [metricName, metricValue] = mockMetricLogger.metric.args[0];
        expect(metricName).to.equal('job_manager_queue');
        expect(metricValue).to.deep.equal(buildExpectedStats());
    });
});
});