feat: offloaded chunked uploads (#356)

* feat: offloaded chunked uploads

* fix: use temp_directory instead of tmpdir()

* feat: CHUNKS_ENABLED config
dicedtomato 2023-04-03 22:42:27 -07:00 committed by GitHub
parent bf40fa9cd2
commit eedeb89c7d
12 changed files with 490 additions and 125 deletions


@@ -0,0 +1,18 @@
-- CreateEnum
CREATE TYPE "ProcessingStatus" AS ENUM ('PENDING', 'PROCESSING', 'COMPLETE');

-- CreateTable
CREATE TABLE "IncompleteFile" (
    "id" SERIAL NOT NULL,
    "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
    "status" "ProcessingStatus" NOT NULL,
    "chunks" INTEGER NOT NULL,
    "chunksComplete" INTEGER NOT NULL,
    "userId" INTEGER NOT NULL,
    "data" JSONB NOT NULL,

    CONSTRAINT "IncompleteFile_pkey" PRIMARY KEY ("id")
);

-- AddForeignKey
ALTER TABLE "IncompleteFile" ADD CONSTRAINT "IncompleteFile_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;


@@ -8,23 +8,24 @@ generator client {
 }

 model User {
   id             Int       @id @default(autoincrement())
   username       String
   password       String?
   avatar         String?
   token          String
   administrator  Boolean   @default(false)
   superAdmin     Boolean   @default(false)
   systemTheme    String    @default("system")
   embed          Json      @default("{}")
   ratelimit      DateTime?
   totpSecret     String?
   domains        String[]
   oauth          OAuth[]
   files          File[]
   urls           Url[]
   Invite         Invite[]
   Folder         Folder[]
+  IncompleteFile IncompleteFile[]
 }

 model Folder {
@@ -126,3 +127,23 @@ enum OauthProviders {
   GITHUB
   GOOGLE
 }
+
+model IncompleteFile {
+  id             Int              @id @default(autoincrement())
+  createdAt      DateTime         @default(now())
+  status         ProcessingStatus
+  chunks         Int
+  chunksComplete Int
+  user           User             @relation(fields: [userId], references: [id], onDelete: Cascade)
+  userId         Int
+  data           Json
+}
+
+enum ProcessingStatus {
+  PENDING
+  PROCESSING
+  COMPLETE
+}
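
For context, a minimal sketch of the lifecycle this model encodes, assuming the Prisma client generated from the schema above (the upload worker further down performs essentially these updates):

import { PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Sketch only: a record starts PENDING, is bumped per chunk while PROCESSING,
// and ends COMPLETE once every chunk has been written.
async function trackIncompleteFile(userId: number, totalChunks: number) {
  const rec = await prisma.incompleteFile.create({
    data: { status: 'PENDING', chunks: totalChunks, chunksComplete: 0, userId, data: {} },
  });

  for (let i = 0; i < totalChunks; i++) {
    await prisma.incompleteFile.update({
      where: { id: rec.id },
      data: { chunksComplete: { increment: 1 }, status: 'PROCESSING' },
    });
  }

  await prisma.incompleteFile.update({
    where: { id: rec.id },
    data: { status: 'COMPLETE' },
  });
}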


@@ -0,0 +1,118 @@
import { Button, Modal, Title, Tooltip } from '@mantine/core';
import { IconTrash } from '@tabler/icons-react';
import AnchorNext from 'components/AnchorNext';
import MutedText from 'components/MutedText';
import useFetch from 'hooks/useFetch';
import { DataTable } from 'mantine-datatable';
import { useEffect, useState } from 'react';
export type PendingFiles = {
id: number;
createdAt: string;
status: string;
chunks: number;
chunksComplete: number;
userId: number;
data: {
file: {
filename: string;
mimetype: string;
lastchunk: boolean;
identifier: string;
totalBytes: number;
};
code?: number;
message?: string;
};
};
export default function PendingFilesModal({ open, onClose }) {
const [incFiles, setIncFiles] = useState<PendingFiles[]>([]);
const [loading, setLoading] = useState(true);
const [selectedFiles, setSelectedFiles] = useState<PendingFiles[]>([]);
async function updateIncFiles() {
setLoading(true);
const files = await useFetch('/api/user/pending');
setIncFiles(files);
setLoading(false);
}
async function deleteIncFiles() {
await useFetch('/api/user/pending', 'DELETE', {
id: selectedFiles.map((file) => file.id),
});
updateIncFiles();
setSelectedFiles([]);
}
useEffect(() => {
updateIncFiles();
}, []);
useEffect(() => {
const interval = setInterval(() => {
if (open) updateIncFiles();
}, 5000);
return () => clearInterval(interval);
}, [open]);
return (
<Modal title={<Title>Pending Files</Title>} size='auto' opened={open} onClose={onClose}>
<MutedText size='xs'>Refreshing every 5 seconds...</MutedText>
<DataTable
withBorder
borderRadius='md'
highlightOnHover
verticalSpacing='sm'
minHeight={200}
records={incFiles ?? []}
columns={[
{ accessor: 'id', title: 'ID' },
{ accessor: 'createdAt', render: (file) => new Date(file.createdAt).toLocaleString() },
{ accessor: 'status', render: (file) => file.status.toLowerCase() },
{
accessor: 'progress',
title: 'Progress',
render: (file) => `${file.chunksComplete}/${file.chunks} chunks`,
},
{
accessor: 'message',
render: (file) =>
file.data.code === 200 ? (
<AnchorNext href={file.data.message} target='_blank'>
view file
</AnchorNext>
) : (
file.data.message
),
},
]}
fetching={loading}
loaderBackgroundBlur={5}
loaderVariant='dots'
onSelectedRecordsChange={setSelectedFiles}
selectedRecords={selectedFiles}
/>
{selectedFiles.length ? (
<Tooltip label='Clearing pending files will still leave the final file on the server.'>
<Button
variant='filled'
my='md'
color='red'
onClick={deleteIncFiles}
leftIcon={<IconTrash size='1rem' />}
fullWidth
>
Clear {selectedFiles.length} pending file{selectedFiles.length > 1 ? 's' : ''}
</Button>
</Tooltip>
) : null}
</Modal>
);
}
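
For reference, an illustrative payload of what the modal expects back from /api/user/pending. The shape follows the PendingFiles type above; every concrete value here is made up:

// Illustrative only; `data.code` and `data.message` are filled in by the worker
// when it finishes, and a 200 code turns the message column into a link.
const example: PendingFiles[] = [
  {
    id: 1,
    createdAt: '2023-04-03T22:42:27.000Z',
    status: 'PROCESSING',
    chunks: 5,
    chunksComplete: 3,
    userId: 1,
    data: {
      file: {
        filename: 'video.mp4',
        mimetype: 'video/mp4',
        lastchunk: true,
        identifier: 'abc123',
        totalBytes: 94371840,
      },
    },
  },
];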


@@ -1,17 +1,20 @@
-import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title } from '@mantine/core';
+import { Accordion, ActionIcon, Box, Group, Pagination, SimpleGrid, Title, Tooltip } from '@mantine/core';
-import { IconFileUpload } from '@tabler/icons-react';
+import { IconFileUpload, IconPhotoUp } from '@tabler/icons-react';
 import File from 'components/File';
 import useFetch from 'hooks/useFetch';
 import { usePaginatedFiles } from 'lib/queries/files';
 import Link from 'next/link';
 import { useEffect, useState } from 'react';
 import FilePagation from './FilePagation';
+import PendingFilesModal from './PendingFilesModal';

 export default function Files({ disableMediaPreview, exifEnabled, queryPage, compress }) {
   const [favoritePage, setFavoritePage] = useState(1);
   const [favoriteNumPages, setFavoriteNumPages] = useState(0);
   const favoritePages = usePaginatedFiles(favoritePage, 'media', true);
+  const [open, setOpen] = useState(false);

   useEffect(() => {
     (async () => {
       const { count } = await useFetch('/api/user/paged?count=true&filter=media&favorite=true');
@@ -21,11 +24,19 @@ export default function Files({ disableMediaPreview, exifEnabled, queryPage, com
   return (
     <>
+      <PendingFilesModal open={open} onClose={() => setOpen(false)} />
       <Group mb='md'>
         <Title>Files</Title>
         <ActionIcon component={Link} href='/dashboard/upload/file' variant='filled' color='primary'>
           <IconFileUpload size='1rem' />
         </ActionIcon>
+        <Tooltip label='View pending uploads'>
+          <ActionIcon onClick={() => setOpen(true)} variant='filled' color='primary'>
+            <IconPhotoUp size='1rem' />
+          </ActionIcon>
+        </Tooltip>
       </Group>

       {favoritePages.isSuccess && favoritePages.data.length ? (
         <Accordion


@@ -98,18 +98,6 @@ export default function File({ chunks: chunks_config }) {
         await new Promise((resolve) => setTimeout(resolve, 100));
       }

-      // if last chunk send notif that it will take a while
-      if (j === chunks.length - 1) {
-        updateNotification({
-          id: 'upload-chunked',
-          title: 'Finalizing partial upload',
-          message: 'This may take a while...',
-          icon: <IconFileTime size='1rem' />,
-          color: 'yellow',
-          autoClose: false,
-        });
-      }
-
       const body = new FormData();
       body.append('file', chunks[j].blob);
@@ -136,25 +124,18 @@ export default function File({ chunks: chunks_config }) {
       if (j === chunks.length - 1) {
         updateNotification({
           id: 'upload-chunked',
-          title: 'Upload Successful',
-          message: '',
+          title: 'Finalizing partial upload',
+          message:
+            'The upload has been offloaded, and will complete in the background. You can see processing files in the files tab.',
+          icon: <IconFileTime size='1rem' />,
           color: 'green',
-          icon: <IconFileUpload size='1rem' />,
+          autoClose: true,
         });

-        showFilesModal(clipboard, modals, json.files);
         invalidateFiles();
         setFiles([]);
         setProgress(100);
         setTimeout(() => setProgress(0), 1000);
-
-        clipboard.copy(json.files[0]);
-        if (!navigator.clipboard)
-          showNotification({
-            title: 'Unable to copy to clipboard',
-            message: 'Zipline is unable to copy to clipboard due to security reasons.',
-            color: 'red',
-          });
       }

       ready = true;


@@ -136,6 +136,7 @@ export interface ConfigOAuth {
 export interface ConfigChunks {
   max_size: number;
   chunks_size: number;
+  enabled: boolean;
 }

 export interface ConfigMfa {


@@ -158,6 +158,7 @@ export default function readConfig() {
     map('CHUNKS_MAX_SIZE', 'human-to-byte', 'chunks.max_size'),
     map('CHUNKS_CHUNKS_SIZE', 'human-to-byte', 'chunks.chunks_size'),
+    map('CHUNKS_ENABLED', 'boolean', 'chunks.enabled'),

     map('MFA_TOTP_ISSUER', 'string', 'mfa.totp_issuer'),
     map('MFA_TOTP_ENABLED', 'boolean', 'mfa.totp_enabled'),
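
With that mapping in place the feature is driven by environment variables. An illustrative snippet (the variable names come from the map calls above; the values match the validator defaults below):

CHUNKS_ENABLED=true
CHUNKS_MAX_SIZE=90MB
CHUNKS_CHUNKS_SIZE=20MB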


@@ -201,10 +201,12 @@ const validator = s.object({
     .object({
       max_size: s.number.default(humanToBytes('90MB')),
       chunks_size: s.number.default(humanToBytes('20MB')),
+      enabled: s.boolean.default(true),
     })
     .default({
       max_size: humanToBytes('90MB'),
       chunks_size: humanToBytes('20MB'),
+      enabled: true,
     }),
   mfa: s
     .object({


@@ -1,5 +1,5 @@
 import { InvisibleFile } from '@prisma/client';
-import { readdir, readFile, unlink, writeFile } from 'fs/promises';
+import { writeFile } from 'fs/promises';
 import zconfig from 'lib/config';
 import datasource from 'lib/datasource';
 import { sendUpload } from 'lib/discord';
@@ -14,6 +14,7 @@ import { removeGPSData } from 'lib/utils/exif';
 import multer from 'multer';
 import { join } from 'path';
 import sharp from 'sharp';
+import { Worker } from 'worker_threads';

 const uploader = multer();
 const logger = Logger.get('upload');
@@ -78,7 +79,7 @@ async function handler(req: NextApiReq, res: NextApiRes) {
   if (fileMaxViews < 0) return res.badRequest('invalid max views (max views < 0)');

   // handle partial uploads before ratelimits
-  if (req.headers['content-range']) {
+  if (req.headers['content-range'] && zconfig.chunks.enabled) {
     // parses content-range header (bytes start-end/total)
     const [start, end, total] = req.headers['content-range']
       .replace('bytes ', '')
@@ -108,89 +109,29 @@ async function handler(req: NextApiReq, res: NextApiRes) {
     await writeFile(tempFile, req.files[0].buffer);

     if (lastchunk) {
-      const partials = await readdir(zconfig.core.temp_directory).then((files) =>
-        files.filter((x) => x.startsWith(`zipline_partial_${identifier}`))
-      );
-
-      const readChunks = partials.map((x) => {
-        const [, , , start, end] = x.split('_');
-        return { start: Number(start), end: Number(end), filename: x };
-      });
-
-      // combine chunks
-      const chunks = new Uint8Array(total);
-
-      for (let i = 0; i !== readChunks.length; ++i) {
-        const chunkData = readChunks[i];
-
-        const buffer = await readFile(join(zconfig.core.temp_directory, chunkData.filename));
-        await unlink(join(zconfig.core.temp_directory, readChunks[i].filename));
-
-        chunks.set(buffer, chunkData.start);
-      }
-
-      const ext = filename.split('.').length === 1 ? '' : filename.split('.').pop();
-      if (zconfig.uploader.disabled_extensions.includes(ext))
-        return res.error('disabled extension recieved: ' + ext);
-
-      const fileName = await formatFileName(format, filename);
-
-      let password = null;
-      if (req.headers.password) {
-        password = await hashPassword(req.headers.password as string);
-      }
-
-      const compressionUsed = imageCompressionPercent && mimetype.startsWith('image/');
-      let invis: InvisibleFile;
-      const file = await prisma.file.create({
-        data: {
-          name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`,
-          mimetype,
-          userId: user.id,
-          embed: !!req.headers.embed,
-          password,
-          expiresAt: expiry,
-          maxViews: fileMaxViews,
-          originalName: req.headers['original-name'] ? filename ?? null : null,
-        },
-      });
-
-      if (req.headers.zws) invis = await createInvisImage(zconfig.uploader.length, file.id);
-
-      await datasource.save(file.name, Buffer.from(chunks));
-
-      logger.info(`User ${user.username} (${user.id}) uploaded ${file.name} (${file.id}) (chunked)`);
-      let domain;
-      if (req.headers['override-domain']) {
-        domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers['override-domain']}`;
-      } else if (user.domains.length) {
-        domain = user.domains[Math.floor(Math.random() * user.domains.length)];
-      } else {
-        domain = `${zconfig.core.return_https ? 'https' : 'http'}://${req.headers.host}`;
-      }
-
-      const responseUrl = `${domain}${zconfig.uploader.route === '/' ? '/' : zconfig.uploader.route + '/'}${
-        invis ? invis.invis : encodeURI(file.name)
-      }`;
-
-      response.files.push(responseUrl);
-
-      if (zconfig.discord?.upload) {
-        await sendUpload(user, file, `${domain}/r/${invis ? invis.invis : file.name}`, responseUrl);
-      }
-
-      if (zconfig.exif.enabled && zconfig.exif.remove_gps && mimetype.startsWith('image/')) {
-        try {
-          await removeGPSData(file);
-          response.removed_gps = true;
-        } catch (e) {
-          logger.error(`Failed to remove GPS data from ${file.name} (${file.id}) - ${e.message}`);
-          response.removed_gps = false;
-        }
-      }
-
-      return res.json(response);
+      new Worker('./dist/worker/upload.js', {
+        workerData: {
+          user,
+          file: {
+            filename,
+            mimetype,
+            identifier,
+            lastchunk,
+            totalBytes: total,
+          },
+          response: {
+            expiresAt: expiry,
+            format,
+            imageCompressionPercent,
+            fileMaxViews,
+          },
+          headers: req.headers,
+        },
+      });
+
+      return res.json({
+        pending: true,
+      });
     }

     return res.json({
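
The Content-Range header the handler parses follows the standard `bytes start-end/total` form noted in the comment. A rough client-side sketch of producing those chunks (the /api/upload route and the 20MB size are assumptions drawn from elsewhere in this commit; the headers carrying filename/identifier/lastchunk sit outside the visible hunk and are omitted):

// Sketch: slice a File and POST each piece with a Content-Range header.
async function uploadChunked(file: File, chunkSize = 20 * 1024 * 1024) {
  for (let start = 0; start < file.size; start += chunkSize) {
    const end = Math.min(start + chunkSize, file.size) - 1;

    const body = new FormData();
    body.append('file', file.slice(start, end + 1));

    // Parsed server-side as `bytes start-end/total`.
    await fetch('/api/upload', {
      method: 'POST',
      headers: { 'Content-Range': `bytes ${start}-${end}/${file.size}` },
      body,
    });
  }
}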


@@ -0,0 +1,40 @@
import prisma from 'lib/prisma';
import { NextApiReq, NextApiRes, UserExtended, withZipline } from 'middleware/withZipline';
async function handler(req: NextApiReq, res: NextApiRes, user: UserExtended) {
if (req.method === 'DELETE') {
const fileIds = req.body.id as number[];
const existingFiles = await prisma.incompleteFile.findMany({
where: {
id: {
in: fileIds,
},
userId: user.id,
},
});
const incFiles = await prisma.incompleteFile.deleteMany({
where: {
id: {
in: existingFiles.map((x) => x.id),
},
},
});
return res.json(incFiles);
} else {
const files = await prisma.incompleteFile.findMany({
where: {
userId: user.id,
},
});
return res.json(files);
}
}
export default withZipline(handler, {
methods: ['GET', 'DELETE'],
user: true,
});
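
A hedged usage sketch of this endpoint from the browser; it mirrors what PendingFilesModal does through useFetch:

async function managePending() {
  // List the current user's incomplete uploads.
  const pending = await fetch('/api/user/pending').then((r) => r.json());

  // Clear a selection; the handler only deletes rows owned by the caller.
  await fetch('/api/user/pending', {
    method: 'DELETE',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ id: pending.map((f: { id: number }) => f.id) }),
  });
}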

src/worker/upload.ts (new file, 225 lines)

@@ -0,0 +1,225 @@
import { readdir, readFile, open, rm } from 'fs/promises';
import type { NameFormat } from 'lib/format';
import Logger from 'lib/logger';
import type { UserExtended } from 'middleware/withZipline';
import { isMainThread, workerData } from 'worker_threads';
import prisma from 'lib/prisma';
import { join } from 'path';
import { IncompleteFile, InvisibleFile } from '@prisma/client';
import { removeGPSData } from 'lib/utils/exif';
import { sendUpload } from 'lib/discord';
import { createInvisImage, hashPassword } from 'lib/util';
import formatFileName from 'lib/format';
// `config` and `datasource` are referenced throughout the worker below;
// both modules appear in the upload API imports in this same commit.
import config from 'lib/config';
import datasource from 'lib/datasource';
export type UploadWorkerData = {
user: UserExtended;
file: {
filename: string;
mimetype: string;
identifier: string;
lastchunk: boolean;
totalBytes: number;
};
response: {
expiresAt?: Date;
format: NameFormat;
imageCompressionPercent?: number;
fileMaxViews?: number;
};
headers: Record<string, string>;
};
const { user, file, response, headers } = workerData as UploadWorkerData;
const logger = Logger.get('worker::upload').child(file?.identifier ?? 'unknown-ident');
if (isMainThread) {
logger.error('worker is not a thread');
process.exit(1);
}
if (!file.lastchunk) {
logger.error('lastchunk is false, worker should not have been started');
process.exit(1);
}
if (!config.chunks.enabled) {
logger.error('chunks are not enabled, worker should not have been started');
process.exit(1);
}
start();
async function start() {
logger.debug('starting worker');
const partials = await readdir(config.core.temp_directory).then((files) =>
files.filter((x) => x.startsWith(`zipline_partial_${file.identifier}`))
);
const readChunks = partials.map((x) => {
const [, , , start, end] = x.split('_');
return { start: Number(start), end: Number(end), filename: x };
});
const incompleteFile = await prisma.incompleteFile.create({
data: {
data: {
file,
},
chunks: readChunks.length,
chunksComplete: 0,
status: 'PENDING',
userId: user.id,
},
});
const compressionUsed = response.imageCompressionPercent && file.mimetype.startsWith('image/');
const ext = file.filename.split('.').length === 1 ? '' : file.filename.split('.').pop();
const fileName = await formatFileName(response.format, file.filename);
let fd;
if (config.datasource.type === 'local') {
fd = await open(
join(
process.cwd(),
config.datasource.local.directory,
`${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`
),
'w'
);
} else {
fd = new Uint8Array(file.totalBytes);
}
for (let i = 0; i !== readChunks.length; ++i) {
const chunk = readChunks[i];
const buffer = await readFile(join(config.core.temp_directory, chunk.filename));
if (config.datasource.type === 'local') {
const { bytesWritten } = await fd.write(buffer, 0, buffer.length, chunk.start);
logger.child('fd').debug(`wrote ${bytesWritten} bytes to file`);
} else {
fd.set(buffer, chunk.start);
logger.child('bytes').debug(`wrote ${buffer.length} bytes to array`);
}
await rm(join(config.core.temp_directory, chunk.filename));
await prisma.incompleteFile.update({
where: {
id: incompleteFile.id,
},
data: {
chunksComplete: {
increment: 1,
},
status: 'PROCESSING',
},
});
}
if (config.datasource.type === 'local') {
await fd.close();
} else {
logger.debug('writing file to datasource');
await datasource.save(
`${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`,
Buffer.from(fd as Uint8Array)
);
}
const final = await prisma.incompleteFile.update({
where: {
id: incompleteFile.id,
},
data: {
status: 'COMPLETE',
},
});
logger.debug('done writing file');
await runFileComplete(fileName, ext, compressionUsed, final);
logger.debug('done running worker');
process.exit(0);
}
async function setResponse(incompleteFile: IncompleteFile, code: number, message: string) {
incompleteFile.data['code'] = code;
incompleteFile.data['message'] = message;
return prisma.incompleteFile.update({
where: {
id: incompleteFile.id,
},
data: {
data: incompleteFile.data,
},
});
}
async function runFileComplete(
fileName: string,
ext: string,
compressionUsed: boolean,
incompleteFile: IncompleteFile
) {
if (config.uploader.disabled_extensions.includes(ext))
return setResponse(incompleteFile, 403, 'disabled extension');
let password = null;
if (headers.password) {
password = await hashPassword(headers.password as string);
}
let invis: InvisibleFile;
const fFile = await prisma.file.create({
data: {
name: `${fileName}${compressionUsed ? '.jpg' : `${ext ? '.' : ''}${ext}`}`,
mimetype: file.mimetype,
userId: user.id,
embed: !!headers.embed,
password,
expiresAt: response.expiresAt,
maxViews: response.fileMaxViews,
originalName: headers['original-name'] ? file.filename ?? null : null,
size: file.totalBytes,
},
});
if (headers.zws) invis = await createInvisImage(config.uploader.length, fFile.id);
logger.info(`User ${user.username} (${user.id}) uploaded ${fFile.name} (${fFile.id}) (chunked)`);
let domain;
if (headers['override-domain']) {
domain = `${config.core.return_https ? 'https' : 'http'}://${headers['override-domain']}`;
} else if (user.domains.length) {
domain = user.domains[Math.floor(Math.random() * user.domains.length)];
} else {
domain = `${config.core.return_https ? 'https' : 'http'}://${headers.host}`;
}
const responseUrl = `${domain}${config.uploader.route === '/' ? '/' : config.uploader.route + '/'}${
invis ? invis.invis : encodeURI(fFile.name)
}`;
if (config.discord?.upload) {
await sendUpload(user, fFile, `${domain}/r/${invis ? invis.invis : fFile.name}`, responseUrl);
}
if (config.exif.enabled && config.exif.remove_gps && fFile.mimetype.startsWith('image/')) {
try {
await removeGPSData(fFile);
} catch (e) {
logger.error(`Failed to remove GPS data from ${fFile.name} (${fFile.id}) - ${e.message}`);
}
}
await setResponse(incompleteFile, 200, responseUrl);
}
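
A note on the naming convention: the destructuring in start() implies partials are written as zipline_partial_<identifier>_<start>_<end>. A tiny sketch of both directions (the writer side is inferred from the reader, since the tempFile name is built outside the hunks shown above):

// Build the temp filename for a chunk covering bytes [start, end].
const partialName = (identifier: string, start: number, end: number) =>
  `zipline_partial_${identifier}_${start}_${end}`;

// Recover the byte range the same way the worker does.
const parsePartial = (name: string) => {
  const [, , , start, end] = name.split('_');
  return { start: Number(start), end: Number(end) };
};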


@@ -13,6 +13,12 @@ export default defineConfig([
     entryPoints: ['src/server/index.ts'],
     ...opts,
   },
+  // workers
+  {
+    entryPoints: ['src/worker/upload.ts'],
+    outDir: 'dist/worker',
+    ...opts,
+  },
   // scripts
   {
     entryPoints: ['src/scripts/import-dir.ts'],