From 4a254c55c81248235c2ed7dfdec378ebe3d6f026 Mon Sep 17 00:00:00 2001 From: diced Date: Mon, 19 Jun 2023 11:24:04 -0700 Subject: [PATCH] fix: excessive worker count (#425) --- src/server/index.ts | 34 +++++++++-- src/worker/thumbnail.ts | 122 +++++++++++++++++++++++----------------- 2 files changed, 100 insertions(+), 56 deletions(-) diff --git a/src/server/index.ts b/src/server/index.ts index 40231fa..3d5ebf1 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -1,11 +1,12 @@ import config from 'lib/config'; import datasource from 'lib/datasource'; import Logger from 'lib/logger'; -import { version } from '../../package.json'; import { getStats } from 'server/util'; +import { version } from '../../package.json'; import fastify, { FastifyInstance, FastifyServerOptions } from 'fastify'; import { createReadStream, existsSync, readFileSync } from 'fs'; +import { Worker } from 'worker_threads'; import dbFileDecorator from './decorators/dbFile'; import notFound from './decorators/notFound'; import postFileDecorator from './decorators/postFile'; @@ -21,7 +22,6 @@ import prismaPlugin from './plugins/prisma'; import rawRoute from './routes/raw'; import uploadsRoute, { uploadsRouteOnResponse } from './routes/uploads'; import urlsRoute, { urlsRouteOnResponse } from './routes/urls'; -import { Worker } from 'worker_threads'; const dev = process.env.NODE_ENV === 'development'; const logger = Logger.get('server'); @@ -228,14 +228,38 @@ async function thumbs(this: FastifyInstance) { }, thumbnail: null, }, + include: { + thumbnail: true, + }, }); - logger.child('thumb').debug(`found ${videoFiles.length} videos without thumbnails`); + // avoids reaching prisma connection limit + const MAX_THUMB_THREADS = 4; + + // make all the files fit into 4 arrays + const chunks = []; + + for (let i = 0; i !== MAX_THUMB_THREADS; ++i) { + chunks.push([]); + + for (let j = i; j < videoFiles.length; j += MAX_THUMB_THREADS) { + chunks[i].push(videoFiles[j]); + } + } + + logger.child('thumbnail').debug(`starting ${chunks.length} thumbnail threads`); + + for (let i = 0; i !== chunks.length; ++i) { + const chunk = chunks[i]; + if (chunk.length === 0) continue; + + logger.child('thumbnail').debug(`starting thumbnail generation for ${chunk.length} videos`); - for (const file of videoFiles) { new Worker('./dist/worker/thumbnail.js', { workerData: { - id: file.id, + videos: chunk, + config, + datasource, }, }); } diff --git a/src/worker/thumbnail.ts b/src/worker/thumbnail.ts index 9fcb0f7..415fa65 100644 --- a/src/worker/thumbnail.ts +++ b/src/worker/thumbnail.ts @@ -1,18 +1,25 @@ -import { File } from '@prisma/client'; +import { type File, PrismaClient, type Thumbnail } from '@prisma/client'; import { spawn } from 'child_process'; import ffmpeg from 'ffmpeg-static'; import { createWriteStream } from 'fs'; import { rm } from 'fs/promises'; -import config from 'lib/config'; -import datasource from 'lib/datasource'; +import type { Config } from 'lib/config/Config'; import Logger from 'lib/logger'; -import prisma from 'lib/prisma'; +import { randomChars } from 'lib/util'; import { join } from 'path'; import { isMainThread, workerData } from 'worker_threads'; +import datasource from 'lib/datasource'; -const { id } = workerData as { id: number }; +const { videos, config } = workerData as { + videos: (File & { + thumbnail: Thumbnail; + })[]; + config: Config; +}; -const logger = Logger.get('worker::thumbnail').child(id.toString() ?? 'unknown-ident'); +const logger = Logger.get('worker::thumbnail').child(randomChars(4)); + +logger.debug(`thumbnail generation for ${videos.length} videos`); if (isMainThread) { logger.error('worker is not a thread'); @@ -24,9 +31,30 @@ async function loadThumbnail(path) { const child = spawn(ffmpeg, args, { stdio: ['ignore', 'pipe', 'ignore'] }); - const data: Promise = new Promise((resolve, reject) => { - child.stdout.once('data', resolve); + const data: Buffer = await new Promise((resolve, reject) => { + const buffers = []; + + child.stdout.on('data', (chunk) => { + buffers.push(chunk); + }); + child.once('error', reject); + child.once('close', (code) => { + if (code !== 0) { + reject(new Error(`child exited with code ${code}`)); + } else { + const buffer = Buffer.allocUnsafe(buffers.reduce((acc, val) => acc + val.length, 0)); + + let offset = 0; + for (let i = 0; i !== buffers.length; ++i) { + const chunk = buffers[i]; + chunk.copy(buffer, offset); + offset += chunk.length; + } + + resolve(buffer); + } + }); }); return data; @@ -49,59 +77,51 @@ async function loadFileTmp(file: File) { } async function start() { - const file = await prisma.file.findUnique({ - where: { - id, - }, - include: { - thumbnail: true, - }, - }); + const prisma = new PrismaClient(); - if (!file) { - logger.error('file not found'); - process.exit(1); - } + for (let i = 0; i !== videos.length; ++i) { + const file = videos[i]; + if (!file.mimetype.startsWith('video/')) { + logger.info('file is not a video'); + process.exit(0); + } - if (!file.mimetype.startsWith('video/')) { - logger.info('file is not a video'); - process.exit(0); - } + if (file.thumbnail) { + logger.info('thumbnail already exists'); + process.exit(0); + } - if (file.thumbnail) { - logger.info('thumbnail already exists'); - process.exit(0); - } + const tmpFile = await loadFileTmp(file); + logger.debug(`loaded file to tmp: ${tmpFile}`); + const thumbnail = await loadThumbnail(tmpFile); + logger.debug(`loaded thumbnail: ${thumbnail.length} bytes mjpeg`); - const tmpFile = await loadFileTmp(file); - logger.debug(`loaded file to tmp: ${tmpFile}`); - const thumbnail = await loadThumbnail(tmpFile); - logger.debug(`loaded thumbnail: ${thumbnail.length} bytes mjpeg`); - - const { thumbnail: thumb } = await prisma.file.update({ - where: { - id: file.id, - }, - data: { - thumbnail: { - create: { - name: `.thumb-${file.id}.jpg`, + const { thumbnail: thumb } = await prisma.file.update({ + where: { + id: file.id, + }, + data: { + thumbnail: { + create: { + name: `.thumb-${file.id}.jpg`, + }, }, }, - }, - select: { - thumbnail: true, - }, - }); + select: { + thumbnail: true, + }, + }); - await datasource.save(thumb.name, thumbnail); + await datasource.save(thumb.name, thumbnail); - logger.info(`thumbnail saved - ${thumb.name}`); - logger.debug(`thumbnail ${JSON.stringify(thumb)}`); + logger.info(`thumbnail saved - ${thumb.name}`); + logger.debug(`thumbnail ${JSON.stringify(thumb)}`); - logger.debug(`removing tmp file: ${tmpFile}`); - await rm(tmpFile); + logger.debug(`removing tmp file: ${tmpFile}`); + await rm(tmpFile); + } + await prisma.$disconnect(); process.exit(0); }