[FEATURE] maxTriggerDelay feature in Pulsar-Spark Connector
Is your feature request related to a problem? Please describe.
I am consuming data from Pulsar through Spark Structured Streaming in micro-batches. Right now, Spark consumes messages as soon as they arrive in the Pulsar broker queue, i.e. a micro-batch is executed as soon as messages land on the topic.
Describe the solution you'd like
However, I want a single micro-batch to be triggered only once a certain number of messages has arrived in the queue, or once a specific time period has passed. This is possible in the Spark-Kafka connector via its `minOffsetsPerTrigger` and `maxTriggerDelay` options.

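For comparison, a Kafka-source stream with these options would look roughly like the following (topic name and broker address are placeholders; the two trigger options are from the Spark Kafka integration guide):

```python
# Kafka source (for comparison): fire a micro-batch only once at least
# 100000 new offsets are available, OR 5 minutes have passed since the
# last trigger, whichever comes first.
kafka_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder
    .option("subscribe", "test-topic")                    # placeholder
    .option("minOffsetsPerTrigger", "100000")
    .option("maxTriggerDelay", "5m")
    .load()
)
```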
Basically, it would be perfect if the Pulsar-Spark connector supported the same kind of configuration.
Describe alternatives you've considered
I know that there is an option `pulsar.reader.receiverQueueSize`, which we can pass as follows:
```python
pulsar_df = (
    spark.readStream.format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/test-cdc-topic-3")
    .option("pulsar.reader.receiverQueueSize", "1000000")
    .load()  # .load() is required to materialize the streaming DataFrame
)
```
However, this only addresses one side of the problem, namely configuring the maximum size of the message queue. Even if we set it, the connector still triggers a micro-batch as soon as new messages arrive. Ideally, micro-batch execution should wait for up to 'x' units of time, provided the receiverQueueSize threshold has not been reached.
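To make the desired semantics concrete, here is a minimal sketch of the trigger decision I have in mind. All names and thresholds are illustrative, not part of the connector:

```python
import time

def should_trigger(available_messages,
                   last_trigger_time,
                   min_messages=1000,
                   max_delay_seconds=300.0,
                   now=None):
    """Hypothetical admission decision for a micro-batch: fire only when
    enough messages have accumulated, OR the maximum delay has elapsed
    since the last trigger."""
    if now is None:
        now = time.time()
    if available_messages >= min_messages:
        return True  # threshold reached: trigger immediately
    # otherwise, trigger only once the max delay has passed
    return (now - last_trigger_time) >= max_delay_seconds
```

With `min_messages=1000` and `max_delay_seconds=300`, a backlog of 1500 messages triggers at once, while a backlog of 10 messages waits until five minutes have elapsed.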
Further, I also know that this is possible in the Pulsar Java client, on top of which the Pulsar-Spark connector is built, through its batch receive policy (`BatchReceivePolicy`).
Additional context
I am using the following environment:
- Spark: 3.3.1
- Pulsar: 2.10.2
- PySpark: 3.3.1
- Python: 3.9.15
@syhily please have a look.
@keenborder786 This config request translates into implementing Spark's `SupportsAdmissionControl` interface.
We will work on that during Q2
@nlu90 Should I close the issue then?
@keenborder786 we can leave it open and close it when the implementation is done
Okay, great.
@nlu90 has there been any update regarding this, since Q2 has elapsed?