
Use part of the request body for routing, but don't load the whole request into RAM

Open · bergerst opened this issue 4 years ago · 7 comments

Is your feature request related to a problem? Please describe.
I have a case where I want to read the first X bytes of the request body and route to a different host depending on the value read.

I know that you can accomplish what I'm doing by caching the request using ServerWebExchangeUtils.cacheRequestBody() and then reading the bytes from the cached DataBuffer.

But in my case, multiple HTTP clients could send requests with 250 MB of data or more. If I don't limit direct memory, the process is killed by my cgroup RAM limit. If I do limit direct memory, new requests get HTTP 500 responses saying that there isn't enough memory.
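A rough estimate of why caching doesn't scale here, ignoring framework overhead: with k concurrent uploads of size S, caching holds every body at once, while peeking only needs the n-byte prefix plus roughly one chunk of size c per request. The value k = 4 below is a hypothetical example.

$$M_{\text{cache}} \approx k\,S, \qquad M_{\text{peek}} = O\!\left(k\,(n + c)\right)$$

$$k = 4,\ S = 250\ \text{MB}:\quad M_{\text{cache}} \approx 4 \times 250\ \text{MB} = 1000\ \text{MB}$$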

I tried to solve the problem with DataBufferUtils.takeUntilByteCount() without caching, but then either the request is forwarded without a body, because the publisher has already emitted everything, or an error is thrown saying that two subscribers can't subscribe to the same request Flux.
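Both failure modes can be reproduced, as a rough analogy only, with single-use java.util sources instead of a Reactor Flux: a shared Iterator loses whatever the routing code has already pulled, and a java.util.stream.Stream rejects a second consumer with an IllegalStateException. The class and method names are hypothetical, and Reactor's actual behavior and error messages differ in detail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

final class SingleUseBody {

    // Failure mode 1: the routing code pulls chunks from the shared source,
    // so the forwarder only sees what is left.
    static List<String> forwardAfterPeek(Iterator<String> body, int peekChunks) {
        for (int i = 0; i < peekChunks && body.hasNext(); i++) {
            body.next(); // read for routing, then dropped
        }
        List<String> forwarded = new ArrayList<>();
        body.forEachRemaining(forwarded::add);
        return forwarded;
    }

    // Failure mode 2: a second consumer of the same one-shot source is rejected.
    static boolean secondConsumerRejected(Stream<String> body) {
        body.limit(1).forEach(chunk -> { }); // first consumer: routing peek
        try {
            body.forEach(chunk -> { });       // second consumer: forwarding
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }
}
```

For example, forwardAfterPeek(List.of("a", "b", "c").iterator(), 1) returns [b, c]: the chunk used for routing never reaches the forwarder.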

Describe the solution you'd like
This probably goes against the principles of reactive programming, but is there some way to stop the request producer midway and produce a new Flux that replays the first few DataBuffers read in the custom GatewayFilter and then appends the remaining DataBuffers?
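A minimal synchronous sketch of the requested behavior, assuming a pull-based Iterator<ByteBuffer> as a stand-in for the request's Flux<DataBuffer> (PeekAndReplay is a hypothetical name, not a Spring or Reactor API): pull chunks only until n bytes are known, keep those chunks, and return an iterator that replays them before continuing with the untouched source. Only the pulled chunks are retained, so memory stays around the prefix size plus one chunk, independent of the body size. In Reactor, Flux.switchOnFirst offers a similar peek-then-continue pattern, but only for the first element, which may hold fewer than n bytes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class PeekAndReplay {
    final byte[] prefix;             // up to n bytes, available before anything is forwarded
    final Iterator<ByteBuffer> body; // the pulled chunks again, then the untouched rest

    private PeekAndReplay(byte[] prefix, Iterator<ByteBuffer> body) {
        this.prefix = prefix;
        this.body = body;
    }

    static PeekAndReplay of(Iterator<ByteBuffer> source, int n) {
        List<ByteBuffer> pulled = new ArrayList<>(); // only the chunks needed for the prefix
        ByteBuffer prefix = ByteBuffer.allocate(n);
        while (prefix.hasRemaining() && source.hasNext()) {
            ByteBuffer chunk = source.next();
            pulled.add(chunk);
            ByteBuffer view = chunk.duplicate(); // copy without moving the chunk's position
            if (view.remaining() > prefix.remaining()) {
                view.limit(view.position() + prefix.remaining());
            }
            prefix.put(view);
        }
        byte[] bytes = new byte[prefix.position()];
        prefix.flip();
        prefix.get(bytes);

        Iterator<ByteBuffer> replay = pulled.iterator();
        Iterator<ByteBuffer> body = new Iterator<ByteBuffer>() {
            @Override
            public boolean hasNext() {
                return replay.hasNext() || source.hasNext();
            }

            @Override
            public ByteBuffer next() {
                // replay the buffered chunks first, then keep pulling from the source
                return replay.hasNext() ? replay.next() : source.next();
            }
        };
        return new PeekAndReplay(bytes, body);
    }
}
```

For example, with chunks "ab", "cd", "ef" and n = 3, prefix holds "abc" and body yields all three chunks unchanged.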

Additional context
I've read about ModifyRequestBodyGatewayFilterFactory and ReadBodyPredicateFactory, and both of them rely on reading the whole request body into an Object.

bergerst, Nov 18 '21

What I did to solve my problem for now is to forward everything from the request to a Subscriber that holds a Sinks.many().multicast().onBackpressureBuffer(3000). Then I override the request Flux with sink.asFlux() in a ServerHttpRequestDecorator.

The Subscriber's onNext() reads DataBuffers until it has enough information to decide where the request should be forwarded, and then emits that decision through a Sinks.one().
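The same event-driven idea, sketched in plain Java: CompletableFuture stands in for Sinks.one(), and onNext()/onComplete() are ordinary methods rather than a Reactor Subscriber. RouteDecider and its decision function are hypothetical. The decision is completed at most once, chunk positions are left untouched, and a body shorter than the required prefix still gets a decision when it completes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class RouteDecider {
    private final int needed;                       // bytes required before deciding
    private final Function<String, String> decide;  // hypothetical: prefix text -> target host
    private final ByteArrayOutputStream seen = new ByteArrayOutputStream();
    final CompletableFuture<String> route = new CompletableFuture<>(); // completed at most once

    RouteDecider(int needed, Function<String, String> decide) {
        this.needed = needed;
        this.decide = decide;
    }

    // Called for every chunk; copies bytes only until the decision has been made.
    void onNext(ByteBuffer chunk) {
        if (route.isDone()) {
            return;
        }
        ByteBuffer view = chunk.duplicate(); // don't move the caller's read position
        byte[] part = new byte[Math.min(view.remaining(), needed - seen.size())];
        view.get(part);
        seen.write(part, 0, part.length);
        if (seen.size() >= needed) {
            emit();
        }
    }

    // Bodies shorter than `needed` still get a decision at the end.
    void onComplete() {
        if (!route.isDone()) {
            emit();
        }
    }

    private void emit() {
        route.complete(decide.apply(new String(seen.toByteArray(), StandardCharsets.UTF_8)));
    }
}
```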

If the buffer size is too small, sink.tryEmitNext() returns EmitResult.FAIL_ZERO_SUBSCRIBER or EmitResult.FAIL_OVERFLOW. It seems that MonoSendMany doesn't read anything from the queue until some event happens.

At this point, I have no idea what it's waiting for. Calling Thread.sleep() and then emitting the value again in the Subscriber doesn't change anything: tryEmitNext() keeps returning one of the EmitResults mentioned above, in an endless loop.
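A plain-Java illustration of the retry loop (not Reactor's Sinks; EmitRetry is a hypothetical name): offering into a full bounded buffer only starts succeeding once some other party drains it, so sleeping and retrying cannot help while nothing is consuming. One possible explanation for the behavior above, not verified here, is that the retry loop and Thread.sleep() run on the same event-loop thread that would otherwise drive the consumer.

```java
import java.util.concurrent.BlockingQueue;

final class EmitRetry {
    // Offers `item` up to `attempts` times, sleeping between attempts.
    // Returns the attempt number that succeeded, or -1 if every attempt failed.
    static int offerWithRetry(BlockingQueue<String> buffer, String item, int attempts, long sleepMillis) {
        for (int attempt = 1; attempt <= attempts; attempt++) {
            if (buffer.offer(item)) {
                return attempt;
            }
            try {
                // Sleeping only helps if another thread drains the buffer meanwhile.
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return -1;
            }
        }
        return -1;
    }
}
```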

bergerst, Dec 01 '21

I've run into similar problems and I think there is a better solution. Here is my code:

```java
import java.util.List;
import java.util.function.Function;

import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DemoFilter implements GatewayFilter {

    // number of leading body bytes used for the routing decision; set a reasonable value
    private static final int BYTE_COUNT = 1024;

    private final List<HttpMessageReader<?>> messageReaders;

    public DemoFilter() {
        messageReaders = HandlerStrategies.withDefaults().messageReaders();
    }

    private String getHost(String str) {
        // implement this method: derive the target host from the first bytes
        return "host based on str";
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);

        final byte[] first1024Bytes = new byte[BYTE_COUNT];

        // copy the first BYTE_COUNT bytes as the buffers pass through, without consuming them
        Flux<DataBuffer> modifiedBody = serverRequest.bodyToFlux(DataBuffer.class).map(new Function<DataBuffer, DataBuffer>() {
            int readByteCount = 0;

            @Override
            public DataBuffer apply(DataBuffer dataBuffer) {
                if (readByteCount < BYTE_COUNT) {
                    int rawPosition = dataBuffer.readPosition();
                    int before = dataBuffer.readableByteCount();
                    dataBuffer.read(first1024Bytes, readByteCount, Math.min(dataBuffer.readableByteCount(), BYTE_COUNT - readByteCount));
                    int after = dataBuffer.readableByteCount();
                    readByteCount += before - after;
                    // restore the read position so the bytes are still forwarded downstream
                    dataBuffer.readPosition(rawPosition);
                }
                return dataBuffer;
            }
        });

        final Mono<Object> mono1 = Mono.defer(() -> {
            // new String(byte[]) uses the platform default charset; for bodies shorter
            // than BYTE_COUNT the unused tail of the array stays zero-filled
            exchange.getAttributes().put("newHost", getHost(new String(first1024Bytes)));
            // later filters can use the "newHost" attribute to do something
            return Mono.empty();
        });

        BodyInserter<Flux<DataBuffer>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, DataBuffer.class);
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());

        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        final Mono<Void> mono2 = bodyInserter.insert(outputMessage, new BodyInserterContext())
                .then(Mono.defer(() -> {
                    ServerHttpRequest decorator = decorate(exchange, headers, outputMessage);
                    return chain.filter(exchange.mutate().request(decorator).build());
                })).onErrorResume((Function<Throwable, Mono<Void>>) Mono::error); // re-emits the same error, i.e. effectively a no-op

        // then(mono1) only runs once modifiedBody has completed,
        // i.e. after the whole request body has been read
        return modifiedBody.then(mono1).then(mono2);
    }

    ServerHttpRequestDecorator decorate(
            ServerWebExchange exchange, HttpHeaders headers,
            CachedBodyOutputMessage outputMessage
    ) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

}
```

tmzk1005, Dec 16 '21

@tmzk1005 Your modifiedBody.then(mono1) waits until the whole request is done, which is the same as caching the request.

At that point, using ServerWebExchangeUtils.cacheRequestBody() and then reading the bytes from exchange.getAttribute(CACHED_REQUEST_BODY_ATTR) would achieve the same result.

Set -XX:MaxDirectMemorySize=150M and send a 200 MB request to your application. It will throw an out of memory error for Direct Memory.

bergerst, Dec 16 '21

@bergerst Yes, I was wrong. I tested my application as you suggested and got an out of memory error for Direct Memory.

I wonder whether Flux's windowUntil method might help solve this problem, but I haven't found a perfect solution.

tmzk1005, Dec 17 '21

Not sure this is something I want to support given the complexity.

spencergibb, Mar 02 '22

We are fighting with SOAP in Spring Cloud and have similar issues. To detect the SOAP operation or read the SOAP security header, we need to read part of the message. Once we have all the information, the rest of the body (several MB) could simply stream through. I would love to see an example; the documentation is currently sparse.
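A sketch of reading only as far as the end of the SOAP header, assuming the header ends with the literal marker </soap:Header> (the namespace prefix varies between clients, so a real implementation would need an XML-aware check) and using byte[] chunks in place of DataBuffers. SoapHeaderScanner and readUntil are hypothetical names. The maxBytes cap keeps a body without the marker from being buffered whole; the chunks consumed here would still have to be replayed in front of the rest of the body when forwarding.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Optional;

final class SoapHeaderScanner {
    // Reads chunks until `endMarker` has been seen and returns everything up to and
    // including it. Returns Optional.empty() if the marker does not appear within maxBytes.
    static Optional<String> readUntil(Iterator<byte[]> chunks, String endMarker, int maxBytes) {
        ByteArrayOutputStream seen = new ByteArrayOutputStream();
        while (chunks.hasNext() && seen.size() < maxBytes) {
            byte[] chunk = chunks.next();
            seen.write(chunk, 0, Math.min(chunk.length, maxBytes - seen.size()));
            // decode everything seen so far, so a marker split across chunks is still found
            String text = new String(seen.toByteArray(), StandardCharsets.UTF_8);
            int end = text.indexOf(endMarker);
            if (end >= 0) {
                return Optional.of(text.substring(0, end + endMarker.length()));
            }
        }
        return Optional.empty();
    }
}
```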

robert-gdv, May 23 '22

@robert-gdv The SOAP action can be read from the Content-Type HTTP header. Normally it looks like this: Content-Type: application/soap+xml;action="urn:ihe:iti:2007:ProvideAndRegisterDocumentSet-b"

The action parameter of the Content-Type is optional, though, so you have to make sure the client actually sends it. That still doesn't solve the Security element part.
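When the client does send it, extracting the action needs no body access at all. A minimal sketch with plain string handling (SoapAction.fromContentType is a hypothetical helper; it splits naively on ';' and would therefore mis-handle a quoted value that itself contains ';'):

```java
import java.util.Optional;

final class SoapAction {
    // Returns the optional `action` parameter of a Content-Type header value, without quotes.
    static Optional<String> fromContentType(String contentType) {
        if (contentType == null) {
            return Optional.empty();
        }
        for (String part : contentType.split(";")) {
            String p = part.trim();
            int eq = p.indexOf('=');
            if (eq < 0 || !p.substring(0, eq).trim().equalsIgnoreCase("action")) {
                continue; // media type itself, or some other parameter such as charset
            }
            String value = p.substring(eq + 1).trim();
            if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                value = value.substring(1, value.length() - 1);
            }
            return Optional.of(value);
        }
        return Optional.empty();
    }
}
```

With the header above, fromContentType returns Optional[urn:ihe:iti:2007:ProvideAndRegisterDocumentSet-b]; without an action parameter it returns Optional.empty().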

bergerst, May 24 '22