Is RxNetty + RxJava really doing NIO?

Open coolbananas118 opened this issue 9 years ago • 4 comments

Hi, I've been integrating RxJava into our microservice app. It's working nicely, and I'm now getting concurrent/parallel calls to downstream services rather than sequential calls via Apache HTTP. However, I was curious to know whether I am also achieving NIO, and it seems that I'm not. I've tried the built-in IO scheduler as well as a custom RxJava scheduler backed by a thread pool of size one, but the thread appears to be blocked by the outbound call to the downstream service and is therefore not freed up to process other inbound calls. I came across RxNetty hoping that it would save me from having to code directly against Netty, but it seems to behave the same way (perhaps it has its own thread pool and is therefore not bound to the parent RxJava thread).

The crude Java unit test below attempts to demonstrate the issue. For this to work I just created a basic downstream Golang app running in Docker which has an endpoint which sleeps for 4 seconds (don't pick on my Go code, it's copy/pasted from the web :)). I suspect that the RxJava toBlocking() call chained to the HttpClient.newClient code is the cause. I'm now pulling apart the ReactiveLab to see if I can find any clues but perhaps one of you guys can spot a quick fix.

RxJavaIntegrationTest.java

import io.reactivex.netty.protocol.http.client.HttpClient;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;

import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.Executors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RxJavaIntegrationTest {

    @Test
    public void concurrencyTest() {
        Date start = new Date();

        Scheduler smallThreadPool = Schedulers.from(Executors.newFixedThreadPool(1));

        Observable<String> callA = Observable.create((Subscriber<? super String> s) -> {
            s.onNext(callSlowEndpoint("CallA"));
            s.onCompleted();
        }).subscribeOn(smallThreadPool);

        Observable<String> callB = Observable.create((Subscriber<? super String> s) -> {
            s.onNext(callSlowEndpoint("CallB"));
            s.onCompleted();
        }).subscribeOn(smallThreadPool);

        String result = Observable.zip(callA, callB, (x, y) -> {
            return x.toString() + ":" + y.toString();
        }).toBlocking().single();

        Date end = new Date();
        assertEquals("Hello form GOpool-1-thread-1:Hello form GOpool-1-thread-1", result);
        assertTrue("If both calls are done in parallel it should only take 4s + a bit of load up time.", end.getTime() - start.getTime() < 6000);
    }

    public String callSlowEndpoint(String input) {

        System.out.println("Starting call for: " + input + Thread.currentThread().getName());

        String result = HttpClient.newClient("192.168.99.100", 9090)
                //.enableWireLogging("test-client", LogLevel.ERROR)
                .createGet("/?input=" + input)
                        //.doOnNext(resp -> logger.info(resp.toString()))
                .flatMap(resp -> resp.getContent()
                        .map(bb -> bb.toString(Charset.defaultCharset())))
                .toBlocking()
                .single();

        System.out.println("Ending call for: " + input + Thread.currentThread().getName());
        return result + Thread.currentThread().getName();
    }

}

Golang service

server.go

package main

import (
    "fmt"
    "net/http"
    //"string"
    "log"
    "time"
)

func sayhelloName(w http.ResponseWriter, r *http.Request) {
    r.ParseForm()  // parse arguments, you have to call this by yourself
    //fmt.Println(r.Form)  // print form information in server side
    //fmt.Println("path", r.URL.Path)
    //fmt.Println("scheme", r.URL.Scheme)
    //fmt.Println(r.Form["url_long"])
    //for k, v := range r.Form {
        //fmt.Println("key:", k)
        //fmt.Println("val:", strings.Join(v, ""))
    //}
    fmt.Println("start", r.URL.Query()["input"])
    time.Sleep(4000 * time.Millisecond)
    fmt.Println("stop", r.URL.Query()["input"])
    fmt.Fprintf(w, "Hello form GO") // send data to client side
}

func main() {
    http.HandleFunc("/", sayhelloName) // set router
    err := http.ListenAndServe(":9090", nil) // set listen port
    if err != nil {
        log.Fatal("ListenAndServe: ", err)
    }
}

Dockerfile

FROM golang:1.6-onbuild

README.md

docker build -t my-golang-app .
docker run -it --rm -p 9090:9090 --name my-running-app my-golang-app
http://192.168.99.100:9090/?input=HelloWorld

coolbananas118, Aug 03 '16 11:08

> I suspect that the RxJava toBlocking() call chained to the HttpClient.newClient code is the cause.

It is definitely something to avoid, see #481 for some suggestions.
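To see why a blocking wait inside a one-thread pool serializes the two requests, here is a minimal sketch that uses only the JDK. It does not use RxJava or RxNetty: Thread.sleep stands in for toBlocking().single() waiting on the response, CompletableFuture stands in for Observable, and the class and method names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingOnOneThread {

    // Hypothetical stand-in for a downstream call that blocks its calling thread
    // until the response is available.
    static String blockingCall(String input, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return input + "@" + Thread.currentThread().getName();
    }

    // Submits two blocking calls to a pool with a single thread and returns the
    // elapsed wall-clock time in milliseconds.
    static long elapsedMillisForTwoCalls(long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> a =
                    CompletableFuture.supplyAsync(() -> blockingCall("CallA", delayMillis), pool);
            CompletableFuture<String> b =
                    CompletableFuture.supplyAsync(() -> blockingCall("CallB", delayMillis), pool);
            // The second task cannot start until the first one releases the only thread.
            a.thenCombine(b, (x, y) -> x + ":" + y).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("elapsed ms: " + elapsedMillisForTwoCalls(200));
    }
}
```

With a 200 ms delay the elapsed time should come out at roughly twice the delay, because the only thread is parked in the first call while the second one waits in the queue.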

brharrington, Aug 03 '16 12:08

To test the NIO part, you want something like this. It's pure Java/RxNetty and shows NIO in action. To make it a bit more interesting, the server streams the results back in chunks, which lets you see the interleaving better.

This gives you output like this:

/9-0 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 1184]
/2-0 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 1184]
/3-0 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 1184]
/4-0 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 1184]
/5-0 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 1184]
/6-0 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 1184]
/7-0 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 1184]
/8-0 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 1184]
/10-0 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 1184]
/1-0 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 1185]
/4-1 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 2166]
/3-1 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 2166]
/5-1 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 2166]
/6-1 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 2166]
/7-1 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 2166]
/8-1 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 2166]
/1-1 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 2167]
/2-1 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 2167]
/9-1 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 2169]
/10-1 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 2169]
/10-2 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 3167]
/1-2 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 3168]
/5-2 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 3168]
/6-2 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 3167]
/7-2 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 3167]
/8-2 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 3168]
/9-2 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 3168]
/3-2 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 3168]
/4-2 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 3168]
/2-2 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 3169]
/6-3 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 4165]
/5-3 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 4171]
/1-3 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 4171]
/2-3 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 4171]
/7-3 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 4171]
/8-3 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 4171]
/3-3 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 4173]
/4-3 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 4173]
/10-3 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 4180]
/9-3 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 4180]

jamesgorman2, Aug 05 '16 08:08

@coolbananas118 RxNetty, by default, does not introduce any new thread pools beyond Netty's event loops. To do any work off the event loops, as you are doing, use an RxJava scheduler.
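As a rough JDK-only analogy for moving work off the event loop (this is not RxNetty or RxJava code), the sketch below produces a value on a single thread named "fake-eventloop" and hands the follow-up work to a separate pool. Here thenApplyAsync with an explicit executor plays a role similar to switching to an RxJava scheduler; the thread names and the class are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadFromEventLoop {

    // Returns "<thread that produced the value> -> <thread that processed it>".
    static String run() {
        // One thread that stands in for an event loop; it should never be blocked.
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop"));
        // Separate pool for any heavier follow-up work.
        ExecutorService workers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), eventLoop)
                    .thenApplyAsync(
                            produced -> produced + " -> " + Thread.currentThread().getName(),
                            workers)
                    .join();
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "fake-eventloop -> worker"
    }
}
```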

In your example, you do not need .toBlocking().single() at every step. You can modify callSlowEndpoint to return Observable<String>, then remove Observable.create() and use the return value of callSlowEndpoint instead. You can then call .toBlocking() only on the zipped observable of the two outbound calls. By doing this, both outbound calls will run in parallel.
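The shape of that change can be sketched with the JDK alone. This is an analogy, not RxNetty code: CompletableFuture stands in for Observable<String>, thenCombine for Observable.zip, and join() for the single toBlocking() at the end. The timer-based slowCallAsync is a made-up stand-in for a client call that returns immediately and completes later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class BlockOnceAtTheEdge {

    // Hypothetical non-blocking call: returns at once, and the future is completed
    // later on the timer thread, much like a response arriving on an event loop.
    static CompletableFuture<String> slowCallAsync(
            String input, long delayMillis, ScheduledExecutorService timer) {
        CompletableFuture<String> response = new CompletableFuture<>();
        timer.schedule(() -> { response.complete("Hello " + input); },
                delayMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    // Starts both calls, combines their results, and blocks exactly once.
    static String zipTwoCalls(long delayMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> a = slowCallAsync("CallA", delayMillis, timer);
            CompletableFuture<String> b = slowCallAsync("CallB", delayMillis, timer);
            return a.thenCombine(b, (x, y) -> x + ":" + y).join();
        } finally {
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String result = zipTwoCalls(300);
        long elapsed = (System.nanoTime() - start) / 1_000_000;
        System.out.println(result + " in about " + elapsed + " ms");
    }
}
```

Both waits overlap even though only one timer thread exists, so the total should be close to a single delay rather than the sum of the two. The same reasoning applies to the thread's example once callSlowEndpoint returns an Observable<String> and nothing blocks until the zipped result.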

NiteshKant, Aug 14 '16 11:08

@NiteshKant: I think we should close this issue to avoid confusion regarding true NIO support in RxNetty.

mithunsasidharan, Jun 14 '17 09:06