Support back pressure
Support plan
- is this issue currently blocking your project? (yes/no): yes
- is this issue affecting a production system? (yes/no): no
Context
- node version: 16.20.0
- module version with issue: 5.1.0
- last module version without issue: 5.1.0
- environment (e.g. node, browser, native): node
- used with (e.g. hapi application, another framework, standalone, ...): hapi application (route.payload.parse = "gunzip")
- any other relevant information:
What are you trying to achieve or the steps to reproduce?
const { promisify } = require("node:util")
const { pipeline } = require("node:stream/promises")
const Pez = require('@hapi/pez')
const Content = require('@hapi/content')
const sleep = promisify(setTimeout)
server.route({
  method: 'POST',
  path: '/upload',
  options: {
    payload: {
      output: 'stream',
      parse: 'gunzip',
      maxBytes: 10 * 1024 * 1024 * 1024, // 10GB
    },
  },
  handler: async (r, h) => {
    const contentType = Content.type(r.headers['content-type'])
    if (contentType.mime === 'multipart/form-data') {
      const promises = []
      const dispenser = new Pez.Dispenser({
        boundary: contentType.boundary,
      })
      dispenser.on('part', (stream) => {
        console.log('part', stream.name)
        try {
          promises.push(processPartSlowly(stream))
        } catch (err) {
          promises.push(Promise.reject(err))
        }
      })
      await pipeline(r.payload, dispenser)
      console.log('payload fully consumed')
      const results = await Promise.allSettled(promises)
      console.log('all streams were consumed')
      console.log('results', results)
      return results
    }
  },
})

async function processPartSlowly(stream) {
  for await (const chunk of stream) {
    await sleep(chunk.length) // Will sleep for 1 second per 1000 bytes
  }
}
What was the result you got?
The 'payload fully consumed' log message comes in very quickly, no matter how slowly the parts are consumed.
Back pressure should be applied to the input stream. In the current situation, slow consumption of the part streams results in high memory usage. The same situation can arise when hapi's route.options.payload.output option is set to file and the file system is slow.
What result did you expect?
The inbound HTTP request stream (r.payload) should stop being consumed when the highWaterMark of the stream is reached. The return value of this line should not be ignored.
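For reference, here is a minimal sketch (generic Node.js, not pez's actual code) of the pattern this implies: honour the boolean returned by write() and wait for 'drain' before pulling more data from the source.

const { once } = require('node:events')

// Forward a readable into a writable while respecting back pressure:
// write() returning false means the destination buffer is full, so we
// pause until it emits 'drain'.
async function forward(source, destination) {
  for await (const chunk of source) {
    if (!destination.write(chunk)) {
      await once(destination, 'drain')
    }
  }
  destination.end()
}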
Sooo, I ended up rewriting pez using async generators. This allows for much easier control over the data flow. The code below can be used like so:
for await (const part of multipart(r.payload, { boundary: contentType.boundary })) {
  if (part.type === 'preamble') {
    console.log('preamble', part.data.toString())
  }
  if (part.type === 'file') {
    // part.stream is a readable that *MUST* be consumed
    // back-pressure will be applied
    const chunks = []
    for await (const chunk of part.stream) chunks.push(chunk)
    const buffer = Buffer.concat(chunks)
    console.log('file', part.name, buffer.length)
  }
  if (part.type === 'field') {
    console.log('field', part.name, part.data.toString())
  }
  if (part.type === 'epilogue') {
    console.log('epilogue', part.data.toString())
  }
}
Back pressure will work as expected, meaning that the HTTP request's multipart payload will not be consumed faster than the processing of each individual part. This also means that the stream of each file part must be consumed for the processing of the payload to work properly.
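Note that when a file part is not needed, its stream still has to be drained so the rest of the payload can be parsed. A minimal sketch, assuming the same handler context as the example above (wantedFields and 'avatar' are made up for illustration):

// Drain unwanted file parts with resume() so the request keeps flowing.
const wantedFields = new Set(['avatar'])
for await (const part of multipart(r.payload, { boundary: contentType.boundary })) {
  if (part.type === 'file' && !wantedFields.has(part.name)) {
    part.stream.resume() // discard the data, but keep consuming the payload
    continue
  }
  // ... handle the parts you actually care about
}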
Code here
Use this code at your own risk. It was not tested and should not be used without proper testing & foolproofing.
import { Duplex, finished, PassThrough, Readable, Writable } from 'node:stream'
import * as B64 from '@hapi/b64'
import * as Boom from '@hapi/boom'
import * as Content from '@hapi/content'
import * as Nigel from '@hapi/nigel'
import * as Vise from '@hapi/vise'
export const CRLF = Buffer.from(`\r\n`)
export const DASHDASH = Buffer.from(`--`)
export const HEADER_NEEDLE = Nigel.compile(CRLF)
export const HEADERS_NEEDLE = Nigel.compile(Buffer.concat([CRLF, CRLF]))
type PreamblePart = {
type: 'preamble'
data: Buffer
}
type FilePart = {
type: 'file'
name: string
stream: Readable
filename: string
headers: Record<string, undefined | string>
}
type FieldPart = {
type: 'field'
name: string
data: Buffer
headers: Record<string, undefined | string>
}
type EpiloguePart = {
type: 'epilogue'
data: Buffer
}
export type Part = PreamblePart | FilePart | FieldPart | EpiloguePart
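// Parses a multipart payload from `readable`, yielding preamble, field, file
// and epilogue parts in order. A file part's stream must be consumed before
// the next part is requested, which is how back pressure propagates to the
// source.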
export async function* multipart(
readable: Readable,
{ boundary }: { boundary: string }
): AsyncGenerator<Part> {
const chunks = new Vise.Vise()
const promises = new Set<Promise<void>>()
let result: IteratorResult<Buffer | null, any>
const generator = multipartSplit(readable, boundary)
try {
// Everything up to the first boundary is preamble data
while (true) {
result = await generator.next()
if (result.done) throw Boom.badRequest('Missing initial boundary')
if (result.value) chunks.push(result.value)
else {
// Got the first boundary, process preamble data
if (chunks.length > 0) {
const dataLength = chunks.length - CRLF.length
if (dataLength < 0) throw Boom.badRequest('Preamble too short')
if (!chunks.startsWith(CRLF, dataLength)) {
throw Boom.badRequest('Preamble missing CRLF terminator')
}
if (dataLength) {
const data = Buffer.concat(chunks.shift(dataLength))
yield { type: 'preamble', data }
}
// Remove anything left (the "CRLF")
chunks.shift(chunks.length)
}
// Process the rest
break
}
}
// Process fields and files
while (true) {
result = await generator.next()
if (result.done) throw Boom.badRequest('Missing epilogue boundary')
if (!result.value) throw Boom.badRequest('Incomplete multipart payload')
const previousLength = chunks.length
// Add the chunk to the buffer
chunks.push(result.value)
// If the chunks were previously empty, ignore any TAB & SPACE char right
// after the boundary. Otherwise, the chunks were cleaned in an earlier
// iteration.
if (previousLength === 0) {
const whiteSpaceCount = getLeadingWhiteSpaceCount(chunks)
if (whiteSpaceCount) chunks.shift(whiteSpaceCount)
}
// We can't determine what we have until we have at least 2 bytes (CRLF or DASHDASH)
if (chunks.length < 2) continue
// At this point, we are after a boundary and we are looking for
// an epilogue prefix ('--') or a new line (CRLF) and some headers.
if (chunks.startsWith(CRLF)) {
let start = CRLF.length
let matchingPos = Nigel.horspool(chunks, HEADERS_NEEDLE, start)
// Consume the generator until the end of headers (CRLF 2x) is found
while (matchingPos === -1) {
result = await generator.next()
if (result.done) throw Boom.badRequest('Missing end boundary')
if (!result.value) throw Boom.badRequest(`Unexpected boundary before part payload`)
chunks.push(result.value)
matchingPos = Nigel.horspool(chunks, HEADERS_NEEDLE, start)
// No need to re-check the bytes from the previous iteration
start = chunks.length - HEADERS_NEEDLE.length + 1
}
// Process headers
const headersData = Buffer.concat(chunks.shift(matchingPos + HEADERS_NEEDLE.length))
const headers = parseHeaders(
headersData,
CRLF.length,
headersData.length - HEADERS_NEEDLE.length
)
if (!headers['content-disposition']) {
throw Boom.badRequest('Missing content-disposition header')
}
const { name, filename } = Content.disposition(headers['content-disposition'])
const transferEncoding = headers['content-transfer-encoding']?.toLowerCase()
// At this point "chunks" (if not empty) contains initial payload data
const payloadChunks = chunks.shift(chunks.length)
if (filename !== undefined) {
// FilePart. We will create (and yield) a stream that will consume the
// generator until the next boundary. The stream will only consume
// the generator when it is consumed itself, allowing back pressure to
// work.
const stream: Duplex =
transferEncoding === 'base64' ? new B64.Decoder() : new PassThrough()
for (const bodyChunk of payloadChunks) stream.write(bodyChunk)
// Start an async process to consume the generator. We must not
// await before the stream has been yielded, to avoid any deadlock.
const consumePromise = consumeFile(generator, stream, filename)
// We need to catch() synchronously to avoid uncaught promise
// rejection. Also, we don't want/need "promises" to contain
// rejected promises.
const filePromise = consumePromise.catch(() => {})
// Make sure we won't destroy the generator before the file is
// consumed.
promises.add(filePromise)
yield { type: 'file', name, stream, filename, headers }
// Do not consume more generator items until the current part has been
// fully consumed. This will also re-throw if the iterator threw
// during consumeFile.
await consumePromise
// Free memory
promises.delete(filePromise)
} else {
// FieldPart
while (true) {
result = await generator.next()
if (result.done) throw Boom.badRequest(`Missing end boundary after field '${name}'`)
else if (result.value === null) break
else payloadChunks.push(result.value)
}
const bytes = Buffer.concat(payloadChunks)
const data = transferEncoding === 'base64' ? B64.decode(bytes) : bytes
yield { type: 'field', headers, name, data }
}
continue // End of part. Let's fetch next one
}
if (chunks.startsWith(DASHDASH, 0)) {
chunks.shift(DASHDASH.length)
// Consume the rest of the generator
while (true) {
result = await generator.next()
if (result.done) break
if (result.value) chunks.push(result.value)
else throw Boom.badRequest(`Epilogue should not contain a boundary`)
}
if (chunks.length) {
if (chunks.startsWith(CRLF)) chunks.shift(CRLF.length)
else throw Boom.badRequest('Missing CRLF after epilogue boundary')
// Yield the remaining data as epilogue data
if (chunks.length) {
const data = Buffer.concat(chunks.shift(chunks.length))
yield { type: 'epilogue', data }
}
}
break
}
throw Boom.badRequest('Invalid characters after boundary')
}
} catch (err) {
await generator.throw(err)
throw err
} finally {
// If the caller of this method breaks the loop before the iterator is done,
// we need to manually destroy the underlying iterator. But we will do so
// only after the last yielded file stream has been fully consumed.
await Promise.all([...promises])
await generator.return(null)
}
}
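// Splits the readable into raw data chunks, yielding a `null` sentinel each
// time a boundary is found ("--<boundary>" for the first match, then
// "CRLF--<boundary>" afterwards).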
async function* multipartSplit(
readable: Readable,
boundary: string
): AsyncGenerator<Buffer | null> {
const chunks = new Vise.Vise()
const boundaryBuffer = Buffer.from(`--${boundary}`)
const initialNeedle = Nigel.compile(boundaryBuffer)
const followupNeedle = Nigel.compile(Buffer.concat([CRLF, boundaryBuffer]))
let needle = initialNeedle
for await (const chunk of readable) {
chunks.push(chunk)
const matchingPos = Nigel.horspool(chunks, needle)
if (matchingPos !== -1) {
// Yield everything before the needle
yield Buffer.concat(chunks.shift(matchingPos))
// Drop the needle from the internal buffer
chunks.shift(needle.length)
// Notify we got a match
yield null
// Use the followup needle from now on
needle = followupNeedle
} else {
// Needle not found in chunks: Everything before the last
// needle.length bytes can be yielded
const lengthSafeToYield = Math.max(0, chunks.length - needle.length)
yield Buffer.concat(chunks.shift(lengthSafeToYield))
}
}
// Yield the remaining chunks (which, by now, do not contain the needle)
yield Buffer.concat(chunks.shift(chunks.length))
}
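// Pumps generator output into the part stream until the next boundary (the
// `null` sentinel), waiting for the stream to drain so back pressure reaches
// the HTTP request.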
async function consumeFile(
generator: AsyncGenerator<Buffer | null>,
stream: Duplex,
filename: string
): Promise<void> {
try {
let result: IteratorResult<Buffer | null>
while (true) {
result = await generator.next()
if (result.done) {
// A boundary is expected before the end of the multipart payload
throw Boom.badRequest(`Missing end boundary after file '${filename}'`)
} else if (stream.writable) {
if (result.value === null) {
stream.end()
} else {
const more = stream.write(result.value)
if (!more) await isWritableOrFinished(stream)
}
} else {
// Else the stream is not writable anymore, so we just consume (and
// ignore) the data until the end of the file.
}
// We got a boundary, so we are done with this file
if (result.value === null) break
}
} catch (err) {
stream.destroy(err as Error)
throw err
}
}
// https://www.rfc-editor.org/rfc/rfc2616#section-4.2
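// Parses CRLF-separated header lines (including folded continuation lines)
// into a record keyed by lowercased header name; duplicate headers are
// comma-joined.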
export function parseHeaders(
data: Buffer,
start = 0,
end = data.length
): Record<string, undefined | string> {
const headers: Record<string, undefined | string> = Object.create(null)
while (start < end) {
let lineEnd = Nigel.horspool(data, HEADER_NEEDLE, start)
// If the next line starts with a HTAB or SP, it's a continuation of the current line
while (lineEnd !== -1 && (data[lineEnd + 2] === 0x09 || data[lineEnd + 2] === 0x20)) {
lineEnd = Nigel.horspool(data, HEADER_NEEDLE, lineEnd + HEADER_NEEDLE.length + 1)
}
if (lineEnd === -1) throw Boom.badRequest('Missing CRLF terminator')
if (lineEnd > end) throw Boom.badRequest('Header line too long')
if (lineEnd < start + 1) throw Boom.badRequest('Empty header line')
const line = data.toString('utf-8', start, lineEnd)
const [name, value] = line.split(':', 2)
if (value === undefined) throw Boom.badRequest('Missing colon in header line')
if (name.length === 0) throw Boom.badRequest('Empty header name')
if (name === '__proto__') throw Boom.badRequest('Invalid header name')
// name must not contain control characters, spaces, or colons
// eslint-disable-next-line no-control-regex
if (name.match(/[\x00-\x1f\x7f\s]/)) throw Boom.badRequest('Invalid header name')
const nameNormalized = name.toLowerCase()
const valueNormalized = value.trim().replaceAll(/\r\n[\t ]+/g, ' ')
headers[nameNormalized] =
headers[nameNormalized] != null
? `${headers[nameNormalized]},${valueNormalized}`
: valueNormalized
start = lineEnd + HEADER_NEEDLE.length
}
return headers
}
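// Counts the leading TAB / SPACE characters so linear whitespace right after
// a boundary can be skipped.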
export function getLeadingWhiteSpaceCount(chunks: Nigel.Haystack): number {
let whiteSpaceCount = 0
let charCode: number | undefined
for (let i = 0; i < chunks.length; i++) {
charCode = chunks.readUInt8(i)
if (charCode === 0x09 || charCode === 0x20) whiteSpaceCount = i + 1
else break
}
return whiteSpaceCount
}
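// Resolves once the stream emits 'drain', finishes, errors, or the signal is
// aborted, letting consumeFile know when it may try to write again.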
export async function isWritableOrFinished(stream: Writable, signal?: AbortSignal): Promise<void> {
if (signal?.aborted) return
return new Promise<void>((resolve) => {
const done = () => {
resolve()
cleanup()
stream.off('drain', done)
signal?.removeEventListener('abort', done)
}
const cleanup = finished(stream, { readable: false }, done)
stream.on('drain', done)
signal?.addEventListener('abort', done)
})
}
This code can freely be used under the MIT licence.