Is RxNetty + RxJava really doing NIO?
Hi, I've been integrating RxJava into our microservice app. It's working nicely, and I'm now getting concurrent/parallel calls to downstream services rather than sequential calls via Apache HTTP. However, I was curious to know whether I am also achieving NIO, and it seems that I'm not. I've tried the built-in IO scheduler as well as a custom scheduler backed by a thread pool of size one, but it seems that the thread is blocked by the outbound call to the downstream service and is therefore not freed up to process other inbound calls. I came across RxNetty, hoping that it would save me from having to code directly against Netty, but it seems to be doing the same (perhaps it has its own thread pool and is therefore not bound to the parent RxJava thread).
The crude Java unit test below attempts to demonstrate the issue. For this to work, I created a basic downstream Golang app running in Docker with an endpoint that sleeps for 4 seconds (don't pick on my Go code, it's copy/pasted from the web :)). I suspect that the RxJava toBlocking() call chained to the HttpClient.newClient code is the cause. I'm now pulling apart the ReactiveLab to see if I can find any clues, but perhaps one of you guys can spot a quick fix.
RxJavaIntegrationTest.java
import io.reactivex.netty.protocol.http.client.HttpClient;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class RxJavaIntegrationTest {

    @Test
    public void concurrencyTest() {
        Date start = new Date();
        // Single-threaded scheduler: both calls share one worker thread.
        Scheduler smallThreadPool = Schedulers.from(Executors.newFixedThreadPool(1));
        Observable<String> callA = Observable.create((Subscriber<? super String> s) -> {
            s.onNext(callSlowEndpoint("CallA"));
            s.onCompleted();
        }).subscribeOn(smallThreadPool);
        Observable<String> callB = Observable.create((Subscriber<? super String> s) -> {
            s.onNext(callSlowEndpoint("CallB"));
            s.onCompleted();
        }).subscribeOn(smallThreadPool);
        String result = Observable.zip(callA, callB, (x, y) -> {
            return x.toString() + ":" + y.toString();
        }).toBlocking().single();
        Date end = new Date();
        assertEquals("Hello form GOpool-1-thread-1:Hello form GOpool-1-thread-1", result);
        assertTrue("If both calls are done in parallel it should only take 4s + a bit of load up time.", end.getTime() - start.getTime() < 6000);
    }

    public String callSlowEndpoint(String input) {
        System.out.println("Starting call for: " + input + Thread.currentThread().getName());
        // toBlocking().single() parks the calling thread until the response arrives.
        String result = HttpClient.newClient("192.168.99.100", 9090)
                //.enableWireLogging("test-client", LogLevel.ERROR)
                .createGet("/?input=" + input)
                //.doOnNext(resp -> logger.info(resp.toString()))
                .flatMap(resp -> resp.getContent()
                        .map(bb -> bb.toString(Charset.defaultCharset())))
                .toBlocking()
                .single();
        System.out.println("Ending call for: " + input + Thread.currentThread().getName());
        return result + Thread.currentThread().getName();
    }
}
Golang service
server.go
package main

import (
	"fmt"
	"net/http"
	//"string"
	"log"
	"time"
)

func sayhelloName(w http.ResponseWriter, r *http.Request) {
	r.ParseForm() // parse arguments, you have to call this by yourself
	//fmt.Println(r.Form) // print form information in server side
	//fmt.Println("path", r.URL.Path)
	//fmt.Println("scheme", r.URL.Scheme)
	//fmt.Println(r.Form["url_long"])
	//for k, v := range r.Form {
	//fmt.Println("key:", k)
	//fmt.Println("val:", strings.Join(v, ""))
	//}
	fmt.Println("start", r.URL.Query()["input"])
	time.Sleep(4000 * time.Millisecond)
	fmt.Println("stop", r.URL.Query()["input"])
	fmt.Fprintf(w, "Hello form GO") // send data to client side
}

func main() {
	http.HandleFunc("/", sayhelloName)       // set router
	err := http.ListenAndServe(":9090", nil) // set listen port
	if err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
Dockerfile
FROM golang:1.6-onbuild
README.md
docker build -t my-golang-app .
docker run -it --rm -p 9090:9090 --name my-running-app my-golang-app
http://192.168.99.100:9090/?input=HelloWorld
I suspect that the RxJava toBlocking() call chained to the HttpClient.newClient code is the cause.
It is definitely something to avoid; see #481 for some suggestions.
To test the NIO part, you want something like this. It's pure Java/RxNetty and shows NIO in action. To make it a bit more interesting, the server streams the results back in chunks, which lets you see the interleaving better.
This gives you output like this:
/9-0 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 1184]
/2-0 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 1184]
/3-0 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 1184]
/4-0 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 1184]
/5-0 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 1184]
/6-0 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 1184]
/7-0 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 1184]
/8-0 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 1184]
/10-0 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 1184]
/1-0 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 1185]
/4-1 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 2166]
/3-1 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 2166]
/5-1 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 2166]
/6-1 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 2166]
/7-1 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 2166]
/8-1 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 2166]
/1-1 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 2167]
/2-1 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 2167]
/9-1 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 2169]
/10-1 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 2169]
/10-2 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 3167]
/1-2 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 3168]
/5-2 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 3168]
/6-2 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 3167]
/7-2 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 3167]
/8-2 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 3168]
/9-2 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 3168]
/3-2 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 3168]
/4-2 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 3168]
/2-2 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 3169]
/6-3 [request received on rxnetty-nio-eventloop-1-1] [response buffered on RxIoScheduler-9] [response received on rxnetty-nio-eventloop-1-7 at 4165]
/5-3 [request received on rxnetty-nio-eventloop-1-6] [response buffered on RxIoScheduler-3] [response received on rxnetty-nio-eventloop-1-6 at 4171]
/1-3 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-5] [response received on rxnetty-nio-eventloop-1-2 at 4171]
/2-3 [request received on rxnetty-nio-eventloop-1-8] [response buffered on RxIoScheduler-4] [response received on rxnetty-nio-eventloop-1-3 at 4171]
/7-3 [request received on rxnetty-nio-eventloop-1-3] [response buffered on RxIoScheduler-2] [response received on rxnetty-nio-eventloop-1-8 at 4171]
/8-3 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-7] [response received on rxnetty-nio-eventloop-1-1 at 4171]
/3-3 [request received on rxnetty-nio-eventloop-1-2] [response buffered on RxIoScheduler-6] [response received on rxnetty-nio-eventloop-1-4 at 4173]
/4-3 [request received on rxnetty-nio-eventloop-1-7] [response buffered on RxIoScheduler-8] [response received on rxnetty-nio-eventloop-1-5 at 4173]
/10-3 [request received on rxnetty-nio-eventloop-1-5] [response buffered on RxIoScheduler-11] [response received on rxnetty-nio-eventloop-1-3 at 4180]
/9-3 [request received on rxnetty-nio-eventloop-1-4] [response buffered on RxIoScheduler-10] [response received on rxnetty-nio-eventloop-1-2 at 4180]
@coolbananas118 RxNetty, by default, does not introduce any thread pools beyond Netty's event loops. To do any work off the event loops, as you are doing, use an RxJava scheduler.
In your example, you do not need .toBlocking().single() at every step. You can change callSlowEndpoint to return Observable<String>, then remove Observable.create() and use the return value of callSlowEndpoint directly. Call .toBlocking() only once, on the zipped observable of the two outbound calls. That way, both outbound calls run in parallel.
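The difference between blocking inside each call and blocking once on the combined result can be sketched without RxJava or RxNetty. What follows is a JDK-only analogy, not RxNetty code: CompletableFuture stands in for Observable, thenCombine for Observable.zip, join() for toBlocking().single(), and a single-threaded ScheduledExecutorService (named fake-eventloop here) for the Netty event loop. The class name, method names and the 400 ms delay are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BlockOnceDemo {
    // Hypothetical latency of the simulated downstream service.
    static final long DELAY_MS = 400;

    // Stand-in for an event loop: one daemon thread that completes futures later.
    static final ScheduledExecutorService EVENT_LOOP =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "fake-eventloop");
                t.setDaemon(true);
                return t;
            });

    // Non-blocking call: returns immediately, the result arrives after DELAY_MS.
    static CompletableFuture<String> callSlowEndpointAsync(String input) {
        CompletableFuture<String> response = new CompletableFuture<>();
        EVENT_LOOP.schedule(() -> { response.complete("Hello " + input); },
                DELAY_MS, TimeUnit.MILLISECONDS);
        return response;
    }

    // Blocking call: parks the calling thread until the response arrives.
    static String callSlowEndpointBlocking(String input) {
        return callSlowEndpointAsync(input).join();
    }

    // Variant 1: each call blocks on a shared single worker thread, so the
    // second call cannot start until the first one has finished.
    static String zipWithBlockingCalls() {
        ExecutorService singleThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> a = CompletableFuture.supplyAsync(
                    () -> callSlowEndpointBlocking("CallA"), singleThread);
            CompletableFuture<String> b = CompletableFuture.supplyAsync(
                    () -> callSlowEndpointBlocking("CallB"), singleThread);
            return a.thenCombine(b, (x, y) -> x + ":" + y).join();
        } finally {
            singleThread.shutdown();
        }
    }

    // Variant 2: both calls are started first; we block once on the combination.
    static String zipWithAsyncCalls() {
        return callSlowEndpointAsync("CallA")
                .thenCombine(callSlowEndpointAsync("CallB"), (x, y) -> x + ":" + y)
                .join();
    }

    // Runs the given work and returns the wall-clock time it took, in milliseconds.
    static long elapsedMillis(Supplier<String> work) {
        long start = System.nanoTime();
        work.get();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("blocking inside each call: "
                + elapsedMillis(BlockOnceDemo::zipWithBlockingCalls) + " ms");
        System.out.println("blocking once at the end:  "
                + elapsedMillis(BlockOnceDemo::zipWithAsyncCalls) + " ms");
    }
}
```

Under these assumptions, the first variant should take roughly twice the delay, because the single worker thread is parked in join() while the first call is pending. The second should take roughly one delay, because both calls are in flight before anything blocks. The same reasoning explains why the original test needs about 8 seconds instead of 4.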
@NiteshKant : I think we should close this issue to avoid confusion regarding true NIO support of RxNetty.