Add read timeout for getting records when sampling a stream

Open zachjsh opened this issue 2 years ago • 9 comments

Description

Previously, when sampling streaming data, the sampler would wait up to timeoutMs for numRows to be returned. If the stream has fewer rows than numRows, the sampler waits the full timeoutMs period. In some cases, initially connecting to a stream to sample may take a considerable amount of time, while reading subsequent records is much faster. For these cases, this PR adds a new sampler config property, readTimeoutMs. If specified, the sampler returns once this much time has passed since the last record was read without any further records being found, or once timeoutMs has elapsed, whichever comes first. readTimeoutMs is only applied once some records have been read from the stream during sampling. For example, if a stream has only 1 record in it at a given time, and timeoutMs is configured as 15000 milliseconds, readTimeoutMs as 2000 milliseconds, and numRows as 500, then, assuming the first record is read quickly, the sampler will return after roughly 2000 milliseconds, whereas previously it would have returned only after the full 15000 millisecond timeoutMs.
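
The interaction between the two timeouts can be illustrated with a small simulation. This is only a sketch in Python, not the Druid sampler code: the function `sample`, its parameters, and the simulated clock are hypothetical, and each poll is assumed to take a fixed 100 milliseconds.

```python
from typing import Iterator, List, Optional, Tuple


def sample(
    polls: Iterator[Optional[str]],
    num_rows: int,
    timeout_ms: int,
    read_timeout_ms: Optional[int] = None,
    poll_ms: int = 100,  # assumed fixed duration of one poll
) -> Tuple[List[str], int]:
    """Simulate the sampling loop; return (records, elapsed milliseconds).

    Each item of `polls` is the outcome of one poll: a record, or None for
    an empty poll. An exhausted iterator is treated as empty polls forever.
    """
    records: List[str] = []
    elapsed = 0
    last_record_at: Optional[int] = None
    while len(records) < num_rows and elapsed < timeout_ms:
        record = next(polls, None)
        elapsed += poll_ms
        if record is not None:
            records.append(record)
            last_record_at = elapsed
        elif (
            read_timeout_ms is not None
            and last_record_at is not None  # only applies after a first record
            and elapsed - last_record_at >= read_timeout_ms
        ):
            break
    return records, elapsed
```

In this simulation, a stream holding a single record with `num_rows=500`, `timeout_ms=15000` and `read_timeout_ms=2000` returns after 2100 simulated milliseconds (one poll for the record plus 2000 milliseconds of empty polls). Without `read_timeout_ms`, or when no record is ever read, the loop runs for the full 15000 milliseconds.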

This PR has:

  • [x] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] a release note entry in the PR description.
  • [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [x] been tested in a test Druid cluster.

zachjsh avatar Sep 25 '23 23:09 zachjsh

We really have to resist the temptation to add a config for every corner case we run into. Is it absolutely necessary to have this config? Even if you need these, why not use standard terms such as connectTimeout, readTimeout that are more common names and easier to relate to than something like nextRecordTimeout?

abhishekagarwal87 avatar Sep 26 '23 05:09 abhishekagarwal87

@abhishekagarwal87 , yes I can change to readTimeout, that is a better name.

zachjsh avatar Sep 26 '23 15:09 zachjsh

I am not entirely sure we need a config here either. Irrespective of what we name the config, it is going to cause confusion with the existing timeoutMs, especially since the SamplerConfig is used with batch too where this config would not be meaningful.

I think we could just have a fixed maximum number of empty polls. When we poll the topic for records and don't get anything for, say, 10 tries, we finish the sampling.

Since this is only used for sampling, there is no real obligation to keep waiting until we have read the requested number of records. Compare this to batch where if there is no file at the specified path, we just fail or return empty and don't wait for data to show up.

@zachjsh , @abhishekagarwal87 , what do you think?

kfaraz avatar Sep 27 '23 03:09 kfaraz
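
The fixed-limit idea in the comment above could look roughly like the following. This is a hedged sketch, not code from the PR: `MAX_EMPTY_POLLS` and `sample_until_idle` are hypothetical names, the limit is assumed to count consecutive empty polls, and the overall timeoutMs check is left out for brevity.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical fixed limit, taken from the "say, 10 tries" example.
MAX_EMPTY_POLLS = 10


def sample_until_idle(
    polls: Iterator[Optional[str]], num_rows: int
) -> Tuple[List[str], int]:
    """Return (records, number of polls made).

    Sampling stops once num_rows records were read or MAX_EMPTY_POLLS
    consecutive polls returned nothing (assumption: the counter resets
    whenever a record arrives).
    """
    records: List[str] = []
    empty_polls = 0
    polls_made = 0
    while len(records) < num_rows and empty_polls < MAX_EMPTY_POLLS:
        record = next(polls, None)  # exhausted iterator counts as empty poll
        polls_made += 1
        if record is None:
            empty_polls += 1
        else:
            records.append(record)
            empty_polls = 0
    return records, polls_made
```

With this approach no new config is exposed, but the idle window is tied to whatever duration a single poll has.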

Thanks @kfaraz. Just to be clear, this new config is not strictly needed for things to work properly, and most users will not have to touch it. However, using it enables an improved UX.

About your idea described above: the poll timeout is hardcoded to 100 milliseconds, so 10 empty polls would end the sampling after about 1 second. There could be cases where a user may want to allow longer than 1 second for the next record to be read. As for confusion with timeoutMs, let me know if there is something I can add that helps differentiate them.

zachjsh avatar Sep 27 '23 17:09 zachjsh

I feel that some kind of config here makes sense, either as a timeout or as a max number of tries after encountering an empty read. It would allow the behavior to be disabled or adjusted to the characteristics of a specific stream or to what the user wants (streams may have different data rates, and different users might want more or less data when sampling), so I don't think we should impose a one-size-fits-all behavior here.

imo it should be called something like streamingNextRecordTimeout, or something with "streaming" in the name, if we're concerned that there would be confusion with batch sampling cases.

jon-wei avatar Sep 27 '23 18:09 jon-wei

It seems that the intent here is to prevent sampling from Kafka getting stuck when fewer than numRows exist in the stream. Instead of adding a second shorter timeout, could we tackle that problem directly? Something like having the sampler check the latest offset available in Kafka first (using RecordSupplier#getLatestSequenceNumber) and then returning early once it has read up through that latest message.

gianm avatar Sep 27 '23 19:09 gianm
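
The suggestion above (snapshot the latest offset, then stop once the sampler has read up to it) could be sketched as follows. `FakeStream` is a hypothetical in-memory stand-in for a single partition and does not mirror the RecordSupplier interface; the "latest offset" is assumed here to be the offset the next written record would receive.

```python
from typing import List, Optional


class FakeStream:
    """Hypothetical in-memory stand-in for a single-partition stream."""

    def __init__(self, records: List[str]) -> None:
        self._records = list(records)

    def latest_offset(self) -> int:
        # Assumption: offset that the next appended record would get.
        return len(self._records)

    def read(self, offset: int) -> Optional[str]:
        return self._records[offset] if offset < len(self._records) else None


def sample_up_to_latest(
    stream: FakeStream, num_rows: int, start_offset: int = 0
) -> List[str]:
    """Read until num_rows records or the snapshotted latest offset is hit."""
    end = stream.latest_offset()  # snapshot taken once, before reading
    records: List[str] = []
    offset = start_offset
    while len(records) < num_rows and offset < end:
        record = stream.read(offset)
        if record is None:
            break
        records.append(record)
        offset += 1
    return records
```

In this sketch a stream with fewer than `num_rows` records returns as soon as the snapshot offset is reached, without waiting on any timeout. If reading starts at the latest offset, the loop returns immediately with no records.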

Btw, on general design. Sampling is intended to be used in a very specific situation: it's there so the web console (& any similar third-party UIs) can show people a few records from a stream prior to configuring their supervisors. I can't imagine that most users will set any of these config parameters in a case like that. They'll just get whatever behavior the UI requests from the API.

For that reason I think it's important to focus on the out-of-box behavior, so users using the built-in web console get the benefit of these adjustments. That means:

  • If we're introducing a new config, we should consider setting a default that allows people using web console to get the benefit.
  • We should also consider adjustments that do not require a new config.

gianm avatar Sep 27 '23 19:09 gianm

Something like having the sampler check the latest offset available in Kafka first (using RecordSupplier#getLatestSequenceNumber) and then returning early once it has read up through that latest message.

Would this cause issues when useEarliestOffset is false? Or if the earliest offset happens to be close to the latest in a stream that's actively receiving data?

From my understanding, the timeout here is basically defining how long a stream can be idle (have no data arrive / offsets stay the same). The goal is to improve the experience for the web console (and similar UIs) when a stream is low volume at the time ingestion is being set up in Druid. Right now, for the web console, if there are fewer than 500 (numRows) rows, each call to the sampler always takes 15 seconds (timeoutMs). The web console calls the sampler a lot during the normal flow, so this can be pretty slow.

I can't imagine that most users will set any of these config parameters in a case like that.

Agreed. The intent of the config in the sampler payload is to allow UIs to tweak their settings without users having to redeploy Druid. It's not intended to be set (or even seen) by normal end users.

dkoepke avatar Oct 04 '23 16:10 dkoepke
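
As an illustration of a UI setting this in the sampler payload, the fragment below builds only the sampler config portion of a request. It is a sketch under assumptions: the values 500 and 15000 are the web console figures quoted above, 2000 is the example value from the PR description, and `readTimeoutMs` is the property proposed in this PR, not an existing Druid option. The rest of the sampler request is omitted.

```python
import json

# Sampler config portion of a request, as a UI might send it.
sampler_config = {
    "numRows": 500,
    "timeoutMs": 15000,
    # Proposed in this PR; hypothetical until such a change is merged.
    "readTimeoutMs": 2000,
}

# Only this fragment is shown; the surrounding spec is left out on purpose.
payload_fragment = {"samplerConfig": sampler_config}

print(json.dumps(payload_fragment, indent=2))
```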

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

github-actions[bot] avatar Mar 05 '24 00:03 github-actions[bot]

This pull request/issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

github-actions[bot] avatar Apr 03 '24 00:04 github-actions[bot]