From eedeb89c7d75c951998e3571df2ca38af1d8e9ad Mon Sep 17 00:00:00 2001 From: dicedtomato <35403473+diced@users.noreply.github.com> Date: Mon, 3 Apr 2023 22:42:27 -0700 Subject: [PATCH] feat: offloaded chunked uploads (#356) * feat: offloaded chunked uploads * fix: use temp_directory instead of tmpdir() * feat: CHUNKS_ENABLED config --- .../migration.sql | 18 ++ prisma/schema.prisma | 55 +++-- .../pages/Files/PendingFilesModal.tsx | 118 +++++++++ src/components/pages/Files/index.tsx | 15 +- src/components/pages/Upload/File.tsx | 29 +-- src/lib/config/Config.ts | 1 + src/lib/config/readConfig.ts | 1 + src/lib/config/validateConfig.ts | 2 + src/pages/api/upload.ts | 105 ++------ src/pages/api/user/pending.ts | 40 ++++ src/worker/upload.ts | 225 ++++++++++++++++++ tsup.config.ts | 6 + 12 files changed, 490 insertions(+), 125 deletions(-) create mode 100644 prisma/migrations/20230328005320_incomplete_file/migration.sql create mode 100644 src/components/pages/Files/PendingFilesModal.tsx create mode 100644 src/pages/api/user/pending.ts create mode 100644 src/worker/upload.ts diff --git a/prisma/migrations/20230328005320_incomplete_file/migration.sql b/prisma/migrations/20230328005320_incomplete_file/migration.sql new file mode 100644 index 0000000..0d8d4a8 --- /dev/null +++ b/prisma/migrations/20230328005320_incomplete_file/migration.sql @@ -0,0 +1,18 @@ +-- CreateEnum +CREATE TYPE "ProcessingStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETE'); + +-- CreateTable +CREATE TABLE "IncompleteFile" ( + "id" SERIAL NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "status" "ProcessingStatus" NOT NULL, + "chunks" INTEGER NOT NULL, + "chunksComplete" INTEGER NOT NULL, + "userId" INTEGER NOT NULL, + "data" JSONB NOT NULL, + + CONSTRAINT "IncompleteFile_pkey" PRIMARY KEY ("id") +); + +-- AddForeignKey +ALTER TABLE "IncompleteFile" ADD CONSTRAINT "IncompleteFile_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ffb215e..7dad66b 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -8,23 +8,24 @@ generator client { } model User { - id Int @id @default(autoincrement()) - username String - password String? - avatar String? - token String - administrator Boolean @default(false) - superAdmin Boolean @default(false) - systemTheme String @default("system") - embed Json @default("{}") - ratelimit DateTime? - totpSecret String? - domains String[] - oauth OAuth[] - files File[] - urls Url[] - Invite Invite[] - Folder Folder[] + id Int @id @default(autoincrement()) + username String + password String? + avatar String? + token String + administrator Boolean @default(false) + superAdmin Boolean @default(false) + systemTheme String @default("system") + embed Json @default("{}") + ratelimit DateTime? + totpSecret String? + domains String[] + oauth OAuth[] + files File[] + urls Url[] + Invite Invite[] + Folder Folder[] + IncompleteFile IncompleteFile[] } model Folder { @@ -126,3 +127,23 @@ enum OauthProviders { GITHUB GOOGLE } + +model IncompleteFile { + id Int @id @default(autoincrement()) + createdAt DateTime @default(now()) + + status ProcessingStatus + chunks Int + chunksComplete Int + + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + userId Int + + data Json +} + +enum ProcessingStatus { + PENDING + PROCESSING + COMPLETE +} diff --git a/src/components/pages/Files/PendingFilesModal.tsx b/src/components/pages/Files/PendingFilesModal.tsx new file mode 100644 index 0000000..d3f6b60 --- /dev/null +++ b/src/components/pages/Files/PendingFilesModal.tsx @@ -0,0 +1,118 @@ +import { Button, Modal, Title, Tooltip } from '@mantine/core'; +import { IconTrash } from '@tabler/icons-react'; +import AnchorNext from 'components/AnchorNext'; +import MutedText from 'components/MutedText'; +import useFetch from 'hooks/useFetch'; +import { DataTable } from 'mantine-datatable'; +import { useEffect, useState } from 'react'; + +export type PendingFiles = { + id: number; + createdAt: string; + status: string; + chunks: number; + chunksComplete: number; + userId: number; + data: { + file: { + filename: string; + mimetype: string; + lastchunk: boolean; + identifier: string; + totalBytes: number; + }; + code?: number; + message?: string; + }; +}; + +export default function PendingFilesModal({ open, onClose }) { + const [incFiles, setIncFiles] = useState([]); + const [loading, setLoading] = useState(true); + + const [selectedFiles, setSelectedFiles] = useState([]); + + async function updateIncFiles() { + setLoading(true); + + const files = await useFetch('/api/user/pending'); + setIncFiles(files); + + setLoading(false); + } + + async function deleteIncFiles() { + await useFetch('/api/user/pending', 'DELETE', { + id: selectedFiles.map((file) => file.id), + }); + updateIncFiles(); + setSelectedFiles([]); + } + + useEffect(() => { + updateIncFiles(); + }, []); + + useEffect(() => { + const interval = setInterval(() => { + if (open) updateIncFiles(); + }, 5000); + + return () => clearInterval(interval); + }, [open]); + + return ( + Pending Files} size='auto' opened={open} onClose={onClose}> + Refreshing every 5 seconds... + new Date(file.createdAt).toLocaleString() }, + { accessor: 'status', render: (file) => file.status.toLowerCase() }, + { + accessor: 'progress', + title: 'Progress', + render: (file) => `${file.chunksComplete}/${file.chunks} chunks`, + }, + { + accessor: 'message', + render: (file) => + file.data.code === 200 ? ( + + view file + + ) : ( + file.data.message + ), + }, + ]} + fetching={loading} + loaderBackgroundBlur={5} + loaderVariant='dots' + onSelectedRecordsChange={setSelectedFiles} + selectedRecords={selectedFiles} + /> + + {selectedFiles.length ? ( + + + + ) : null} + + ); +} diff --git a/src/components/pages/Files/index.tsx b/src/components/pages/Files/index.tsx index aae1cc6..9a03bbe 100644 --- a/src/components/pages/Files/index.tsx +++ b/src/components/pages/Files/index.tsx @@ -1,17 +1,20 @@ -import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title } from '@mantine/core'; -import { IconFileUpload } from '@tabler/icons-react'; +import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title, Tooltip } from '@mantine/core'; +import { IconFileUpload, IconPhotoUp } from '@tabler/icons-react'; import File from 'components/File'; import useFetch from 'hooks/useFetch'; import { usePaginatedFiles } from 'lib/queries/files'; import Link from 'next/link'; import { useEffect, useState } from 'react'; import FilePagation from './FilePagation'; +import PendingFilesModal from './PendingFilesModal'; export default function Files({ disableMediaPreview, exifEnabled, queryPage, compress }) { const [favoritePage, setFavoritePage] = useState(1); const [favoriteNumPages, setFavoriteNumPages] = useState(0); const favoritePages = usePaginatedFiles(favoritePage, 'media', true); + const [open, setOpen] = useState(false); + useEffect(() => { (async () => { const { count } = await useFetch('/api/user/paged?count=true&filter=media&favorite=true'); @@ -21,11 +24,19 @@ export default function Files({ disableMediaPreview, exifEnabled, queryPage, com return ( <> + setOpen(false)} /> + Files + + + setOpen(true)} variant='filled' color='primary'> + + + {favoritePages.isSuccess && favoritePages.data.length ? ( setTimeout(resolve, 100)); } - // if last chunk send notif that it will take a while - if (j === chunks.length - 1) { - updateNotification({ - id: 'upload-chunked', - title: 'Finalizing partial upload', - message: 'This may take a while...', - icon: , - color: 'yellow', - autoClose: false, - }); - } - const body = new FormData(); body.append('file', chunks[j].blob); @@ -136,25 +124,18 @@ export default function File({ chunks: chunks_config }) { if (j === chunks.length - 1) { updateNotification({ id: 'upload-chunked', - title: 'Upload Successful', - message: '', + title: 'Finalizing partial upload', + message: + 'The upload has been offloaded, and will complete in the background. You can see processing files in the files tab.', + icon: , color: 'green', - icon: , + autoClose: true, }); - showFilesModal(clipboard, modals, json.files); invalidateFiles(); setFiles([]); setProgress(100); setTimeout(() => setProgress(0), 1000); - - clipboard.copy(json.files[0]); - if (!navigator.clipboard) - showNotification({ - title: 'Unable to copy to clipboard', - message: 'Zipline is unable to copy to clipboard due to security reasons.', - color: 'red', - }); } ready = true; diff --git a/src/lib/config/Config.ts b/src/lib/config/Config.ts index 5e6b595..ac9bc6d 100644 --- a/src/lib/config/Config.ts +++ b/src/lib/config/Config.ts @@ -136,6 +136,7 @@ export interface ConfigOAuth { export interface ConfigChunks { max_size: number; chunks_size: number; + enabled: boolean; } export interface ConfigMfa { diff --git a/src/lib/config/readConfig.ts b/src/lib/config/readConfig.ts index 9656c2c..7a81d10 100644 --- a/src/lib/config/readConfig.ts +++ b/src/lib/config/readConfig.ts @@ -158,6 +158,7 @@ export default function readConfig() { map('CHUNKS_MAX_SIZE', 'human-to-byte', 'chunks.max_size'), map('CHUNKS_CHUNKS_SIZE', 'human-to-byte', 'chunks.chunks_size'), + map('CHUNKS_ENABLED', 'boolean', 'chunks.enabled'), map('MFA_TOTP_ISSUER', 'string', 'mfa.totp_issuer'), map('MFA_TOTP_ENABLED', 'boolean', 'mfa.totp_enabled'), diff --git a/src/lib/config/validateConfig.ts b/src/lib/config/validateConfig.ts index 89581c3..272d47e 100644 --- a/src/lib/config/validateConfig.ts +++ b/src/lib/config/validateConfig.ts @@ -201,10 +201,12 @@ const validator = s.object({ .object({ max_size: s.number.default(humanToBytes('90MB')), chunks_size: s.number.default(humanToBytes('20MB')), + enabled: s.boolean.default(true), }) .default({ max_size: humanToBytes('90MB'), chunks_size: humanToBytes('20MB'), + enabled: true, }), mfa: s .object({ diff --git a/src/pages/api/upload.ts b/src/pages/api/upload.ts index 622c872..b3124c7 100644 --- a/src/pages/api/upload.ts +++ b/src/pages/api/upload.ts @@ -1,5 +1,5 @@ import { InvisibleFile } from '@prisma/client'; -import { readdir, readFile, unlink, writeFile } from 'fs/promises'; +import { writeFile } from 'fs/promises'; import zconfig from 'lib/config'; import datasource from 'lib/datasource'; import { sendUpload } from 'lib/discord'; @@ -14,6 +14,7 @@ import { removeGPSData } from 'lib/utils/exif'; import multer from 'multer'; import { join } from 'path'; import sharp from 'sharp'; +import { Worker } from 'worker_threads'; const uploader = multer(); const logger = Logger.get('upload'); @@ -78,7 +79,7 @@ async function handler(req: NextApiReq, res: NextApiRes) { if (fileMaxViews < 0) return res.badRequest('invalid max views (max views < 0)'); // handle partial uploads before ratelimits - if (req.headers['content-range']) { + if (req.headers['content-range'] && zconfig.chunks.enabled) { // parses content-range header (bytes start-end/total) const [start, end, total] = req.headers['content-range'] .replace('bytes ', '') @@ -108,89 +109,29 @@ async function handler(req: NextApiReq, res: NextApiRes) { await writeFile(tempFile, req.files[0].buffer); if (lastchunk) { - const partials = await readdir(zconfig.core.temp_directory).then((files) => - files.filter((x) => x.startsWith(`zipline_partial_${identifier}`)) - ); - - const readChunks = partials.map((x) => { - const [, , , start, end] = x.split('_'); - return { start: Number(start), end: Number(end), filename: x }; - }); - - // combine chunks - const chunks = new Uint8Array(total); - - for (let i = 0; i !== readChunks.length; ++i) { - const chunkData = readChunks[i]; - - const buffer = await readFile(join(zconfig.core.temp_directory, chunkData.filename)); - await unlink(join(zconfig.core.temp_directory, readChunks[i].filename)); - - chunks.set(buffer, chunkData.start); - } - - const ext = filename.split('.').length === 1 ? '' : filename.split('.').pop(); - if (zconfig.uploader.disabled_extensions.includes(ext)) - return res.error('disabled extension recieved: ' + ext); - const fileName = await formatFileName(format, filename); - - let password = null; - if (req.headers.password) { - password = await hashPassword(req.headers.password as string); - } - - const compressionUsed = imageCompressionPercent && mimetype.startsWith('image/'); - let invis: InvisibleFile; - - const file = await prisma.file.create({ - data: { - name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, - mimetype, - userId: user.id, - embed: !!req.headers.embed, - password, - expiresAt: expiry, - maxViews: fileMaxViews, - originalName: req.headers['original-name'] ? filename ?? null : null, + new Worker('./dist/worker/upload.js', { + workerData: { + user, + file: { + filename, + mimetype, + identifier, + lastchunk, + totalBytes: total, + }, + response: { + expiresAt: expiry, + format, + imageCompressionPercent, + fileMaxViews, + }, + headers: req.headers, }, }); - if (req.headers.zws) invis = await createInvisImage(zconfig.uploader.length, file.id); - - await datasource.save(file.name, Buffer.from(chunks)); - - logger.info(`User ${user.username} (${user.id}) uploaded ${file.name} (${file.id}) (chunked)`); - let domain; - if (req.headers['override-domain']) { - domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers['override-domain']}`; - } else if (user.domains.length) { - domain = user.domains[Math.floor(Math.random() * user.domains.length)]; - } else { - domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers.host}`; - } - - const responseUrl = `${domain}${zconfig.uploader.route === '/' ? '/' : zconfig.uploader.route + '/'}${ - invis ? invis.invis : encodeURI(file.name) - }`; - - response.files.push(responseUrl); - - if (zconfig.discord?.upload) { - await sendUpload(user, file, `${domain}/r/${invis ? invis.invis : file.name}`, responseUrl); - } - - if (zconfig.exif.enabled && zconfig.exif.remove_gps && mimetype.startsWith('image/')) { - try { - await removeGPSData(file); - response.removed_gps = true; - } catch (e) { - logger.error(`Failed to remove GPS data from ${file.name} (${file.id}) - ${e.message}`); - - response.removed_gps = false; - } - } - - return res.json(response); + return res.json({ + pending: true, + }); } return res.json({ diff --git a/src/pages/api/user/pending.ts b/src/pages/api/user/pending.ts new file mode 100644 index 0000000..356da7f --- /dev/null +++ b/src/pages/api/user/pending.ts @@ -0,0 +1,40 @@ +import prisma from 'lib/prisma'; +import { NextApiReq, NextApiRes, UserExtended, withZipline } from 'middleware/withZipline'; + +async function handler(req: NextApiReq, res: NextApiRes, user: UserExtended) { + if (req.method === 'DELETE') { + const fileIds = req.body.id as number[]; + + const existingFiles = await prisma.incompleteFile.findMany({ + where: { + id: { + in: fileIds, + }, + userId: user.id, + }, + }); + + const incFiles = await prisma.incompleteFile.deleteMany({ + where: { + id: { + in: existingFiles.map((x) => x.id), + }, + }, + }); + + return res.json(incFiles); + } else { + const files = await prisma.incompleteFile.findMany({ + where: { + userId: user.id, + }, + }); + + return res.json(files); + } +} + +export default withZipline(handler, { + methods: ['GET', 'DELETE'], + user: true, +}); diff --git a/src/worker/upload.ts b/src/worker/upload.ts new file mode 100644 index 0000000..d6b26b1 --- /dev/null +++ b/src/worker/upload.ts @@ -0,0 +1,225 @@ +import { readdir, readFile, open, rm } from 'fs/promises'; +import type { NameFormat } from 'lib/format'; +import Logger from 'lib/logger'; +import type { UserExtended } from 'middleware/withZipline'; +import { isMainThread, workerData } from 'worker_threads'; + +import prisma from 'lib/prisma'; +import { join } from 'path'; +import { IncompleteFile, InvisibleFile } from '@prisma/client'; +import { removeGPSData } from 'lib/utils/exif'; +import { sendUpload } from 'lib/discord'; +import { createInvisImage, hashPassword } from 'lib/util'; +import formatFileName from 'lib/format'; + +export type UploadWorkerData = { + user: UserExtended; + file: { + filename: string; + mimetype: string; + identifier: string; + lastchunk: boolean; + totalBytes: number; + }; + response: { + expiresAt?: Date; + format: NameFormat; + imageCompressionPercent?: number; + fileMaxViews?: number; + }; + headers: Record; +}; + +const { user, file, response, headers } = workerData as UploadWorkerData; + +const logger = Logger.get('worker::upload').child(file?.identifier ?? 'unknown-ident'); + +if (isMainThread) { + logger.error('worker is not a thread'); + process.exit(1); +} + +if (!file.lastchunk) { + logger.error('lastchunk is false, worker should not have been started'); + process.exit(1); +} + +if (!config.chunks.enabled) { + logger.error('chunks are not enabled, worker should not have been started'); + process.exit(1); +} + +start(); + +async function start() { + logger.debug('starting worker'); + + const partials = await readdir(config.core.temp_directory).then((files) => + files.filter((x) => x.startsWith(`zipline_partial_${file.identifier}`)) + ); + + const readChunks = partials.map((x) => { + const [, , , start, end] = x.split('_'); + return { start: Number(start), end: Number(end), filename: x }; + }); + + const incompleteFile = await prisma.incompleteFile.create({ + data: { + data: { + file, + }, + chunks: readChunks.length, + chunksComplete: 0, + status: 'PENDING', + userId: user.id, + }, + }); + + const compressionUsed = response.imageCompressionPercent && file.mimetype.startsWith('image/'); + const ext = file.filename.split('.').length === 1 ? '' : file.filename.split('.').pop(); + const fileName = await formatFileName(response.format, file.filename); + + let fd; + + if (config.datasource.type === 'local') { + fd = await open( + join( + process.cwd(), + config.datasource.local.directory, + `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}` + ), + 'w' + ); + } else { + fd = new Uint8Array(file.totalBytes); + } + + for (let i = 0; i !== readChunks.length; ++i) { + const chunk = readChunks[i]; + + const buffer = await readFile(join(config.core.temp_directory, chunk.filename)); + + if (config.datasource.type === 'local') { + const { bytesWritten } = await fd.write(buffer, 0, buffer.length, chunk.start); + logger.child('fd').debug(`wrote ${bytesWritten} bytes to file`); + } else { + fd.set(buffer, chunk.start); + logger.child('bytes').debug(`wrote ${buffer.length} bytes to array`); + } + + await rm(join(config.core.temp_directory, chunk.filename)); + + await prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + chunksComplete: { + increment: 1, + }, + status: 'PROCESSING', + }, + }); + } + + if (config.datasource.type === 'local') { + await fd.close(); + } else { + logger.debug('writing file to datasource'); + await datasource.save( + `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, + Buffer.from(fd as Uint8Array) + ); + } + + const final = await prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + status: 'COMPLETE', + }, + }); + + logger.debug('done writing file'); + + await runFileComplete(fileName, ext, compressionUsed, final); + + logger.debug('done running worker'); + process.exit(0); +} + +async function setResponse(incompleteFile: IncompleteFile, code: number, message: string) { + incompleteFile.data['code'] = code; + incompleteFile.data['message'] = message; + + return prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + data: incompleteFile.data, + }, + }); +} + +async function runFileComplete( + fileName: string, + ext: string, + compressionUsed: boolean, + incompleteFile: IncompleteFile +) { + if (config.uploader.disabled_extensions.includes(ext)) + return setResponse(incompleteFile, 403, 'disabled extension'); + + let password = null; + if (headers.password) { + password = await hashPassword(headers.password as string); + } + + let invis: InvisibleFile; + + const fFile = await prisma.file.create({ + data: { + name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, + mimetype: file.mimetype, + userId: user.id, + embed: !!headers.embed, + password, + expiresAt: response.expiresAt, + maxViews: response.fileMaxViews, + originalName: headers['original-name'] ? file.filename ?? null : null, + size: file.totalBytes, + }, + }); + + if (headers.zws) invis = await createInvisImage(config.uploader.length, fFile.id); + + logger.info(`User ${user.username} (${user.id}) uploaded ${fFile.name} (${fFile.id}) (chunked)`); + let domain; + if (headers['override-domain']) { + domain = `${config.core.return_https ? 'https' : 'http'}://${headers['override-domain']}`; + } else if (user.domains.length) { + domain = user.domains[Math.floor(Math.random() * user.domains.length)]; + } else { + domain = `${config.core.return_https ? 'https' : 'http'}://${headers.host}`; + } + + const responseUrl = `${domain}${config.uploader.route === '/' ? '/' : config.uploader.route + '/'}${ + invis ? invis.invis : encodeURI(fFile.name) + }`; + + if (config.discord?.upload) { + await sendUpload(user, fFile, `${domain}/r/${invis ? invis.invis : fFile.name}`, responseUrl); + } + + if (config.exif.enabled && config.exif.remove_gps && fFile.mimetype.startsWith('image/')) { + try { + await removeGPSData(fFile); + } catch (e) { + logger.error(`Failed to remove GPS data from ${fFile.name} (${fFile.id}) - ${e.message}`); + } + } + + await setResponse(incompleteFile, 200, responseUrl); +} diff --git a/tsup.config.ts b/tsup.config.ts index 9d8b661..a38ced3 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -13,6 +13,12 @@ export default defineConfig([ entryPoints: ['src/server/index.ts'], ...opts, }, + // workers + { + entryPoints: ['src/worker/upload.ts'], + outDir: 'dist/worker', + ...opts, + }, // scripts { entryPoints: ['src/scripts/import-dir.ts'],