
feat: add decompress interceptor

Open adrianfalleiro opened this pull request 1 year ago • 5 comments

This adds a new interceptor to decompress the response body. I lifted some of the implementation from the way decompression is handled in the fetch client.
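
For reference, here's roughly how I'd expect it to be used once composed onto a dispatcher. This is only an illustration: the export name (interceptors.decompress) and any options are placeholders for whatever this PR ends up shipping.

import { Agent, interceptors, request } from 'undici';

// Illustration only: interceptors.decompress is a placeholder name for the
// interceptor added in this PR, not a confirmed export.
const dispatcher = new Agent().compose(interceptors.decompress());

const { statusCode, headers, body } = await request('https://example.com', { dispatcher });
// gzip/brotli/deflate-encoded bodies should arrive already decoded.
console.log(statusCode, headers['content-encoding'], await body.text());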

This relates to...

  • https://github.com/nodejs/undici/issues/3255

Rationale

  • See discussion on https://github.com/nodejs/undici/discussions/3253#discussion-6660897

Changes

  • Adds a response interceptor to decompress gzip, brotli and deflate responses

Features

Bug Fixes

Breaking Changes and Deprecations

Status

  • [x] I have read and agreed to the Developer's Certificate of Origin
  • [x] Tested
  • [ ] Benchmarked (optional)
  • [ ] Documented
  • [x] Review ready
  • [ ] In review
  • [ ] Merge ready

adrianfalleiro avatar May 18 '24 17:05 adrianfalleiro

Should we also handle raw deflate?

Uzlopak avatar May 18 '24 19:05 Uzlopak

> Should we also handle raw deflate?

Yep, I'm using the existing createInflate() util from the fetch client, which handles both deflate and deflateRaw, so raw deflate should already be covered. I'll add a test.
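
The gist of that util, as I understand it, is to peek at the first byte and pick zlib-wrapped inflate vs raw inflate. A rough sketch of the idea (not the actual createInflate implementation, and the class name here is made up):

import * as zlib from 'node:zlib';
import { Transform, type TransformCallback } from 'node:stream';

// Sketch only: decide between zlib-wrapped deflate and raw deflate by peeking
// at the first byte (RFC 1950: a zlib header byte carries 0x08 in its low nibble).
class InflateAutoStream extends Transform {
  #inflate: zlib.Inflate | zlib.InflateRaw | null = null;

  _transform(chunk: Buffer, _enc: BufferEncoding, callback: TransformCallback) {
    if (this.#inflate === null) {
      this.#inflate = (chunk[0] & 0x0f) === 0x08
        ? zlib.createInflate()
        : zlib.createInflateRaw();
      this.#inflate.on('data', (data) => this.push(data));
      this.#inflate.on('error', (err) => this.destroy(err));
    }
    this.#inflate.write(chunk, callback);
  }

  _flush(callback: TransformCallback) {
    if (this.#inflate === null) return callback();
    this.#inflate.end(() => callback());
  }
}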

adrianfalleiro avatar May 18 '24 20:05 adrianfalleiro

Last piece needed for this to work for me with fetch: fetch applies some sort of backpressure, so I wrap the resume callback to also resume the decompression stream.

const wrappedResume = () => {
  if (this.#inputStream) {
    this.#inputStream.resume();
  }
  return resume();
};

return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);

Full onHeaders

onHeaders(statusCode: number, rawHeaders: Buffer[], resume: () => void, statusText: string) {
  const parsedHeaders = util.parseHeaders(rawHeaders);
  const contentEncoding = parsedHeaders['content-encoding'] as string | undefined;
  const requestEncodings = contentEncoding ? contentEncoding.split(',').map((e) => e.trim().toLowerCase()) : [];

  const { method } = this.#opts;

  // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
  if (
    requestEncodings.length !== 0 &&
    method !== 'HEAD' &&
    method !== 'CONNECT' &&
    !nullBodyStatus.includes(statusCode)
  ) {
    const decoders: Transform[] = [];
    for (let i = requestEncodings.length - 1; i >= 0; --i) {
      const requestEncoding = requestEncodings[i];
      // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.2
      if (requestEncoding === 'x-gzip' || requestEncoding === 'gzip') {
        decoders.push(
          zlib.createGunzip({
            // Be less strict when decoding compressed responses, since sometimes
            // servers send slightly invalid responses that are still accepted
            // by common browsers.
            // Always using Z_SYNC_FLUSH is what cURL does.
            flush: zlib.constants.Z_SYNC_FLUSH,
            finishFlush: zlib.constants.Z_SYNC_FLUSH,
          }),
        );
      } else if (requestEncoding === 'deflate') {
        throw new NotImplementedException('deflate is not supported');
      } else if (requestEncoding === 'br') {
        decoders.push(zlib.createBrotliDecompress());
      } else {
        decoders.length = 0;
        break;
      }
    }

    // Wire up the decoder chain: decoders[0] decodes the outermost
    // (last-applied) encoding and acts as the input stream.
    if (decoders.length !== 0) {
      const [firstDecoder, ...restDecoders] = decoders;
      this.#inputStream = firstDecoder;
      this.#inputStream.on('drain', () => {
        if (this.#inputStream) {
          this.#inputStream.resume();
        }
      });
      let outputStream = firstDecoder;

      if (restDecoders.length !== 0) {
        // eslint-disable-next-line @typescript-eslint/ban-ts-comment
        // @ts-expect-error
        outputStream = pipeline(this.#inputStream, ...restDecoders, this.#onDecompressStreamFinished.bind(this));
      } else {
        finished(this.#inputStream, this.#onDecompressStreamFinished.bind(this));
      }

      // Propagate backpressure: if the downstream handler's onData returns
      // false, pause the decoder input until resume is called.
      outputStream.on('data', (chunk) => {
        if (!this.#handler.onData!(chunk)) {
          if (this.#inputStream) {
            this.#inputStream.pause();
          }
        }
      });
    }
  }

  delete parsedHeaders['content-encoding'];
  const newRawHeaders = Object.entries(parsedHeaders)
    .map(([key, value]) => {
      if (Array.isArray(value)) {
        return value.map((v) => [key, v]).flat();
      } else {
        return [key, value];
      }
    })
    .flat()
    .map((v) => Buffer.from(v));

  const wrappedResume = () => {
    if (this.#inputStream) {
      this.#inputStream.resume();
    }
    return resume();
  };

  return this.#handler.onHeaders!(statusCode, newRawHeaders, wrappedResume, statusText);
}

tjhiggins avatar Jan 28 '25 16:01 tjhiggins

@tjhiggins Do you want to collaborate on this PR with me? I can give you commit access to the base branch.

I think the interceptor interface has changed a little since I opened this PR. I can work on ensuring it works with the latest version of Undici.

Once I've done that, do you want to add your changes above for onHeaders and backpressure? Sound good?

adrianfalleiro avatar Jan 29 '25 15:01 adrianfalleiro

> @tjhiggins Do you want to collaborate on this PR with me? I can give you commit access to the base branch.
>
> I think the interceptor interface has changed a little since I opened this PR. I can work on ensuring it works with the latest version of Undici.
>
> Once I've done that, do you want to add your changes above for onHeaders and backpressure? Sound good?

I only just saw the new hooks for v7. Fetch does decompress, but only after the interceptors run in v6. Maybe with the new onResponseData hook I don't even need an interceptor. I'll wait for you to update before making any changes; I'm not in a huge hurry to spend more time on this now that I have it working for v6.
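
To sketch what I mean (rough and untested; the hook names and signatures below are from my reading of the v7 docs rather than verified against it, only gzip is handled, and controller-based backpressure is left out):

import * as zlib from 'node:zlib';
import type { Dispatcher } from 'undici';

// Rough sketch of a decompress interceptor against the v7 handler hooks.
// Hook names/signatures here are from memory of the v7 docs and not verified;
// error handling and controller.pause()/resume() backpressure are omitted.
const decompressSketch = (dispatch: Dispatcher['dispatch']): Dispatcher['dispatch'] =>
  (opts, handler: any) => {
    let gunzip: zlib.Gunzip | null = null;

    return dispatch(opts, {
      onRequestStart: (controller: any, context: any) =>
        handler.onRequestStart?.(controller, context),

      onResponseStart: (controller: any, statusCode: number, headers: any, statusMessage?: string) => {
        if (String(headers['content-encoding'] ?? '').toLowerCase() === 'gzip') {
          gunzip = zlib.createGunzip();
          // Forward decoded bytes to the downstream handler.
          gunzip.on('data', (chunk) => handler.onResponseData?.(controller, chunk));
          delete headers['content-encoding'];
        }
        return handler.onResponseStart?.(controller, statusCode, headers, statusMessage);
      },

      onResponseData: (controller: any, chunk: Buffer) => {
        if (gunzip !== null) return void gunzip.write(chunk);
        return handler.onResponseData?.(controller, chunk);
      },

      onResponseEnd: (controller: any, trailers: any) => {
        if (gunzip !== null) return void gunzip.end(() => handler.onResponseEnd?.(controller, trailers));
        return handler.onResponseEnd?.(controller, trailers);
      },

      onResponseError: (controller: any, err: Error) =>
        handler.onResponseError?.(controller, err),
    } as any);
  };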

tjhiggins avatar Jan 29 '25 16:01 tjhiggins

I basically forgot about this PR!

We now have the decompress interceptor, but we still have to tackle the streaming properly.

Uzlopak avatar Aug 21 '25 09:08 Uzlopak