fix: excessive worker count (#425)

This commit is contained in:
diced 2023-06-19 11:24:04 -07:00
parent 226d946ec8
commit 4a254c55c8
No known key found for this signature in database
GPG key ID: 370BD1BA142842D1
2 changed files with 100 additions and 56 deletions

View file

@ -1,11 +1,12 @@
import config from 'lib/config'; import config from 'lib/config';
import datasource from 'lib/datasource'; import datasource from 'lib/datasource';
import Logger from 'lib/logger'; import Logger from 'lib/logger';
import { version } from '../../package.json';
import { getStats } from 'server/util'; import { getStats } from 'server/util';
import { version } from '../../package.json';
import fastify, { FastifyInstance, FastifyServerOptions } from 'fastify'; import fastify, { FastifyInstance, FastifyServerOptions } from 'fastify';
import { createReadStream, existsSync, readFileSync } from 'fs'; import { createReadStream, existsSync, readFileSync } from 'fs';
import { Worker } from 'worker_threads';
import dbFileDecorator from './decorators/dbFile'; import dbFileDecorator from './decorators/dbFile';
import notFound from './decorators/notFound'; import notFound from './decorators/notFound';
import postFileDecorator from './decorators/postFile'; import postFileDecorator from './decorators/postFile';
@ -21,7 +22,6 @@ import prismaPlugin from './plugins/prisma';
import rawRoute from './routes/raw'; import rawRoute from './routes/raw';
import uploadsRoute, { uploadsRouteOnResponse } from './routes/uploads'; import uploadsRoute, { uploadsRouteOnResponse } from './routes/uploads';
import urlsRoute, { urlsRouteOnResponse } from './routes/urls'; import urlsRoute, { urlsRouteOnResponse } from './routes/urls';
import { Worker } from 'worker_threads';
const dev = process.env.NODE_ENV === 'development'; const dev = process.env.NODE_ENV === 'development';
const logger = Logger.get('server'); const logger = Logger.get('server');
@ -228,14 +228,38 @@ async function thumbs(this: FastifyInstance) {
}, },
thumbnail: null, thumbnail: null,
}, },
include: {
thumbnail: true,
},
}); });
logger.child('thumb').debug(`found ${videoFiles.length} videos without thumbnails`); // avoids reaching prisma connection limit
const MAX_THUMB_THREADS = 4;
// make all the files fit into 4 arrays
const chunks = [];
for (let i = 0; i !== MAX_THUMB_THREADS; ++i) {
chunks.push([]);
for (let j = i; j < videoFiles.length; j += MAX_THUMB_THREADS) {
chunks[i].push(videoFiles[j]);
}
}
logger.child('thumbnail').debug(`starting ${chunks.length} thumbnail threads`);
for (let i = 0; i !== chunks.length; ++i) {
const chunk = chunks[i];
if (chunk.length === 0) continue;
logger.child('thumbnail').debug(`starting thumbnail generation for ${chunk.length} videos`);
for (const file of videoFiles) {
new Worker('./dist/worker/thumbnail.js', { new Worker('./dist/worker/thumbnail.js', {
workerData: { workerData: {
id: file.id, videos: chunk,
config,
datasource,
}, },
}); });
} }

View file

@ -1,18 +1,25 @@
import { File } from '@prisma/client'; import { type File, PrismaClient, type Thumbnail } from '@prisma/client';
import { spawn } from 'child_process'; import { spawn } from 'child_process';
import ffmpeg from 'ffmpeg-static'; import ffmpeg from 'ffmpeg-static';
import { createWriteStream } from 'fs'; import { createWriteStream } from 'fs';
import { rm } from 'fs/promises'; import { rm } from 'fs/promises';
import config from 'lib/config'; import type { Config } from 'lib/config/Config';
import datasource from 'lib/datasource';
import Logger from 'lib/logger'; import Logger from 'lib/logger';
import prisma from 'lib/prisma'; import { randomChars } from 'lib/util';
import { join } from 'path'; import { join } from 'path';
import { isMainThread, workerData } from 'worker_threads'; import { isMainThread, workerData } from 'worker_threads';
import datasource from 'lib/datasource';
const { id } = workerData as { id: number }; const { videos, config } = workerData as {
videos: (File & {
thumbnail: Thumbnail;
})[];
config: Config;
};
const logger = Logger.get('worker::thumbnail').child(id.toString() ?? 'unknown-ident'); const logger = Logger.get('worker::thumbnail').child(randomChars(4));
logger.debug(`thumbnail generation for ${videos.length} videos`);
if (isMainThread) { if (isMainThread) {
logger.error('worker is not a thread'); logger.error('worker is not a thread');
@ -24,9 +31,30 @@ async function loadThumbnail(path) {
const child = spawn(ffmpeg, args, { stdio: ['ignore', 'pipe', 'ignore'] }); const child = spawn(ffmpeg, args, { stdio: ['ignore', 'pipe', 'ignore'] });
const data: Promise<Buffer> = new Promise((resolve, reject) => { const data: Buffer = await new Promise((resolve, reject) => {
child.stdout.once('data', resolve); const buffers = [];
child.stdout.on('data', (chunk) => {
buffers.push(chunk);
});
child.once('error', reject); child.once('error', reject);
child.once('close', (code) => {
if (code !== 0) {
reject(new Error(`child exited with code ${code}`));
} else {
const buffer = Buffer.allocUnsafe(buffers.reduce((acc, val) => acc + val.length, 0));
let offset = 0;
for (let i = 0; i !== buffers.length; ++i) {
const chunk = buffers[i];
chunk.copy(buffer, offset);
offset += chunk.length;
}
resolve(buffer);
}
});
}); });
return data; return data;
@ -49,59 +77,51 @@ async function loadFileTmp(file: File) {
} }
async function start() { async function start() {
const file = await prisma.file.findUnique({ const prisma = new PrismaClient();
where: {
id,
},
include: {
thumbnail: true,
},
});
if (!file) { for (let i = 0; i !== videos.length; ++i) {
logger.error('file not found'); const file = videos[i];
process.exit(1); if (!file.mimetype.startsWith('video/')) {
} logger.info('file is not a video');
process.exit(0);
}
if (!file.mimetype.startsWith('video/')) { if (file.thumbnail) {
logger.info('file is not a video'); logger.info('thumbnail already exists');
process.exit(0); process.exit(0);
} }
if (file.thumbnail) { const tmpFile = await loadFileTmp(file);
logger.info('thumbnail already exists'); logger.debug(`loaded file to tmp: ${tmpFile}`);
process.exit(0); const thumbnail = await loadThumbnail(tmpFile);
} logger.debug(`loaded thumbnail: ${thumbnail.length} bytes mjpeg`);
const tmpFile = await loadFileTmp(file); const { thumbnail: thumb } = await prisma.file.update({
logger.debug(`loaded file to tmp: ${tmpFile}`); where: {
const thumbnail = await loadThumbnail(tmpFile); id: file.id,
logger.debug(`loaded thumbnail: ${thumbnail.length} bytes mjpeg`); },
data: {
const { thumbnail: thumb } = await prisma.file.update({ thumbnail: {
where: { create: {
id: file.id, name: `.thumb-${file.id}.jpg`,
}, },
data: {
thumbnail: {
create: {
name: `.thumb-${file.id}.jpg`,
}, },
}, },
}, select: {
select: { thumbnail: true,
thumbnail: true, },
}, });
});
await datasource.save(thumb.name, thumbnail); await datasource.save(thumb.name, thumbnail);
logger.info(`thumbnail saved - ${thumb.name}`); logger.info(`thumbnail saved - ${thumb.name}`);
logger.debug(`thumbnail ${JSON.stringify(thumb)}`); logger.debug(`thumbnail ${JSON.stringify(thumb)}`);
logger.debug(`removing tmp file: ${tmpFile}`); logger.debug(`removing tmp file: ${tmpFile}`);
await rm(tmpFile); await rm(tmpFile);
}
await prisma.$disconnect();
process.exit(0); process.exit(0);
} }