RxNetty icon indicating copy to clipboard operation
RxNetty copied to clipboard

ReactiveStreams support

Open NiteshKant opened this issue 9 years ago • 9 comments

There have been some discussions in #565 and #561 around upgrading RxJava to 2.x in RxNetty. I would like to spin that into the following:

Make RxNetty stick to exposing ReactiveStreams interfaces from the public APIs and use RxJava 2.x internally.

What?

RxJava becomes an implementation detail for RxNetty. Most of the users of RxNetty would not directly see RxJava in the public APIs. Users will just use ReactiveStreams interfaces and when they need function composition, they can use any function composition library.

Why?

  • Gives the option of shading RxJava in RxNetty and hence decoupling applications using a different RxJava version.
  • Option to switch to a different function composition library in the future, if and when one is available and interesting.

None of the above is a very strong reason to absolutely do this change but there is no strong reason to go the other way either (use RxJava classes in public API).

Thoughts?

NiteshKant avatar Nov 08 '16 20:11 NiteshKant

@jamesgorman2 Let me know if you are interested in giving this a shot or have thoughts on this.

NiteshKant avatar Nov 09 '16 17:11 NiteshKant

A maxim from one of my FP buddies was emit the most permissive type you can and accept the most restrictive. Following this it might make sense to return Flowables (and pass them to handlers) in place of Observables, and to accept Publishers for the same. This should let anyone using another streams library use any ReactiveStreams converters they like but give native RxJava support out of the box.

From a pragmatic point:

  • RxJava is already loaded in the JVM hiding it doesn't save anything here;
  • Using RxJava 1 components wouldn't be too bad if only Flowable is exposed in the API. Exposing Observable in particular would be mean one has be fully qualified which is less nice. That said, the new namespacing means no collisions so it's not a critical problem;
  • this sets up for following the RxJava interop recommendations but gives you RxJava 2 out of the box. Adding a core, then a separate RxJava 2 wrapper feels like a bit of needless indirection (at least at this point) and the nature of the RxNetty API makes this a pain;
  • it does make a statement that RxJava is the prefered way to deal with RxNetty. I'm not sure what this means, maybe nothing at all.

jamesgorman2 avatar Nov 09 '16 19:11 jamesgorman2

@jamesgorman2 thanks for the feedback!

I understand the reasoning for the mix of Publisher and Flowable but my preference is to be consistent since these are two different libraries (ReactiveStreams and RxJava).

In order to convert a Publisher to Flowable, this is the code required:

Flowable.fromPublisher(publisher);

The implementation of which is to just cast the publisher to Flowable if it is already a Flowable instance. So, it isn't even allocating an object in all cases when the user is using RxJava2.

RxJava is already loaded in the JVM

It's not a big deal but there are cases where shading RxJava may make sense if there are various versions of RxJava2 that exists in the app and there is a backward incompatible change to RxJava (highly unlikely but possible)

if only Flowable is exposed in the API.

Yes, we will only use Flowable as we always require backpressure.

Adding a core, then a separate RxJava 2 wrapper feels like a bit of needless indirection

No I do not intend to do that at all. Users would use the snippet I mentioned above to convert to RxJava2.

NiteshKant avatar Nov 10 '16 18:11 NiteshKant

OK, so it looks like the main difference in opinion boils down to what it means to expose (a) only Publisher, (b) only Flowable, or (c) both. I have a personal preference for (b) or (C), but maybe good to put some examples and pros and cons. I suspect I'm less concerned about (c) because Publisher is already in the type def for Flowable, but happy to be convinced to (a) or (b).

Here's my first attempt at a comparison, more pros and cons might be helpful:

A

Pros

  • easy to use preferred reactive streams library
  • single streams API package (ie more coherent API)

Cons

  • have to call, eg, Flowable.fromPublisher(publisher); all the time to get things going - users have to make a choice about how they are going to manipulate streams
  • might get gnarly trying to keep RxJava out with some of the deeper API types that take Subscriber, OnSubscribe, etc (eg ContentSource)
public interface RequestHandler<I, O> {
    Publisher<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response);
}

public final class ContentSource<T> extends Publisher<T> {
  ...
}

public abstract class ResponseContentWriter<C> extends Publisher<Void> {
  ...
  public abstract ResponseContentWriter<C> write(? extends Publisher<C> msgs);
  public abstract <T extends TrailingHeaders> Publisher<Void> write(? extends Publisher<C> contentSource,
                                                                    Func0<T> trailerFactory,
                                                                    Func2<T, C, T> trailerMutator,
                                                                    Func1<C, Boolean> flushSelector);
  ...
}

B

Pros

  • easy to start manipulating the stream (ie RxJava out of the box), users don't have to make a choice about how to manipulate streams
  • single streams API package makes it clearer how this works (ie more coherent API)

Cons

  • extra layer to use prefered streams library when returning to RxNetty (Flowable.fromPublisher(subTypeOfPublisher);)
public interface RequestHandler<I, O> {
    Flowable<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response);
}

public final class ContentSource<T> extends Flowable<T> {
  ...
}

public abstract class ResponseContentWriter<C> extends Flowable<Void> {
  ...
  public abstract ResponseContentWriter<C> write(? extends Flowable<C> msgs);
  public abstract <T extends TrailingHeaders> Flowable<Void> write(? extends Flowable<C> contentSource,
                                                                   Func0<T> trailerFactory,
                                                                   Func2<T, C, T> trailerMutator,
                                                                   Func1<C, Boolean> flushSelector);
  ...
}

C

Pros

  • same ease of use of RxJava as (B) - ie 'it just works'
  • same ease of use of other streams implementations as (A) - ie have to call X.fromPublisher(flowable) on the way in, but don't have to call Flowable.fromPublisher(subTypeOfPublisher); on the way out

Cons

  • two streams API packages is congnitive overhead, may be unclear to users
  • two streams API packages forces more imports
  • might get gnarly trying to keep RxJava out with some of the deeper API types that take Subscriber, OnSubscribe, etc (eg ContentSource)
public interface RequestHandler<I, O> {
    Publisher<Void> handle(HttpServerRequest<I> request, HttpServerResponse<O> response);
}

public final class ContentSource<T> extends Flowable<T> {
  ...
}

public abstract class ResponseContentWriter<C> extends Flowable<Void> {
  ...
  public abstract ResponseContentWriter<C> write(? extends Publisher<C> msgs);
  public abstract <T extends TrailingHeaders> Flowable<Void> write(? extends Publisher<C> contentSource,
                                                                   Func0<T> trailerFactory,
                                                                   Func2<T, C, T> trailerMutator,
                                                                   Func1<C, Boolean> flushSelector);
  ...
}

jamesgorman2 avatar Nov 10 '16 19:11 jamesgorman2

@jamesgorman2 apologies for the delayed response. I think a good way is to start doing these changes may be using Flowable in the APIs and then at a later point we can see if it makes sense to change it to Publisher.

NiteshKant avatar Nov 17 '16 06:11 NiteshKant

Sounds like a reasonable place to start.

@NiteshKant do you have a particular engagement model in mind for this?

jamesgorman2 avatar Nov 17 '16 08:11 jamesgorman2

@jamesgorman2 I created a branch for these changes. Probably take one module at a time and start changing stuff 😄

NiteshKant avatar Nov 17 '16 19:11 NiteshKant

Sounds good @NiteshKant ,

jamesgorman2 avatar Nov 19 '16 00:11 jamesgorman2

This issue hasn't seen an update in a few years. Would someone be willing to provide an update? I am interested in using RxNetty in an existing project that uses RxJava2 and would prefer not to bring in RxJava (1).

Thanks!

kevinteg avatar Mar 09 '21 19:03 kevinteg