Trying to get reactive backpressure to work
Hi, I'm working on a PoC for reactive backpressure using pgadba, and I get the impression the driver isn't honoring backpressure. I'm running a simple query against a database using a rowPublisherOperation:
DataSource ds = DataSourceFactory.newFactory("org.postgresql.adba.PgDataSourceFactory")
    .builder()
    .url("jdbc:postgresql://localhost:5432/dvdrental")
    .username("postgres")
    .password("mysecretpassword")
    .build();
CompletableFuture<String> result = new CompletableFuture<>();
Session session = ds.getSession();
// Very basic SubmissionPublisher (and a Flow.Processor), so I can expose the result as a publisher
RowProcessor rp = new RowProcessor();
final CompletableFuture<String> completableFuture = session.<String>rowPublisherOperation("select title from film limit 1000")
    .subscribe(rp, result)
    .submit()
    .getCompletionStage()
    .toCompletableFuture();
// Convert to RxJava2, because I'm most comfortable with that
long count = FlowInterop.fromFlowPublisher(rp)
    .doOnTerminate(() -> session.close())
    .map(e -> (String) e.at("title").get())
    .doOnNext(e -> System.err.println("Title: " + e))
    .count()
    .blockingGet();
System.err.println("Count: " + count);
The RowProcessor looks like this:
public class RowProcessor extends SubmissionPublisher<Result.RowColumn> implements Flow.Processor<Result.RowColumn,Result.RowColumn> {
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        // Ask for the first row only
        this.subscription.request(1);
    }

    @Override
    public void onNext(RowColumn item) {
        // Forward the row downstream, then ask upstream for one more
        submit(item);
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        // Propagate the failure so downstream subscribers are not left waiting
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        System.err.println("completed");
        close();
    }
}
Now what happens is this: If I limit to about 500 it works fine.
select title from film limit 500
results in:
completed
Count: 500
All good. If I increase to 900 I see this:
completed
Count: 895
Along with 5 IllegalStateExceptions:
java.lang.IllegalStateException: failed to offer item to subscriber
    at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.lambda$0(ProcessorSubmission.java:82)
    at java.base/java.util.concurrent.SubmissionPublisher.retryOffer(SubmissionPublisher.java:445)
    at java.base/java.util.concurrent.SubmissionPublisher.doOffer(SubmissionPublisher.java:422)
    at java.base/java.util.concurrent.SubmissionPublisher.offer(SubmissionPublisher.java:550)
    at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.addRow(ProcessorSubmission.java:81)
    at org.postgresql.adba/org.postgresql.adba.communication.network.Portal.addDataRow(Portal.java:152)
    at org.postgresql.adba/org.postgresql.adba.communication.network.ExecuteResponse.read(ExecuteResponse.java:31)
    at org.postgresql.adba/org.postgresql.adba.communication.NetworkConnection.handleRead(NetworkConnection.java:407)
    at org.postgresql.adba/org.postgresql.adba.execution.DefaultNioLoop.run(DefaultNioLoop.java:127)
    at java.base/java.lang.Thread.run(Thread.java:844)
If I do more work with the results (like printing the rows to the console) the problem gets worse, so I get the impression that the pg driver does not slow down reading the result when the consumer can't keep up.
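The stack trace suggests the rows are pushed with SubmissionPublisher.offer and a drop handler, which rejects an item as soon as the subscriber's buffer is full. The following is a minimal sketch of that failure mode in isolation, not the driver's actual code: `OfferDropDemo` and `countDrops` are hypothetical names, the subscriber deliberately never requests anything, and drops are counted instead of raising an IllegalStateException.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of pgadba.
class OfferDropDemo {

    /** Offers `total` items to a subscriber with zero demand and returns how many were dropped. */
    static int countDrops(int total, int bufferCapacity) {
        AtomicInteger dropped = new AtomicInteger();
        // Runnable::run runs subscriber callbacks on the calling thread, which keeps the demo simple.
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(Runnable::run, bufferCapacity)) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* never calls request(): no demand */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < total; i++) {
                // offer() does not block; once the buffer is full the drop handler is invoked.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false; // do not retry, the item is lost
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped: " + countDrops(100, 8));
    }
}
```

With a consumer that is merely slow rather than stalled, the number of drops depends on timing, which would be consistent with the count varying between runs and getting worse when each row takes longer to process.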
Hi. I agree that this needs improvement.
What the driver needs to do is stop reading from the TCP socket once it encounters backpressure, but it doesn't do that yet.
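One way to pause socket reads with Java NIO is to clear OP_READ from the channel's SelectionKey while there is no outstanding demand and set it again when the subscriber requests more. The sketch below only illustrates that idea under assumptions: `ReadGate`, `request`, `rowDelivered`, and `demo` are hypothetical names, a Pipe stands in for the TCP socket, and a real driver would also have to coordinate the demand counter with its selector thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper, not part of pgadba.
class ReadGate {
    private final SelectionKey key;
    private final AtomicLong demand = new AtomicLong();

    ReadGate(SelectionKey key) {
        this.key = key;
        key.interestOps(0); // start paused: nothing has been requested yet
    }

    /** Would be called from Subscription.request(n): record demand and resume reading. */
    void request(long n) {
        demand.addAndGet(n);
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        key.selector().wakeup(); // let a blocked select() pick up the new interest set
    }

    /** Would be called after a row is handed downstream: pause reading once demand is used up. */
    void rowDelivered() {
        if (demand.decrementAndGet() <= 0) {
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    /** Returns {ready keys without demand, ready keys after request(1)} for a pipe holding unread data. */
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink(); Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                ReadGate gate = new ReadGate(source.register(selector, 0));
                sink.write(ByteBuffer.wrap(new byte[] {42})); // data is waiting to be read
                int before = selector.selectNow();            // no demand, so the channel is ignored
                gate.request(1);
                int after = selector.select(1000);
                if (after == 0) {
                    after = selector.select(1000); // defensive retry in case the pending wakeup() returned early
                }
                return new int[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("ready without demand: " + r[0] + ", ready with demand: " + r[1]);
    }
}
```

While OP_READ is cleared, unread bytes stay in the kernel's receive buffer, so for a TCP connection the sender is eventually slowed down by TCP flow control instead of rows being dropped in the client.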
Patches are of course welcome; otherwise I will look at it later this week. My day job is currently very busy, so my open source work suffers a bit.
No rush, I know that feeling. Glad it is something that makes sense.