pez icon indicating copy to clipboard operation
pez copied to clipboard

Support back pressure

Open matthieusieben opened this issue 2 years ago • 1 comments

Support plan

  • is this issue currently blocking your project? (yes/no): yes
  • is this issue affecting a production system? (yes/no): no

Context

  • node version: 16.20.0
  • module version with issue: 5.1.0
  • last module version without issue: 5.1.0
  • environment (e.g. node, browser, native): node
  • used with (e.g. hapi application, another framework, standalone, ...): hapi application (route.payload.parse = "gunzip")
  • any other relevant information:

What are you trying to achieve or the steps to reproduce?

const { promisify } = require("node:util")
const { pipeline } = require("node:stream/promises")
const Pez = require('@hapi/pez')
const Content = require('@hapi/content')

const sleep = promisify(setTimeout)

server.route({
  method: 'POST',
  path: `/upload`,
  options: {
    payload: {
      output: 'stream',
      parse: 'gunzip',
      maxBytes: 10 * 1024 * 1024 * 1024, // 10GB
    },
  },
  handler: async (r, h) => {
    const contentType = Content.type(r.headers['content-type'])
    if (contentType.mime === 'multipart/form-data') {
      const promises = []

      const dispenser = new Pez.Dispenser({
        boundary: contentType.boundary,
      })

      dispenser.on('part', (stream) => {
        console.log('part', stream.name)
        try {
          promises.push(processPartSlowly(stream))
        } catch (err) {
          promises.push(Promise.reject(err))
        }
      })

      await pipeline(r.payload, dispenser)
      
      console.log('payload fully consumed')

      const results = await Promise.allSettled(promises)
      
      console.log('all streams were consumed')

      console.log('results', results)

    }
  }
})

async function processPartSlowly(stream) {
  for await (const chunk of stream) {
    sleep(chunk.length) // Will sleep for 1 second per 1000 bytes
  }
}

What was the result you got?

The payload fully consumed log message comes in quite fast, no matter what the consumer speed is.

Back pressure should be applied to the input stream. In the current situation if consumption of the stream is slow, it will result in a high memory usage. This situation could even arise when Hapi's route.options.payload.output option is set to file and the file system is slow.

What result did you expect?

The inbound HTTP request stream (r.payload) should stop being consumed if the highWaterMarck of the stream is reached. The return value of this line should not be ignored.

matthieusieben avatar Jun 05 '23 13:06 matthieusieben

Sooo, I ended up re-writing pez using async generators. This allows for much easier control over the dataflow. The code bellow can be used like so:

      for await (const part of multipart(r.payload, contentType.boundary)) {

        if (part.type === 'preamble') {
          console.log('preamble', part.data.toString())
        }

        if (part.type === 'file') {
          // part.stream is a readable that *MUST* be consumed
          // back-pressure will be applied

          const chunks = []
          for await (const chunk of part.stream) chunks.push(chunk)
          const buffer = Buffer.concat(chunks)
          console.log('file', part.name, buffer.length)
        }

        if (part.type === 'field') {
          console.log('field', part.name, part.value)
        }

        if (part.type === 'epilogue') {
          console.log('epilogue', part.data.toString())
        }
      }

Back pressure will work as expected, meaning that the http request multipart payload will not be consumed faster than the processing of each individual part. This also mean that the steam of the file parts must be consumed for the processing of the payload to work properly.

Code here

Use this code at your own risk. It was not tested and should not be used without proper testing & foolproofing.

import { Duplex, PassThrough, Readable } from 'node:stream'

import * as B64 from '@hapi/b64'
import * as Boom from '@hapi/boom'
import * as Content from '@hapi/content'
import * as Nigel from '@hapi/nigel'
import * as Vise from '@hapi/vise'

export const CRLF = Buffer.from(`\r\n`)
export const DASHDASH = Buffer.from(`--`)

export const HEADER_NEEDLE = Nigel.compile(CRLF)
export const HEADERS_NEEDLE = Nigel.compile(Buffer.concat([CRLF, CRLF]))

type PreamblePart = {
  type: 'preamble'
  data: Buffer
}
type FilePart = {
  type: 'file'
  name: string
  stream: Readable
  filename: string
  headers: Record<string, undefined | string>
}
type FieldPart = {
  type: 'field'
  name: string
  data: Buffer
  headers: Record<string, undefined | string>
}
type EpiloguePart = {
  type: 'epilogue'
  data: Buffer
}

export type Part = PreamblePart | FilePart | FieldPart | EpiloguePart

export async function* multipart(
  readable: Readable,
  { boundary }: { boundary: string }
): AsyncGenerator<Part> {
  const chunks = new Vise.Vise()

  const promises = new Set<Promise<void>>()
  let result: IteratorResult<Buffer | null, any>

  const generator = multipartSplit(readable, boundary)
  try {
    // Everything up to the first boundary is preamble data
    while (true) {
      result = await generator.next()
      if (result.done) throw Boom.badRequest('Missing initial boundary')
      if (result.value) chunks.push(result.value)
      else {
        // Got the first boundary, process preamble data

        if (chunks.length > 0) {
          const dataLength = chunks.length - CRLF.length
          if (dataLength < 0) throw Boom.badRequest('Preamble too short')

          if (!chunks.startsWith(CRLF, dataLength)) {
            throw Boom.badRequest('Preamble missing CRLF terminator')
          }

          if (dataLength) {
            const data = Buffer.concat(chunks.shift(dataLength))
            yield { type: 'preamble', data }
          }

          // Remove anything left (the "CRLF")
          chunks.shift(chunks.length)
        }

        // Process the rest
        break
      }
    }

    // Process fields and files
    while (true) {
      result = await generator.next()
      if (result.done) throw Boom.badRequest('Missing epilogue boundary')
      if (!result.value) throw Boom.badRequest('Incomplete multipart payload')

      const previousLength = chunks.length

      // Add the chunk to the buffer
      chunks.push(result.value)

      // If the chunks were previously empty, ignore any TAB & SPACE char right
      // after the boundary. Otherwise, the chunks were cleaned in an earlier
      // iteration.
      if (previousLength === 0) {
        const whiteSpaceCount = getLeadingWhiteSpaceCount(chunks)
        if (whiteSpaceCount) chunks.shift(whiteSpaceCount)
      }

      // We can't determine what we have until we have at least 2 bytes (CRLF or DASHDASH)
      if (chunks.length < 2) continue

      // At this point, we are after a boundary and we are looking for
      // an epilogue prefix ('--') or a new line (CRLF) and some headers.

      if (chunks.startsWith(CRLF)) {
        let start = CRLF.length
        let matchingPos = Nigel.horspool(chunks, HEADERS_NEEDLE, start)

        // Consume the generator until the end of headers (CRLF 2x) is found
        while (matchingPos === -1) {
          result = await generator.next()
          if (result.done) throw Boom.badRequest('Missing end boundary')
          if (!result.value) throw Boom.badRequest(`Unexpected boundary before part payload`)

          chunks.push(result.value)

          matchingPos = Nigel.horspool(chunks, HEADERS_NEEDLE, start)

          // No need to re-check in the bytes from previous iteration
          start = chunks.length - HEADERS_NEEDLE.length + 1
        }

        // Process headers
        const headersData = Buffer.concat(chunks.shift(matchingPos + HEADERS_NEEDLE.length))
        const headers = parseHeaders(
          headersData,
          CRLF.length,
          headersData.length - HEADERS_NEEDLE.length
        )

        if (!headers['content-disposition']) {
          throw Boom.badRequest('Missing content-disposition header')
        }

        const { name, filename } = Content.disposition(headers['content-disposition'])
        const transferEncoding = headers['content-transfer-encoding']?.toLowerCase()

        // At this point "chunks" (if not empty) contains initial payload data
        const payloadChunks = chunks.shift(chunks.length)

        if (filename !== undefined) {
          // FilePart. We will create (and yield) a stream that will consume the
          // generator untile the next boundary. The stream will only consume
          // the generator when it is consumed itself, allowing back pressure to
          // work.

          const stream: Duplex =
            transferEncoding === 'base64' ? new B64.Decoder() : new PassThrough()

          for (const bodyChunk of payloadChunks) stream.write(bodyChunk)

          // Start an async process to consume the generator. We must not
          // await before the stream is yield to avoid any deadlock.
          const consumePromise = consumeFile(generator, stream, filename)

          // We need to catch() synchronously to avoid uncaught promise
          // rejection. Also, we don't want/need "promises" to contain
          // rejected promises.
          const filePromise = consumePromise.catch(() => {})

          // Make sure we won't destroy the generator before the file is
          // consumed.
          promises.add(filePromise)

          yield { type: 'file', name, stream, filename, headers }

          // Do not consume more generator items until the current part has been
          // fully consumed. This will also, re-throw if the iterator did throw
          // during consumeFile.
          await consumePromise

          // Free memory
          promises.delete(filePromise)
        } else {
          // FieldPart
          while (true) {
            result = await generator.next()
            if (result.done) throw Boom.badRequest(`Missing end boundary after field '${name}'`)
            else if (result.value === null) break
            else payloadChunks.push(result.value)
          }

          const bytes = Buffer.concat(payloadChunks)
          const data = transferEncoding === 'base64' ? B64.decode(bytes) : bytes

          yield { type: 'field', headers, name, data }
        }

        continue // End of part. Let's fetch next one
      }

      if (chunks.startsWith(DASHDASH, 0)) {
        chunks.shift(DASHDASH.length)

        // Consume the rest of the generator
        while (true) {
          result = await generator.next()
          if (result.done) break
          if (result.value) chunks.push(result.value)
          else throw Boom.badRequest(`Epilogue should not contain a boundary`)
        }

        if (chunks.length) {
          if (chunks.startsWith(CRLF)) chunks.shift(CRLF.length)
          else throw Boom.badRequest('Missing CRLF after epilogue boundary')

          // Yield the remaining data as epilogue data
          if (chunks.length) {
            const data = Buffer.concat(chunks.shift(chunks.length))
            yield { type: 'epilogue', data }
          }
        }

        break
      }

      throw Boom.badRequest('Invalid characters after boundary')
    }
  } catch (err) {
    await generator.throw(err)
    throw err
  } finally {
    // If the caller of this method breaks the loop before the iterator is done,
    // we need to manually destroy the underlying iterator. But we will do so
    // only after the last yield file has been fully generated.

    await Promise.all([...promises])
    await generator.return(null)
  }
}

async function* multipartSplit(
  readable: Readable,
  boundary: string
): AsyncGenerator<Buffer | null> {
  const chunks = new Vise.Vise()
  const boundaryBuffer = Buffer.from(`--${boundary}`)

  const initialNeedle = Nigel.compile(boundaryBuffer)
  const followupNeedle = Nigel.compile(Buffer.concat([CRLF, boundaryBuffer]))

  let needle = initialNeedle

  for await (const chunk of readable) {
    chunks.push(chunk)

    const matchingPos = Nigel.horspool(chunks, needle)
    if (matchingPos !== -1) {
      // Yield everything before the needle
      yield Buffer.concat(chunks.shift(matchingPos))
      // Drop the needle from the internal buffer
      chunks.shift(needle.length)
      // Notify we got a match
      yield null
      // Use the to the followup needle from now on
      needle = followupNeedle
    } else {
      // Needle not found in chunks: Everything before the last
      // needle.length bytes can be yield
      const lengthSafeToYield = Math.max(0, chunks.length - needle.length)
      yield Buffer.concat(chunks.shift(lengthSafeToYield))
    }
  }

  // Yield the remaining chunks (which, by now, do not contain the needle)
  yield Buffer.concat(chunks.shift(chunks.length))
}

async function consumeFile(
  generator: AsyncGenerator<Buffer | null>,
  stream: Duplex,
  filename: string
): Promise<void> {
  try {
    let result: IteratorResult<Buffer | null>
    while (true) {
      result = await generator.next()

      if (result.done) {
        // A boundary is expected before the end of the multipart payload
        throw Boom.badRequest(`Missing end boundary after file '${filename}'`)
      } else if (stream.writable) {
        if (result.value === null) {
          stream.end()
        } else {
          const more = stream.write(result.value)
          if (!more) await isWritableOrFinished(stream)
        }
      } else {
        // Else the stream is not writable anymore, so we just consume (and
        // ignore) the data until the end of the file.
      }

      // We got a boundary, so we are done with this file
      if (result.value === null) break
    }
  } catch (err) {
    stream.destroy(err as Error)
    throw err
  }
}

// https://www.rfc-editor.org/rfc/rfc2616#section-4.2
export function parseHeaders(
  data: Buffer,
  start = 0,
  end = data.length
): Record<string, undefined | string> {
  const headers: Record<string, undefined | string> = Object.create(null)

  while (start < end) {
    let lineEnd = Nigel.horspool(data, HEADER_NEEDLE, start)

    // If the next line starts with a HTAB or SP, it's a continuation of the current line
    while (lineEnd !== -1 && (data[lineEnd + 2] === 0x09 || data[lineEnd + 2] === 0x20)) {
      lineEnd = Nigel.horspool(data, HEADER_NEEDLE, lineEnd + HEADER_NEEDLE.length + 1)
    }

    if (lineEnd === -1) throw Boom.badRequest('Missing CRLF terminator')
    if (lineEnd > end) throw Boom.badRequest('Header line too long')
    if (lineEnd < start + 1) throw Boom.badRequest('Empty header line')

    const line = data.toString('utf-8', start, lineEnd)
    const [name, value] = line.split(':', 2)

    if (value === undefined) throw Boom.badRequest('Missing colon in header line')
    if (name.length === 0) throw Boom.badRequest('Empty header name')
    if (name === '__proto__') throw Boom.badRequest('Invalid header name')
    // name must not contain control characters, spaces, or colons
    // eslint-disable-next-line no-control-regex
    if (name.match(/[\x00-\x1f\x7f\s]/)) throw Boom.badRequest('Invalid header name')

    const nameNormalized = name.toLowerCase()
    const valueNormalized = value.trim().replaceAll(/\r\n[\t ]+/g, ' ')

    headers[nameNormalized] =
      headers[nameNormalized] != null
        ? `${headers[nameNormalized]},${valueNormalized}`
        : valueNormalized

    start = lineEnd + HEADER_NEEDLE.length
  }

  return headers
}

export function getLeadingWhiteSpaceCount(chunks: Nigel.Haystack): number {
  let whiteSpaceCount = 0
  let charCode: number | undefined
  for (let i = 0; i < chunks.length; i++) {
    charCode = chunks.readUInt8(i)
    if (charCode === 0x09 || charCode === 0x20) whiteSpaceCount = i + 1
    else break
  }
  return whiteSpaceCount
}

export async function isWritableOrFinished(stream: Writable, signal?: AbortSignal): Promise<void> {
  if (signal?.aborted) return
  return new Promise<void>((resolve) => {
    const done = () => {
      resolve()
      cleanup()
      stream.off('drain', done)
      signal?.removeEventListener('abort', done)
    }

    const cleanup = finished(stream, { readable: false }, done)
    stream.on('drain', done)
    signal?.addEventListener('abort', done)
  })
}

This code can freely be used under the MIT licence.

matthieusieben avatar Jun 07 '23 07:06 matthieusieben