From e707685da39bf1305984bdcebd9bb950b801a3b3 Mon Sep 17 00:00:00 2001 From: diced Date: Fri, 31 Mar 2023 21:32:38 -0700 Subject: [PATCH] feat: offloaded chunked uploads --- .../migration.sql | 18 ++ prisma/schema.prisma | 55 +++-- .../pages/Files/PendingFilesModal.tsx | 118 ++++++++++ src/components/pages/Files/index.tsx | 15 +- src/components/pages/Upload/File.tsx | 29 +-- src/pages/api/upload.ts | 103 ++------ src/pages/api/user/pending.ts | 40 ++++ src/worker/upload.ts | 221 ++++++++++++++++++ tsup.config.ts | 6 + 9 files changed, 481 insertions(+), 124 deletions(-) create mode 100644 prisma/migrations/20230328005320_incomplete_file/migration.sql create mode 100644 src/components/pages/Files/PendingFilesModal.tsx create mode 100644 src/pages/api/user/pending.ts create mode 100644 src/worker/upload.ts diff --git a/prisma/migrations/20230328005320_incomplete_file/migration.sql b/prisma/migrations/20230328005320_incomplete_file/migration.sql new file mode 100644 index 00000000..0d8d4a8e --- /dev/null +++ b/prisma/migrations/20230328005320_incomplete_file/migration.sql @@ -0,0 +1,18 @@ +-- CreateEnum +CREATE TYPE "ProcessingStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETE'); + +-- CreateTable +CREATE TABLE "IncompleteFile" ( + "id" SERIAL NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "status" "ProcessingStatus" NOT NULL, + "chunks" INTEGER NOT NULL, + "chunksComplete" INTEGER NOT NULL, + "userId" INTEGER NOT NULL, + "data" JSONB NOT NULL, + + CONSTRAINT "IncompleteFile_pkey" PRIMARY KEY ("id") +); + +-- AddForeignKey +ALTER TABLE "IncompleteFile" ADD CONSTRAINT "IncompleteFile_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ffb215e8..7dad66b3 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -8,23 +8,24 @@ generator client { } model User { - id Int @id @default(autoincrement()) - username String - password String? - avatar String? - token String - administrator Boolean @default(false) - superAdmin Boolean @default(false) - systemTheme String @default("system") - embed Json @default("{}") - ratelimit DateTime? - totpSecret String? - domains String[] - oauth OAuth[] - files File[] - urls Url[] - Invite Invite[] - Folder Folder[] + id Int @id @default(autoincrement()) + username String + password String? + avatar String? + token String + administrator Boolean @default(false) + superAdmin Boolean @default(false) + systemTheme String @default("system") + embed Json @default("{}") + ratelimit DateTime? + totpSecret String? + domains String[] + oauth OAuth[] + files File[] + urls Url[] + Invite Invite[] + Folder Folder[] + IncompleteFile IncompleteFile[] } model Folder { @@ -126,3 +127,23 @@ enum OauthProviders { GITHUB GOOGLE } + +model IncompleteFile { + id Int @id @default(autoincrement()) + createdAt DateTime @default(now()) + + status ProcessingStatus + chunks Int + chunksComplete Int + + user User @relation(fields: [userId], references: [id], onDelete: Cascade) + userId Int + + data Json +} + +enum ProcessingStatus { + PENDING + PROCESSING + COMPLETE +} diff --git a/src/components/pages/Files/PendingFilesModal.tsx b/src/components/pages/Files/PendingFilesModal.tsx new file mode 100644 index 00000000..d3f6b605 --- /dev/null +++ b/src/components/pages/Files/PendingFilesModal.tsx @@ -0,0 +1,118 @@ +import { Button, Modal, Title, Tooltip } from '@mantine/core'; +import { IconTrash } from '@tabler/icons-react'; +import AnchorNext from 'components/AnchorNext'; +import MutedText from 'components/MutedText'; +import useFetch from 'hooks/useFetch'; +import { DataTable } from 'mantine-datatable'; +import { useEffect, useState } from 'react'; + +export type PendingFiles = { + id: number; + createdAt: string; + status: string; + chunks: number; + chunksComplete: number; + userId: number; + data: { + file: { + filename: string; + mimetype: string; + lastchunk: boolean; + identifier: string; + totalBytes: number; + }; + code?: number; + message?: string; + }; +}; + +export default function PendingFilesModal({ open, onClose }) { + const [incFiles, setIncFiles] = useState([]); + const [loading, setLoading] = useState(true); + + const [selectedFiles, setSelectedFiles] = useState([]); + + async function updateIncFiles() { + setLoading(true); + + const files = await useFetch('/api/user/pending'); + setIncFiles(files); + + setLoading(false); + } + + async function deleteIncFiles() { + await useFetch('/api/user/pending', 'DELETE', { + id: selectedFiles.map((file) => file.id), + }); + updateIncFiles(); + setSelectedFiles([]); + } + + useEffect(() => { + updateIncFiles(); + }, []); + + useEffect(() => { + const interval = setInterval(() => { + if (open) updateIncFiles(); + }, 5000); + + return () => clearInterval(interval); + }, [open]); + + return ( + Pending Files} size='auto' opened={open} onClose={onClose}> + Refreshing every 5 seconds... + new Date(file.createdAt).toLocaleString() }, + { accessor: 'status', render: (file) => file.status.toLowerCase() }, + { + accessor: 'progress', + title: 'Progress', + render: (file) => `${file.chunksComplete}/${file.chunks} chunks`, + }, + { + accessor: 'message', + render: (file) => + file.data.code === 200 ? ( + + view file + + ) : ( + file.data.message + ), + }, + ]} + fetching={loading} + loaderBackgroundBlur={5} + loaderVariant='dots' + onSelectedRecordsChange={setSelectedFiles} + selectedRecords={selectedFiles} + /> + + {selectedFiles.length ? ( + + + + ) : null} + + ); +} diff --git a/src/components/pages/Files/index.tsx b/src/components/pages/Files/index.tsx index aae1cc61..9a03bbe3 100644 --- a/src/components/pages/Files/index.tsx +++ b/src/components/pages/Files/index.tsx @@ -1,17 +1,20 @@ -import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title } from '@mantine/core'; -import { IconFileUpload } from '@tabler/icons-react'; +import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title, Tooltip } from '@mantine/core'; +import { IconFileUpload, IconPhotoUp } from '@tabler/icons-react'; import File from 'components/File'; import useFetch from 'hooks/useFetch'; import { usePaginatedFiles } from 'lib/queries/files'; import Link from 'next/link'; import { useEffect, useState } from 'react'; import FilePagation from './FilePagation'; +import PendingFilesModal from './PendingFilesModal'; export default function Files({ disableMediaPreview, exifEnabled, queryPage, compress }) { const [favoritePage, setFavoritePage] = useState(1); const [favoriteNumPages, setFavoriteNumPages] = useState(0); const favoritePages = usePaginatedFiles(favoritePage, 'media', true); + const [open, setOpen] = useState(false); + useEffect(() => { (async () => { const { count } = await useFetch('/api/user/paged?count=true&filter=media&favorite=true'); @@ -21,11 +24,19 @@ export default function Files({ disableMediaPreview, exifEnabled, queryPage, com return ( <> + setOpen(false)} /> + Files + + + setOpen(true)} variant='filled' color='primary'> + + + {favoritePages.isSuccess && favoritePages.data.length ? ( setTimeout(resolve, 100)); } - // if last chunk send notif that it will take a while - if (j === chunks.length - 1) { - updateNotification({ - id: 'upload-chunked', - title: 'Finalizing partial upload', - message: 'This may take a while...', - icon: , - color: 'yellow', - autoClose: false, - }); - } - const body = new FormData(); body.append('file', chunks[j].blob); @@ -109,25 +97,18 @@ export default function File({ chunks: chunks_config }) { if (j === chunks.length - 1) { updateNotification({ id: 'upload-chunked', - title: 'Upload Successful', - message: '', + title: 'Finalizing partial upload', + message: + 'The upload has been offloaded, and will complete in the background. You can see processing files in the files tab.', + icon: , color: 'green', - icon: , + autoClose: true, }); - showFilesModal(clipboard, modals, json.files); invalidateFiles(); setFiles([]); setProgress(100); setTimeout(() => setProgress(0), 1000); - - clipboard.copy(json.files[0]); - if (!navigator.clipboard) - showNotification({ - title: 'Unable to copy to clipboard', - message: 'Zipline is unable to copy to clipboard due to security reasons.', - color: 'red', - }); } ready = true; diff --git a/src/pages/api/upload.ts b/src/pages/api/upload.ts index e6b5bbeb..9df3968d 100644 --- a/src/pages/api/upload.ts +++ b/src/pages/api/upload.ts @@ -1,5 +1,5 @@ import { InvisibleFile } from '@prisma/client'; -import { readdir, readFile, unlink, writeFile } from 'fs/promises'; +import { writeFile } from 'fs/promises'; import zconfig from 'lib/config'; import datasource from 'lib/datasource'; import { sendUpload } from 'lib/discord'; @@ -15,6 +15,7 @@ import multer from 'multer'; import { tmpdir } from 'os'; import { join } from 'path'; import sharp from 'sharp'; +import { Worker } from 'worker_threads'; const uploader = multer(); const logger = Logger.get('upload'); @@ -109,89 +110,29 @@ async function handler(req: NextApiReq, res: NextApiRes) { await writeFile(tempFile, req.files[0].buffer); if (lastchunk) { - const partials = await readdir(tmpdir()).then((files) => - files.filter((x) => x.startsWith(`zipline_partial_${identifier}`)) - ); - - const readChunks = partials.map((x) => { - const [, , , start, end] = x.split('_'); - return { start: Number(start), end: Number(end), filename: x }; - }); - - // combine chunks - const chunks = new Uint8Array(total); - - for (let i = 0; i !== readChunks.length; ++i) { - const chunkData = readChunks[i]; - - const buffer = await readFile(join(tmpdir(), chunkData.filename)); - await unlink(join(tmpdir(), readChunks[i].filename)); - - chunks.set(buffer, chunkData.start); - } - - const ext = filename.split('.').length === 1 ? '' : filename.split('.').pop(); - if (zconfig.uploader.disabled_extensions.includes(ext)) - return res.error('disabled extension recieved: ' + ext); - const fileName = await formatFileName(format, filename); - - let password = null; - if (req.headers.password) { - password = await hashPassword(req.headers.password as string); - } - - const compressionUsed = imageCompressionPercent && mimetype.startsWith('image/'); - let invis: InvisibleFile; - - const file = await prisma.file.create({ - data: { - name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, - mimetype, - userId: user.id, - embed: !!req.headers.embed, - password, - expiresAt: expiry, - maxViews: fileMaxViews, - originalName: req.headers['original-name'] ? filename ?? null : null, + new Worker('./dist/worker/upload.js', { + workerData: { + user, + file: { + filename, + mimetype, + identifier, + lastchunk, + totalBytes: total, + }, + response: { + expiresAt: expiry, + format, + imageCompressionPercent, + fileMaxViews, + }, + headers: req.headers, }, }); - if (req.headers.zws) invis = await createInvisImage(zconfig.uploader.length, file.id); - - await datasource.save(file.name, Buffer.from(chunks)); - - logger.info(`User ${user.username} (${user.id}) uploaded ${file.name} (${file.id}) (chunked)`); - let domain; - if (req.headers['override-domain']) { - domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers['override-domain']}`; - } else if (user.domains.length) { - domain = user.domains[Math.floor(Math.random() * user.domains.length)]; - } else { - domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers.host}`; - } - - const responseUrl = `${domain}${zconfig.uploader.route === '/' ? '/' : zconfig.uploader.route + '/'}${ - invis ? invis.invis : encodeURI(file.name) - }`; - - response.files.push(responseUrl); - - if (zconfig.discord?.upload) { - await sendUpload(user, file, `${domain}/r/${invis ? invis.invis : file.name}`, responseUrl); - } - - if (zconfig.exif.enabled && zconfig.exif.remove_gps && mimetype.startsWith('image/')) { - try { - await removeGPSData(file); - response.removed_gps = true; - } catch (e) { - logger.error(`Failed to remove GPS data from ${file.name} (${file.id}) - ${e.message}`); - - response.removed_gps = false; - } - } - - return res.json(response); + return res.json({ + pending: true, + }); } return res.json({ diff --git a/src/pages/api/user/pending.ts b/src/pages/api/user/pending.ts new file mode 100644 index 00000000..356da7f0 --- /dev/null +++ b/src/pages/api/user/pending.ts @@ -0,0 +1,40 @@ +import prisma from 'lib/prisma'; +import { NextApiReq, NextApiRes, UserExtended, withZipline } from 'middleware/withZipline'; + +async function handler(req: NextApiReq, res: NextApiRes, user: UserExtended) { + if (req.method === 'DELETE') { + const fileIds = req.body.id as number[]; + + const existingFiles = await prisma.incompleteFile.findMany({ + where: { + id: { + in: fileIds, + }, + userId: user.id, + }, + }); + + const incFiles = await prisma.incompleteFile.deleteMany({ + where: { + id: { + in: existingFiles.map((x) => x.id), + }, + }, + }); + + return res.json(incFiles); + } else { + const files = await prisma.incompleteFile.findMany({ + where: { + userId: user.id, + }, + }); + + return res.json(files); + } +} + +export default withZipline(handler, { + methods: ['GET', 'DELETE'], + user: true, +}); diff --git a/src/worker/upload.ts b/src/worker/upload.ts new file mode 100644 index 00000000..a740ff63 --- /dev/null +++ b/src/worker/upload.ts @@ -0,0 +1,221 @@ +import { readdir, readFile, open, rm } from 'fs/promises'; +import type { NameFormat } from 'lib/format'; +import Logger from 'lib/logger'; +import type { UserExtended } from 'middleware/withZipline'; +import { tmpdir } from 'os'; +import { isMainThread, workerData } from 'worker_threads'; + +import prisma from 'lib/prisma'; +import { join } from 'path'; +import { IncompleteFile, InvisibleFile } from '@prisma/client'; +import { removeGPSData } from 'lib/utils/exif'; +import { sendUpload } from 'lib/discord'; +import { createInvisImage, hashPassword } from 'lib/util'; +import formatFileName from 'lib/format'; + +export type UploadWorkerData = { + user: UserExtended; + file: { + filename: string; + mimetype: string; + identifier: string; + lastchunk: boolean; + totalBytes: number; + }; + response: { + expiresAt?: Date; + format: NameFormat; + imageCompressionPercent?: number; + fileMaxViews?: number; + }; + headers: Record; +}; + +const { user, file, response, headers } = workerData as UploadWorkerData; + +const logger = Logger.get('worker::upload').child(file?.identifier ?? 'unknown-ident'); + +if (isMainThread) { + logger.error('worker is not a thread'); + process.exit(1); +} + +if (!file.lastchunk) { + logger.error('lastchunk is false, worker should not have been started'); + process.exit(1); +} + +start(); + +async function start() { + logger.debug('starting worker'); + + const partials = await readdir(tmpdir()).then((files) => + files.filter((x) => x.startsWith(`zipline_partial_${file.identifier}`)) + ); + + const readChunks = partials.map((x) => { + const [, , , start, end] = x.split('_'); + return { start: Number(start), end: Number(end), filename: x }; + }); + + const incompleteFile = await prisma.incompleteFile.create({ + data: { + data: { + file, + }, + chunks: readChunks.length, + chunksComplete: 0, + status: 'PENDING', + userId: user.id, + }, + }); + + const compressionUsed = response.imageCompressionPercent && file.mimetype.startsWith('image/'); + const ext = file.filename.split('.').length === 1 ? '' : file.filename.split('.').pop(); + const fileName = await formatFileName(response.format, file.filename); + + let fd; + + if (config.datasource.type === 'local') { + fd = await open( + join( + process.cwd(), + config.datasource.local.directory, + `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}` + ), + 'w' + ); + } else { + fd = new Uint8Array(file.totalBytes); + } + + for (let i = 0; i !== readChunks.length; ++i) { + const chunk = readChunks[i]; + + const buffer = await readFile(join(tmpdir(), chunk.filename)); + + if (config.datasource.type === 'local') { + const { bytesWritten } = await fd.write(buffer, 0, buffer.length, chunk.start); + logger.child('fd').debug(`wrote ${bytesWritten} bytes to file`); + } else { + fd.set(buffer, chunk.start); + logger.child('bytes').debug(`wrote ${buffer.length} bytes to array`); + } + + await rm(join(tmpdir(), chunk.filename)); + + await prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + chunksComplete: { + increment: 1, + }, + status: 'PROCESSING', + }, + }); + } + + if (config.datasource.type === 'local') { + await fd.close(); + } else { + logger.debug('writing file to datasource'); + await datasource.save( + `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, + Buffer.from(fd as Uint8Array) + ); + } + + const final = await prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + status: 'COMPLETE', + }, + }); + + logger.debug('done writing file'); + + await runFileComplete(fileName, ext, compressionUsed, final); + + logger.debug('done running worker'); + process.exit(0); +} + +async function setResponse(incompleteFile: IncompleteFile, code: number, message: string) { + incompleteFile.data['code'] = code; + incompleteFile.data['message'] = message; + + return prisma.incompleteFile.update({ + where: { + id: incompleteFile.id, + }, + data: { + data: incompleteFile.data, + }, + }); +} + +async function runFileComplete( + fileName: string, + ext: string, + compressionUsed: boolean, + incompleteFile: IncompleteFile +) { + if (config.uploader.disabled_extensions.includes(ext)) + return setResponse(incompleteFile, 403, 'disabled extension'); + + let password = null; + if (headers.password) { + password = await hashPassword(headers.password as string); + } + + let invis: InvisibleFile; + + const fFile = await prisma.file.create({ + data: { + name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`, + mimetype: file.mimetype, + userId: user.id, + embed: !!headers.embed, + password, + expiresAt: response.expiresAt, + maxViews: response.fileMaxViews, + originalName: headers['original-name'] ? file.filename ?? null : null, + size: file.totalBytes, + }, + }); + + if (headers.zws) invis = await createInvisImage(config.uploader.length, fFile.id); + + logger.info(`User ${user.username} (${user.id}) uploaded ${fFile.name} (${fFile.id}) (chunked)`); + let domain; + if (headers['override-domain']) { + domain = `${config.core.return_https ? 'https' : 'http'}://${headers['override-domain']}`; + } else if (user.domains.length) { + domain = user.domains[Math.floor(Math.random() * user.domains.length)]; + } else { + domain = `${config.core.return_https ? 'https' : 'http'}://${headers.host}`; + } + + const responseUrl = `${domain}${config.uploader.route === '/' ? '/' : config.uploader.route + '/'}${ + invis ? invis.invis : encodeURI(fFile.name) + }`; + + if (config.discord?.upload) { + await sendUpload(user, fFile, `${domain}/r/${invis ? invis.invis : fFile.name}`, responseUrl); + } + + if (config.exif.enabled && config.exif.remove_gps && fFile.mimetype.startsWith('image/')) { + try { + await removeGPSData(fFile); + } catch (e) { + logger.error(`Failed to remove GPS data from ${fFile.name} (${fFile.id}) - ${e.message}`); + } + } + + await setResponse(incompleteFile, 200, responseUrl); +} diff --git a/tsup.config.ts b/tsup.config.ts index 9d8b6612..a38ced38 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -13,6 +13,12 @@ export default defineConfig([ entryPoints: ['src/server/index.ts'], ...opts, }, + // workers + { + entryPoints: ['src/worker/upload.ts'], + outDir: 'dist/worker', + ...opts, + }, // scripts { entryPoints: ['src/scripts/import-dir.ts'],