ydb-java-sdk icon indicating copy to clipboard operation
ydb-java-sdk copied to clipboard

Add CompletableFuture to wait until AsyncReader is fully initialised

Open Eistern opened this issue 2 years ago • 0 comments

Currently, when working with the AsyncReader, there is no convenient way to block the main thread until the reader is no longer needed.

Example:

package org.example;

import tech.ydb.topic.TopicClient;
import tech.ydb.topic.read.AsyncReader;
import tech.ydb.topic.read.Message;
import tech.ydb.topic.read.PartitionSession;
import tech.ydb.topic.read.events.AbstractReadEventHandler;
import tech.ydb.topic.read.events.DataReceivedEvent;
import tech.ydb.topic.read.events.ReaderClosedEvent;
import tech.ydb.topic.settings.ReadEventHandlersSettings;
import tech.ydb.topic.settings.ReaderSettings;
import tech.ydb.topic.settings.TopicReadSettings;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import static org.example.YdbConsts.*;

/**
 * Minimal reproduction for the issue: creates an {@link AsyncReader}, waits for its
 * {@code init()} future, and then has to park the main thread manually so that the
 * process stays alive long enough for the event handler to receive messages.
 */
public class Reader {
    /**
     * Builds the topic client and the async reader, joins on the reader's init future,
     * and then waits up to 10 seconds on the {@code Reader.class} monitor.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        TopicClient topicClient = TopicClient.newClient(createTransport())
                .build();

        AsyncReader asyncReader = topicClient.createAsyncReader(ReaderSettings.newBuilder()
                        .setConsumerName(CONSUMER_NAME)
                        .setTopics(topic())
                        .build(),
                ReadEventHandlersSettings.newBuilder()
                        .setEventHandler(new EventHandler())
                        .build()
        );

        CompletableFuture<Void> init = asyncReader.init();
        init.join();
        // Manual keep-alive: this extra wait is the workaround the issue is about.
        synchronized (Reader.class) {
            try {
                Reader.class.wait(10_000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping the interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Converts every entry of {@code TOPICS} into a {@link TopicReadSettings} that
     * carries only the topic path.
     *
     * @return a new mutable list with one settings object per configured topic
     */
    private static List<TopicReadSettings> topic() {
        List<TopicReadSettings> result = new ArrayList<>();
        for (TopicDesc topic : TOPICS) {
            TopicReadSettings topicS = TopicReadSettings.newBuilder()
                    .setPath(topic.name())
                    .build();
            result.add(topicS);
        }
        return result;
    }

    /**
     * Event handler that prints each received batch, waits for up to 10 seconds,
     * and then commits the batch.
     */
    private static class EventHandler extends AbstractReadEventHandler {
        /**
         * Prints the partition session details and every message payload, then waits
         * on this handler's monitor for up to 10 seconds before committing the event.
         *
         * @param event the batch of messages delivered by the reader
         */
        @Override
        public void onMessages(DataReceivedEvent event) {
            PartitionSession partitionSession = event.getPartitionSession();
            System.out.printf("Got message from: %s %d %d%n", partitionSession.getPath(), partitionSession.getPartitionId(), partitionSession.getId());
            for (Message message : event.getMessages()) {
                // Decode with an explicit charset; the no-charset constructor depends on the platform default.
                String s = new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.printf("Message %s%n", s);
            }

            synchronized (this) {
                try {
                    this.wait(10_000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the calling thread can still observe the interrupt.
                    Thread.currentThread().interrupt();
                }
            }

            event.commit();
        }

        /**
         * Prints a marker line when the reader reports that it has been closed.
         *
         * @param event the close notification (not inspected)
         */
        @Override
        public void onReaderClosed(ReaderClosedEvent event) {
            System.out.println("Closed");
        }
    }
}

This example won't work without an additional wait after joining on the init future, because all other threads are marked as daemon threads.


Main request: Provide some means to actually wait for the reader to be properly opened (even without an assigned partition session).

Eistern avatar Oct 29 '23 06:10 Eistern