Implement per-segment query timeout on data nodes
Druid can struggle with query fairness in the native engine at the data node level (peon, historical).
Background
There are three primary “resources” which queries contend over during query processing:
- Processing threads (every query type)
  - Each query issues per-segment runners, which are then run as individual futures on the processing thread pool (see the fan-out sketch after this list).
  - Larger # of segments -> larger # of runners -> larger # of threads occupied
- Processing buffers (every query type)
  - Each processing thread is assigned a buffer
- Additional merge buffers (GroupBy only)
  - GroupBy queries require 1 merge buffer per query (2 buffers total), while a nested GroupBy query requires 2 merge buffers (3 buffers total).
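To make the first bullet concrete, here is a minimal, hypothetical sketch of the fan-out (class and method names are illustrative, not Druid's actual code): each query submits one runner per segment to the shared processing pool, so a query touching many segments occupies many pool slots at once.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

class FanOutSketch
{
  // Shared, fixed-size pool standing in for the data node's processing threads.
  static final ExecutorService PROCESSING_POOL = Executors.newFixedThreadPool(8);

  // One future per segment: 1000 segments -> 1000 queued runners competing
  // for the 8 threads above.
  static List<Future<?>> runQuery(List<Runnable> perSegmentRunners)
  {
    return perSegmentRunners.stream()
                            .map(PROCESSING_POOL::submit)
                            .collect(Collectors.toList());
  }
}
```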
What's already solved
- Initial priority-based scheduling of segments
- Each processing thread is held only for as long as it takes to process one segment, then released back to the pool. Once it is back in the pool, another query's runner can pick it up, so a higher-priority query can take over at that point (see the priority-queue sketch after this list).
- This generally means that as long as two queries have different priorities, the higher-priority one will "take over" relatively quickly at the Historical/Peon level.
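For illustration, a minimal sketch of that behavior (this is not Druid's actual prioritized executor, just the general technique): the pool's work queue is priority-ordered, so whenever a worker finishes one segment and returns to the queue, the highest-priority waiting runner is picked up next.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrioritySketch
{
  // Segment runners ordered by descending query priority.
  static class PrioritizedRunner implements Runnable, Comparable<PrioritizedRunner>
  {
    final int priority;
    final Runnable segmentScan;

    PrioritizedRunner(int priority, Runnable segmentScan)
    {
      this.priority = priority;
      this.segmentScan = segmentScan;
    }

    @Override
    public int compareTo(PrioritizedRunner other)
    {
      return Integer.compare(other.priority, this.priority); // higher priority first
    }

    @Override
    public void run()
    {
      segmentScan.run();
    }
  }

  // Tasks must be handed to execute() rather than submit(), since submit()
  // wraps them in FutureTasks that the priority queue cannot compare.
  static final ThreadPoolExecutor POOL = new ThreadPoolExecutor(
      4, 4, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>()
  );
}
```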
Problem: priority inversion
- Automatic query priority assignment isn't great, so we cannot always rely on the assigned priority being representative of a query's impact on the cluster.
- The scheduling above breaks down in the following case:
Query A – Priority 0 (should be highest), 10 segments, each takes 1s to process
Query B – Priority 0 (should be lowest), 1000 segments, each takes 10s to process
Query B beats A to all the processing threads, so A now has to wait for 10 of B's segments (~10s) to finish processing before its own segments can run.
Description
Introduces a query context parameter, segmentTimeout, which specifies the maximum time each segment scan may take. This parameter behaves like the existing timeout key: the default is no timeout, and any value > 0 is treated as a valid timeout.
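A hypothetical usage sketch of the new key alongside the existing one (units are assumed to be milliseconds by analogy with timeout; confirm against the final docs):

```java
import java.util.Map;

class ContextSketch
{
  // Query context carrying both the existing whole-query timeout and the
  // new per-segment timeout introduced by this PR.
  static final Map<String, Object> QUERY_CONTEXT = Map.of(
      "timeout", 60_000,       // existing: whole-query timeout, in ms
      "segmentTimeout", 2_000  // new: max time any single segment scan may take (assumed ms)
  );
}
```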
Guava's ListenableFuture predates CompletableFuture, so it has no self-managed timeout capability (there is no instance-level .withTimeout or equivalent of CompletableFuture.orTimeout). This means that in order to service timeouts for all the futures, I've needed to create a separate thread pool (to isolate timeout work from the scan queue). Small timeslices of CPU will be spent servicing/cancelling timeouts, but this should be negligible if configured correctly.
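A minimal sketch (not the PR's actual code) of how such a per-segment timeout can be wired up with Guava's Futures.withTimeout helper, which requires exactly this kind of external scheduler; the names withSegmentTimeout and TIMEOUT_SCHEDULER are hypothetical:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

class SegmentTimeoutSketch
{
  // Dedicated scheduler so timeout servicing never competes with the
  // processing pool that runs the segment scans themselves.
  private static final ScheduledExecutorService TIMEOUT_SCHEDULER =
      Executors.newSingleThreadScheduledExecutor();

  static <T> ListenableFuture<T> withSegmentTimeout(ListenableFuture<T> segmentFuture, long segmentTimeoutMs)
  {
    if (segmentTimeoutMs <= 0) {
      return segmentFuture; // default: no per-segment timeout
    }
    // Fails the returned future with a TimeoutException and cancels the
    // delegate if it hasn't completed within the allotted time.
    return Futures.withTimeout(segmentFuture, segmentTimeoutMs, TimeUnit.MILLISECONDS, TIMEOUT_SCHEDULER);
  }
}
```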
Release note
Key changed/added classes in this PR
- MyFoo
- OurBar
- TheirBaz
This PR has:
- [x] been self-reviewed.
- [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
I think this solves only a narrow class of problems and adds another parameter that may not see broader adoption. I do agree, however, that query scheduling at the data node level is an area worth exploring and tinkering with, though I wonder if there are better ways to solve this problem. One solution that comes to mind is using virtual threads of some sort. Right now, the processing thread pool essentially dictates the compute capacity available for segment processing. But if those threads are doing a lot more IO than CPU, that capacity is being wasted. Java recently gained virtual threads, which could be used to run segment processing instead of directly using OS-level threads.
A higher-level comment is that we shouldn't just make this change without some confidence that our solution makes life better for a good number of use cases. You should first build a test setup that can simulate query congestion at the data node level, along with metrics that reflect the degree of congestion, throughput, and fairness. Once such a system is in place, you can craft a few strategies and use your test setup to measure which strategy is best.
Regarding virtual threads – yes, I thought about this as well; however, they were introduced in JDK 21, which is currently not fully supported (AFAIK). When that becomes available, I 100% agree that (assuming Historical processing threads aren't mostly CPU-bound) switching to userspace threads and increasing the pool size would be an improvement.
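For reference, a sketch of the virtual-thread idea being discussed (requires JDK 21+; hypothetical, not part of this PR):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class VirtualThreadSketch
{
  // One cheap virtual thread per segment runner; the JVM multiplexes these
  // onto a small set of carrier (OS) threads and parks them during blocking IO,
  // so IO-heavy scans no longer pin processing-pool threads.
  static final ExecutorService SEGMENT_POOL = Executors.newVirtualThreadPerTaskExecutor();
}
```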
> A higher-level comment is that we shouldn't just make this change without some confidence that our solution makes life better for a good number of use cases. You should first build a test setup that can simulate query congestion at the data node level, along with metrics that reflect the degree of congestion, throughput, and fairness. Once such a system is in place, you can craft a few strategies and use your test setup to measure which strategy is best.
Yes, of course – we've observed this issue in our largest cluster numerous times (daily): larger, low-priority queries beat smaller, shorter queries to the processing threads, which causes backups when each segment takes a while to process (on the order of seconds, or minutes in some extreme cases). I see the point about having too many "unused" parameters (config bloat, etc.). By default this would be turned off, but it gives us an extra lever to reduce the impact of these heavy-hitter queries.
@abhishekagarwal87 This idea came from my discussion with @gianm, who suggested this change (see: https://apachedruidworkspace.slack.com/archives/C030CMF6B70/p1746578390954639?thread_ts=1745436989.786489&cid=C030CMF6B70). We have also observed some queries that take 1–2 minutes on average to process a single segment (see: https://apachedruidworkspace.slack.com/archives/C030CMF6B70/p1746141850807709?thread_ts=1745436989.786489&cid=C030CMF6B70).
Hmm. Looking at the query, my guess is that the segment processing is spending the bulk of its time on CPU, and as such might not be interruptible at all.
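To illustrate that concern with a hypothetical sketch: Future.cancel(true) only sets the thread's interrupt flag, which blocking calls observe but a tight compute loop never checks, so a timeout cancel has no effect unless the loop polls the flag explicitly.

```java
import java.util.concurrent.CancellationException;

class CpuBoundScanSketch
{
  static long scan(long rowCount)
  {
    long sum = 0;
    for (long row = 0; row < rowCount; row++) {
      sum += row * row; // stand-in for per-row compute: never blocks, never throws InterruptedException
      // Without an explicit check like this, cancellation cannot stop the loop:
      if ((row & 0xFFFF) == 0 && Thread.currentThread().isInterrupted()) {
        throw new CancellationException("segment scan interrupted");
      }
    }
    return sum;
  }
}
```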
btw I still have the same concerns, and we definitely need tests and metrics to evaluate various techniques for improving progress when data nodes have query congestion.
Re virtual threads and JDK 21 support: we will need to be compatible with it, given that it's been around for a while and Java 17 itself reached EOL in September 2024.