pgadba

Trying to get reactive backpressure to work

Open flyaruu opened this issue 6 years ago • 2 comments

Hi, I'm trying to do a PoC on reactive backpressure using pgadba, and I get the impression that it isn't honoring backpressure. I'm running a simple query against a database using a rowPublisherOperation:

DataSource ds = DataSourceFactory.newFactory("org.postgresql.adba.PgDataSourceFactory")
    .builder()
    .url("jdbc:postgresql://localhost:5432/dvdrental")
    .username("postgres")
    .password("mysecretpassword")
    .build();

CompletableFuture<String> result = new CompletableFuture<>();
Session session = ds.getSession();
// Very basic SubmissionPublisher (and a Flow.Processor), so I can expose the result as a publisher
RowProcessor rp = new RowProcessor();
final CompletableFuture<String> completableFuture = session.<String>rowPublisherOperation("select title from film limit 1000")
			.subscribe(rp, result)
			.submit()
			.getCompletionStage()
			.toCompletableFuture();

// Convert to RxJava2, because I'm most comfortable with that
long count = FlowInterop.fromFlowPublisher(rp)
	.doOnTerminate(()->session.close())
	.map(e->(String)e.at("title").get())
	.doOnNext(e->System.err.println("Title: "+e))
	.count()
	.blockingGet();
System.err.println("Count: "+count);

Where RowProcessor is this:

public class RowProcessor extends SubmissionPublisher<Result.RowColumn> implements Flow.Processor<Result.RowColumn,Result.RowColumn> {

	private Subscription subscription;
	@Override
	public void onSubscribe(Subscription subscription) {
		this.subscription = subscription;
		this.subscription.request(1);
	}
	@Override
	public void onNext(RowColumn item) {
		submit(item);
		this.subscription.request(1);
	}
	@Override
	public void onError(Throwable throwable) {
		throwable.printStackTrace();
	}
	@Override
	public void onComplete() {
		System.err.println("completed");
		close();
	}
}

Now what happens is this: if I limit the query to about 500 rows, it works fine.

select title from film limit 500

results in:

completed
Count: 500

All good. If I increase to 900 I see this:

completed
Count: 895

Along with 5 IllegalStateExceptions:

java.lang.IllegalStateException: failed to offer item to subscriber
	at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.lambda$0(ProcessorSubmission.java:82)
	at java.base/java.util.concurrent.SubmissionPublisher.retryOffer(SubmissionPublisher.java:445)
	at java.base/java.util.concurrent.SubmissionPublisher.doOffer(SubmissionPublisher.java:422)
	at java.base/java.util.concurrent.SubmissionPublisher.offer(SubmissionPublisher.java:550)
	at org.postgresql.adba/org.postgresql.adba.submissions.ProcessorSubmission.addRow(ProcessorSubmission.java:81)
	at org.postgresql.adba/org.postgresql.adba.communication.network.Portal.addDataRow(Portal.java:152)
	at org.postgresql.adba/org.postgresql.adba.communication.network.ExecuteResponse.read(ExecuteResponse.java:31)
	at org.postgresql.adba/org.postgresql.adba.communication.NetworkConnection.handleRead(NetworkConnection.java:407)
	at org.postgresql.adba/org.postgresql.adba.execution.DefaultNioLoop.run(DefaultNioLoop.java:127)
	at java.base/java.lang.Thread.run(Thread.java:844)

If I do more with the results (like printing the rows to the console), the problem gets worse, so I get the impression that the pg driver does not slow down reading the result when the consumer can't keep up.
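The symptom can be reproduced without a database. The following is a minimal sketch using only the JDK; it is not pgadba code, and the class name `OfferDropDemo`, the buffer size, and the 1 ms delay are assumptions made for illustration. Judging by the stack trace, rows are pushed with `SubmissionPublisher.offer` and the exception is raised from the drop handler. `offer` never blocks: once the subscriber's buffer is full, each further item goes to the drop handler and is lost unless the handler asks for a retry.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class OfferDropDemo {

    /** Offers `total` items to a deliberately slow subscriber; returns {delivered, dropped}. */
    static int[] run(int total, int bufferCapacity) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger delivered = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try {
            SubmissionPublisher<Integer> publisher =
                    new SubmissionPublisher<>(executor, bufferCapacity);
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);
                }

                @Override public void onNext(Integer item) {
                    try {
                        Thread.sleep(1); // simulate a consumer that cannot keep up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    delivered.incrementAndGet();
                    subscription.request(1);
                }

                @Override public void onError(Throwable t) { done.countDown(); }

                @Override public void onComplete() { done.countDown(); }
            });

            for (int i = 0; i < total; i++) {
                // Non-blocking: when the buffer is full the handler runs, and
                // returning false means "do not retry", so the item is dropped.
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
            publisher.close(); // buffered items are still delivered before onComplete
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return new int[] { delivered.get(), dropped.get() };
    }

    public static void main(String[] args) {
        int[] result = run(500, 8);
        System.out.println("delivered=" + result[0] + " dropped=" + result[1]);
    }
}
```

The exact split between delivered and dropped items depends on timing, but the two always add up to `total`. Replacing `offer` with the blocking `submit` would deliver everything in this sketch; on a driver's I/O thread, though, blocking is usually not an option, which is why the producer has to stop reading instead.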

flyaruu, Apr 22 '19 08:04

Hi. I agree that this needs improvement.

What the driver needs to do is stop reading from the TCP socket once it encounters backpressure, but it doesn't do that yet.
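One way this could look with Java NIO is to tie the socket's read interest to outstanding subscriber demand. The sketch below is an assumption about a possible approach, not pgadba's implementation: `ReadGate`, `request`, and `rowEmitted` are hypothetical names, and a `Pipe` stands in for the real socket channel so the example runs on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

/** Hypothetical helper: keeps OP_READ enabled only while the subscriber has demand. */
class ReadGate {
    private final SelectionKey key;
    private long demand; // guarded by "this"

    ReadGate(SelectionKey key) {
        this.key = key;
    }

    /** Would be called from Subscription.request(n): record demand and resume reading. */
    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long sum = demand + n;
        demand = sum < 0 ? Long.MAX_VALUE : sum; // saturate on overflow
        key.interestOps(key.interestOps() | SelectionKey.OP_READ);
        key.selector().wakeup(); // let a blocked select() notice the change
    }

    /** Would be called by the I/O loop after handing one row downstream. */
    synchronized void rowEmitted() {
        if (demand > 0 && demand != Long.MAX_VALUE) {
            demand--;
        }
        if (demand == 0) {
            // No demand left: stop selecting for reads so unread data stays in
            // the socket buffer and TCP flow control slows the server down.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    synchronized boolean isReading() {
        return (key.interestOps() & SelectionKey.OP_READ) != 0;
    }

    /** Read-interest state: initially, after request(2), after one row, after two rows. */
    static boolean[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open(); // stand-in for the database socket
            try (Pipe.SourceChannel source = pipe.source();
                 Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                ReadGate gate = new ReadGate(source.register(selector, 0));
                boolean initially = gate.isReading();
                gate.request(2);
                boolean afterRequest = gate.isReading();
                gate.rowEmitted();
                boolean afterOneRow = gate.isReading();
                gate.rowEmitted();
                boolean afterTwoRows = gate.isReading();
                return new boolean[] { initially, afterRequest, afterOneRow, afterTwoRows };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

A real driver would still need to buffer rows it has already read, since a single socket read can contain more rows than the subscriber has requested.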

Patches are of course welcome; otherwise I will look at it later this week. My day job is currently very busy, so my open source work suffers a bit.

alexanderkjall, Apr 23 '19 07:04

No rush, I know that feeling. Glad it is something that makes sense.

flyaruu, Apr 23 '19 14:04