[FEATURE] maxTriggerDelay feature in Pulsar-Spark Connector

Open keenborder786 opened this issue 3 years ago • 6 comments

Is your feature request related to a problem? Please describe.

I am consuming data from Pulsar through Spark Structured Streaming in micro-batches. Right now, Spark consumes messages as soon as they arrive in the Pulsar broker queue, i.e., a micro-batch is executed as soon as messages arrive in the Pulsar queue.

Describe the solution you'd like

However, I want to trigger a micro-batch only once a certain number of messages have arrived in the queue or a specific time period has passed. This is possible in the Spark-Kafka connector with the following configurations: (image)

Basically, it would be perfect if the pulsar-spark connector supported the same configuration.
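For context, the Spark Kafka source documents two options for this behavior, `minOffsetsPerTrigger` and `maxTriggerDelay`, and these are presumably the ones meant here. The sketch below shows how they might be set; the option values, broker address, and topic are placeholders, and `with_options` is a hypothetical helper, not part of either connector.

```python
# Kafka source options that hold back a micro-batch until enough data has
# arrived or a maximum wait has elapsed. Values here are illustrative only.
KAFKA_TRIGGER_OPTIONS = {
    "minOffsetsPerTrigger": "100000",  # wait for at least this many new offsets...
    "maxTriggerDelay": "5m",           # ...but do not wait longer than this
}


def with_options(reader, options):
    """Apply every key/value pair to a DataStreamReader-like object.

    Hypothetical helper: it only relies on the reader exposing
    `.option(key, value)` and returning a reader.
    """
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader


# Usage (needs a SparkSession and the spark-sql-kafka package; the broker
# address and topic below are placeholders):
# kafka_df = with_options(
#     spark.readStream.format("kafka")
#     .option("kafka.bootstrap.servers", "localhost:9092")
#     .option("subscribe", "test-topic"),
#     KAFKA_TRIGGER_OPTIONS,
# ).load()
```

The request in this issue amounts to accepting an equivalent pair of options on `format("pulsar")`.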

Describe alternatives you've considered

I know that there is an option, pulsar.reader.receiverQueueSize, which we can pass as follows:

pulsar_df = (
    spark.readStream.format("pulsar")
    .option("service.url", "pulsar://localhost:6650")
    .option("admin.url", "http://localhost:8080")
    .option("topic", "persistent://public/default/test-cdc-topic-3")
    .option("pulsar.reader.receiverQueueSize", "1000000")
    .load()
)

However, this only resolves one side of the problem, i.e., configuring the maximum size of the message queue. Even if we set this, the connector currently triggers the micro-batch as soon as new messages arrive. Ideally, the micro-batch execution should wait up to 'x' units of time as long as the receiverQueueSize threshold has not been reached.
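The desired trigger rule can be sketched in plain Python, independent of Spark or Pulsar. `TriggerGate`, `min_messages`, and `max_delay_s` are hypothetical names used only for illustration; the sketch also assumes that an expired delay should not start an empty batch.

```python
import time


class TriggerGate:
    """Decides whether a micro-batch should start.

    Fires when at least `min_messages` are pending, or when `max_delay_s`
    seconds have passed since the last batch and something is pending.
    """

    def __init__(self, min_messages, max_delay_s, clock=time.monotonic):
        self.min_messages = min_messages
        self.max_delay_s = max_delay_s
        self._clock = clock
        self._last_trigger = clock()

    def should_trigger(self, pending_messages):
        waited = self._clock() - self._last_trigger
        enough_data = pending_messages >= self.min_messages
        # Assumption: a timeout with nothing pending does not start a batch.
        timed_out = waited >= self.max_delay_s and pending_messages > 0
        if enough_data or timed_out:
            self._last_trigger = self._clock()  # restart the delay window
            return True
        return False
```

A connector would evaluate a rule like this each time the streaming engine asks whether new data is available, rather than starting a batch on the first message.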

Further, I know that this is possible in the Pulsar Java client, on top of which the Pulsar-Spark connector is written, through its batch receive policy (BatchReceivePolicy).

Additional context

I am using the following environment:

  • Spark: 3.3.1
  • Pulsar: 2.10.2
  • PySpark: 3.3.1
  • Python: 3.9.15

keenborder786 • Mar 28 '23 16:03

@syhily please have a look.

keenborder786 • Mar 28 '23 16:03

@keenborder786 This config request translates into implementing the SupportsAdmissionControl interface.

We will work on that during Q2

nlu90 • Mar 29 '23 19:03

@nlu90 Should I close the issue then?

keenborder786 • Mar 29 '23 19:03

@keenborder786 we can leave it open and close it when the implementation is done

nlu90 • Mar 29 '23 21:03

okay great.

keenborder786 • Mar 29 '23 21:03

@nlu90 Has there been any update on this, now that Q2 has elapsed?

oneebhkhan • Aug 02 '23 12:08