mirror of
https://github.com/withastro/astro.git
synced 2025-03-31 23:31:30 -05:00
Cancel response stream when connection closes (#9071)
* cancel stream on close * add changeset * add test * Update .changeset/modern-ways-develop.md Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca> --------- Co-authored-by: lilnasy <69170106+lilnasy@users.noreply.github.com> Co-authored-by: Sarah Rainsberger <sarah@rainsberger.ca>
This commit is contained in:
parent
eeac288551
commit
c9487138d6
6 changed files with 55 additions and 233 deletions
5
.changeset/modern-ways-develop.md
Normal file
5
.changeset/modern-ways-develop.md
Normal file
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
'@astrojs/node': patch
|
||||
---
|
||||
|
||||
Fixes a bug where the response stream would not cancel when the connection closed
|
|
@ -1,8 +1,6 @@
|
|||
import type { NodeApp } from 'astro/app/node';
|
||||
import type { ServerResponse } from 'node:http';
|
||||
import type { Readable } from 'stream';
|
||||
import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js';
|
||||
import { responseIterator } from './response-iterator.js';
|
||||
import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js';
|
||||
|
||||
// Disable no-unused-vars to avoid breaking signature change
|
||||
|
@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
|
|||
res.writeHead(status, nodeHeaders);
|
||||
if (webResponse.body) {
|
||||
try {
|
||||
for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
|
||||
res.write(chunk);
|
||||
const reader = webResponse.body.getReader();
|
||||
res.on("close", () => {
|
||||
reader.cancel();
|
||||
})
|
||||
let result = await reader.read();
|
||||
while (!result.done) {
|
||||
res.write(result.value);
|
||||
result = await reader.read();
|
||||
}
|
||||
} catch (err: any) {
|
||||
console.error(err?.stack || err?.message || String(err));
|
||||
|
|
|
@ -1,228 +0,0 @@
|
|||
/**
|
||||
* Original sources:
|
||||
* - https://github.com/kmalakoff/response-iterator/blob/master/src/index.ts
|
||||
* - https://github.com/apollographql/apollo-client/blob/main/src/utilities/common/responseIterator.ts
|
||||
*/
|
||||
|
||||
import { AstroError } from 'astro/errors';
|
||||
import type { ReadableStreamDefaultReadResult } from 'node:stream/web';
|
||||
import { Readable as NodeReadableStream } from 'stream';
|
||||
|
||||
interface NodeStreamIterator<T> {
|
||||
next(): Promise<IteratorResult<T, boolean | undefined>>;
|
||||
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||
}
|
||||
|
||||
interface PromiseIterator<T> {
|
||||
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>>;
|
||||
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||
}
|
||||
|
||||
interface ReaderIterator<T> {
|
||||
next(): Promise<ReadableStreamDefaultReadResult<T>>;
|
||||
[Symbol.asyncIterator]?(): AsyncIterator<T>;
|
||||
}
|
||||
|
||||
const canUseSymbol = typeof Symbol === 'function' && typeof Symbol.for === 'function';
|
||||
|
||||
const canUseAsyncIteratorSymbol = canUseSymbol && Symbol.asyncIterator;
|
||||
|
||||
function isBuffer(value: any): value is Buffer {
|
||||
return (
|
||||
value?.constructor != null &&
|
||||
typeof value.constructor.isBuffer === 'function' &&
|
||||
value.constructor.isBuffer(value)
|
||||
);
|
||||
}
|
||||
|
||||
function isNodeResponse(value: any): value is Response {
|
||||
return !!(value as Response).body;
|
||||
}
|
||||
|
||||
function isReadableStream(value: any): value is ReadableStream<any> {
|
||||
return !!(value as ReadableStream<any>).getReader;
|
||||
}
|
||||
|
||||
function isAsyncIterableIterator(value: any): value is AsyncIterableIterator<any> {
|
||||
return !!(
|
||||
canUseAsyncIteratorSymbol && (value as AsyncIterableIterator<any>)[Symbol.asyncIterator]
|
||||
);
|
||||
}
|
||||
|
||||
function isStreamableBlob(value: any): value is Blob {
|
||||
return !!(value as Blob).stream;
|
||||
}
|
||||
|
||||
function isBlob(value: any): value is Blob {
|
||||
return !!(value as Blob).arrayBuffer;
|
||||
}
|
||||
|
||||
function isNodeReadableStream(value: any): value is NodeReadableStream {
|
||||
return !!(value as NodeReadableStream).pipe;
|
||||
}
|
||||
|
||||
function readerIterator<T>(reader: ReadableStreamDefaultReader<T>): AsyncIterableIterator<T> {
|
||||
const iterator: ReaderIterator<T> = {
|
||||
//@ts-expect-error
|
||||
next() {
|
||||
return reader.read();
|
||||
},
|
||||
};
|
||||
|
||||
if (canUseAsyncIteratorSymbol) {
|
||||
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||
//@ts-expect-error
|
||||
return this;
|
||||
};
|
||||
}
|
||||
|
||||
return iterator as AsyncIterableIterator<T>;
|
||||
}
|
||||
|
||||
function promiseIterator<T = ArrayBuffer>(promise: Promise<ArrayBuffer>): AsyncIterableIterator<T> {
|
||||
let resolved = false;
|
||||
|
||||
const iterator: PromiseIterator<T> = {
|
||||
next(): Promise<IteratorResult<T, ArrayBuffer | undefined>> {
|
||||
if (resolved)
|
||||
return Promise.resolve({
|
||||
value: undefined,
|
||||
done: true,
|
||||
});
|
||||
resolved = true;
|
||||
return new Promise(function (resolve, reject) {
|
||||
promise
|
||||
.then(function (value) {
|
||||
resolve({ value: value as unknown as T, done: false });
|
||||
})
|
||||
.catch(reject);
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
if (canUseAsyncIteratorSymbol) {
|
||||
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||
return this;
|
||||
};
|
||||
}
|
||||
|
||||
return iterator as AsyncIterableIterator<T>;
|
||||
}
|
||||
|
||||
function nodeStreamIterator<T>(stream: NodeReadableStream): AsyncIterableIterator<T> {
|
||||
let cleanup: (() => void) | null = null;
|
||||
let error: Error | null = null;
|
||||
let done = false;
|
||||
const data: unknown[] = [];
|
||||
|
||||
const waiting: [
|
||||
(
|
||||
value:
|
||||
| IteratorResult<T, boolean | undefined>
|
||||
| PromiseLike<IteratorResult<T, boolean | undefined>>
|
||||
) => void,
|
||||
(reason?: any) => void,
|
||||
][] = [];
|
||||
|
||||
function onData(chunk: any) {
|
||||
if (error) return;
|
||||
if (waiting.length) {
|
||||
const shiftedArr = waiting.shift();
|
||||
if (Array.isArray(shiftedArr) && shiftedArr[0]) {
|
||||
return shiftedArr[0]({ value: chunk, done: false });
|
||||
}
|
||||
}
|
||||
data.push(chunk);
|
||||
}
|
||||
function onError(err: Error) {
|
||||
error = err;
|
||||
const all = waiting.slice();
|
||||
all.forEach(function (pair) {
|
||||
pair[1](err);
|
||||
});
|
||||
!cleanup || cleanup();
|
||||
}
|
||||
function onEnd() {
|
||||
done = true;
|
||||
const all = waiting.slice();
|
||||
all.forEach(function (pair) {
|
||||
pair[0]({ value: undefined, done: true });
|
||||
});
|
||||
!cleanup || cleanup();
|
||||
}
|
||||
|
||||
cleanup = function () {
|
||||
cleanup = null;
|
||||
stream.removeListener('data', onData);
|
||||
stream.removeListener('error', onError);
|
||||
stream.removeListener('end', onEnd);
|
||||
stream.removeListener('finish', onEnd);
|
||||
stream.removeListener('close', onEnd);
|
||||
};
|
||||
stream.on('data', onData);
|
||||
stream.on('error', onError);
|
||||
stream.on('end', onEnd);
|
||||
stream.on('finish', onEnd);
|
||||
stream.on('close', onEnd);
|
||||
|
||||
function getNext(): Promise<IteratorResult<T, boolean | undefined>> {
|
||||
return new Promise(function (resolve, reject) {
|
||||
if (error) return reject(error);
|
||||
if (data.length) return resolve({ value: data.shift() as T, done: false });
|
||||
if (done) return resolve({ value: undefined, done: true });
|
||||
waiting.push([resolve, reject]);
|
||||
});
|
||||
}
|
||||
|
||||
const iterator: NodeStreamIterator<T> = {
|
||||
next(): Promise<IteratorResult<T, boolean | undefined>> {
|
||||
return getNext();
|
||||
},
|
||||
};
|
||||
|
||||
if (canUseAsyncIteratorSymbol) {
|
||||
iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
|
||||
return this;
|
||||
};
|
||||
}
|
||||
|
||||
return iterator as AsyncIterableIterator<T>;
|
||||
}
|
||||
|
||||
function asyncIterator<T>(source: AsyncIterableIterator<T>): AsyncIterableIterator<T> {
|
||||
const iterator = source[Symbol.asyncIterator]();
|
||||
return {
|
||||
next(): Promise<IteratorResult<T, boolean>> {
|
||||
return iterator.next();
|
||||
},
|
||||
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
|
||||
return this;
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export function responseIterator<T>(response: Response | Buffer): AsyncIterableIterator<T> {
|
||||
let body: unknown = response;
|
||||
|
||||
if (isNodeResponse(response)) body = response.body;
|
||||
|
||||
if (isBuffer(body)) body = NodeReadableStream.from(body);
|
||||
|
||||
if (isAsyncIterableIterator(body)) return asyncIterator<T>(body);
|
||||
|
||||
if (isReadableStream(body)) return readerIterator<T>(body.getReader());
|
||||
|
||||
// this errors without casting to ReadableStream<T>
|
||||
// because Blob.stream() returns a NodeJS ReadableStream
|
||||
if (isStreamableBlob(body)) {
|
||||
return readerIterator<T>((body.stream() as unknown as ReadableStream<T>).getReader());
|
||||
}
|
||||
|
||||
if (isBlob(body)) return promiseIterator<T>(body.arrayBuffer());
|
||||
|
||||
if (isNodeReadableStream(body)) return nodeStreamIterator<T>(body);
|
||||
|
||||
throw new AstroError(
|
||||
'Unknown body type for responseIterator. Please pass a streamable response.'
|
||||
);
|
||||
}
|
|
@ -89,4 +89,23 @@ describe('API routes', () => {
|
|||
let [out] = await done;
|
||||
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
|
||||
});
|
||||
|
||||
it('Can bail on streaming', async () => {
|
||||
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');
|
||||
let { req, res, done } = createRequestAndResponse({
|
||||
url: '/streaming',
|
||||
});
|
||||
|
||||
let locals = { cancelledByTheServer: false };
|
||||
|
||||
handler(req, res, () => {}, locals);
|
||||
req.send();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
res.emit("close");
|
||||
|
||||
await done;
|
||||
|
||||
expect(locals).to.deep.include({ cancelledByTheServer: true });
|
||||
});
|
||||
});
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import crypto from 'node:crypto';
|
||||
|
||||
export async function post({ request }: { request: Request }) {
|
||||
export async function POST({ request }: { request: Request }) {
|
||||
const hash = crypto.createHash('sha256');
|
||||
|
||||
const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
|
||||
|
|
22
packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts
vendored
Normal file
22
packages/integrations/node/test/fixtures/api-route/src/pages/streaming.ts
vendored
Normal file
|
@ -0,0 +1,22 @@
|
|||
export const GET = ({ locals }) => {
|
||||
let sentChunks = 0;
|
||||
|
||||
const readableStream = new ReadableStream({
|
||||
async pull(controller) {
|
||||
if (sentChunks === 3) return controller.close();
|
||||
else sentChunks++;
|
||||
|
||||
await new Promise(resolve => setTimeout(resolve, 1000));
|
||||
controller.enqueue(new TextEncoder().encode('hello\n'));
|
||||
},
|
||||
cancel() {
|
||||
locals.cancelledByTheServer = true;
|
||||
}
|
||||
});
|
||||
|
||||
return new Response(readableStream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream"
|
||||
}
|
||||
});
|
||||
}
|
Loading…
Add table
Reference in a new issue