stan.java icon indicating copy to clipboard operation
stan.java copied to clipboard

Reactive support for subscriptions

Open pepijno opened this issue 6 years ago • 3 comments

Thanks for the awesome library!

I was wondering if support for reactive subscriptions can be added. For example, instead of doing

Subscription sub = sc.subscribe("foo", new MessageHandler() {
    public void onMessage(Message m) {
        ...
    }
}, /*SubscriptionOptions*/);

doing something like the following (using reactor core)

ReactiveSubscription sub = sc.subscribeReactive("foo", /*SubscriptionOptions*/);
Flux<Message> stream = sub.stream();

The above is just a possible example, I don't know what the best way of such an implementation would look like.

pepijno avatar Sep 12 '19 11:09 pepijno

I am open to the idea, but also don't want to bring in a new library/dependency, perhaps we could do a blog post/doc example instead?

sasbury avatar Sep 12 '19 18:09 sasbury

I have been thinking about this a little bit.

It's not so hard to make a Flux from a subscription object via the Flux.generate(...) part of the Project Reactor API. This might be worth a blog post if it's not obvious. But the only back-pressure support available would be buffering or dropping elements.

To add proper reactive support (effectively bridging a publishing Flux and a subscription Flux via Nats Streaming) seems quite a bit more complicated.

bmarcj avatar Feb 22 '20 18:02 bmarcj

I need reactive stream support as well with proper backpressure support. Note that this doesn't require a new library since as of Java 9 Reactive Stream support is built into Java via Flow. So if a NATS specific implementations of those interfaces are written that follow the Reactive Streams specification then it should work with reactive libraries like Project Reactor and RxJava.

steven-sheehy avatar Sep 15 '20 15:09 steven-sheehy