
Introduce cost-based tasks autoscaler for streaming ingestion

Fly-Style opened this issue 2 months ago · 6 comments

Cost-Based Autoscaler for Seekable Stream Supervisors

Pre-requisite

The PR will be marked as 'ready for review' once the author adds an integration test.

Overview

Implements a cost-based autoscaling algorithm for seekable stream supervisor tasks that optimizes task count by balancing lag reduction against resource efficiency.

Note: this patch doesn't yet support scaling down during task rollover. For now, it scales down in the same manner as it scales up.

Key Features

1. Scales up at runtime, scales down during task rollover.

2. Predictive Cost Calculation

  • Predicts lag and idle ratio for candidate task counts using conservative logarithmic scaling;
  • Evaluates multiple task count options within ±2 positions of the current count;
  • Tracks historically observed lag values, providing stable normalization even when lag magnitudes vary;

3. Lag-Aware Idle Estimation (sketched below)

  • High lag: Inverse relationship - more tasks = less idle (processing backlog)
  • Low lag: Normal relationship - more tasks = more idle (waiting for data)
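
A direction-only sketch of the two estimation modes, assuming a simple linear task ratio for brevity (the patch itself uses the more conservative logarithmic scaling mentioned above). The class name is illustrative; only the 1,000 lag threshold comes from the description in this PR:

    // Direction-only sketch of the lag-aware idle estimation: under high lag, more
    // tasks are predicted to be less idle (they share the backlog); under low lag,
    // more tasks are predicted to be more idle (they wait for data). The linear
    // taskRatio is a simplifying assumption.
    public final class IdleEstimationSketch
    {
      private static final long HIGH_LAG_THRESHOLD = 1_000L;

      static double predictIdle(double currentIdle, int currentTasks, int candidateTasks, long lag)
      {
        double taskRatio = (double) candidateTasks / currentTasks;
        double predicted = lag > HIGH_LAG_THRESHOLD
                           ? currentIdle / taskRatio   // high lag: more tasks -> less idle
                           : currentIdle * taskRatio;  // low lag: more tasks -> more idle
        return Math.min(1.0, Math.max(0.0, predicted));
      }
    }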

Algorithm Design

The Ideal Idle Range: [0.2, 0.6]

The autoscaler targets an idle ratio between 20% and 60%. This range represents optimal task utilization (a sketch of the range check follows the diagram):

                        Idle Ratio
                             |
    OVERLOADED          IDEAL RANGE          UNDERUTILIZED
    (scale UP)        (no action needed)      (scale DOWN)
                             |
    <-------|---------------|---------------|------->
    0.0    0.2             0.4             0.6    1.0
            |_______________|_______________|
                    Zero cost zone
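
A minimal sketch of the zero-cost-zone check implied by the diagram. The distance-based penalty outside the range is an assumption for illustration, not necessarily the exact shape used in WeightedCostFunction.java:

    // No idle penalty inside [0.2, 0.6]; outside the range, penalize by the
    // distance to the nearest bound (assumed penalty shape).
    public final class IdleCostSketch
    {
      static double idleCost(double idleRatio)
      {
        final double idealMin = 0.2;  // below this, tasks are overloaded -> scale up
        final double idealMax = 0.6;  // above this, tasks are underutilized -> scale down
        if (idleRatio >= idealMin && idleRatio <= idealMax) {
          return 0.0;  // inside the ideal range: no action needed
        }
        return idleRatio < idealMin ? idealMin - idleRatio : idleRatio - idealMax;
      }
    }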

Cost Function

When the idle ratio falls outside the ideal range, the autoscaler uses a weighted cost function to evaluate different task count configurations:

cost = lagWeight × normalizedLag + idleWeight × predictedIdleCost

Components of the cost function (a sketch of the candidate selection follows this list):

  • Lag component: Measures how quickly the system works through the backlog
  • Idle component: Measures resource efficiency (how much time tasks spend waiting for data versus busy)
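
A minimal sketch of how the weighted cost function could drive candidate selection, assuming the per-candidate normalized lag and predicted idle cost have already been computed as described above. The class and method names are illustrative; the actual logic lives in WeightedCostFunction.java and CostBasedAutoScaler.java:

    public final class WeightedCostSketch
    {
      /** cost = lagWeight * normalizedLag + idleWeight * predictedIdleCost; lower is better. */
      static double cost(double normalizedLag, double predictedIdleCost, double lagWeight, double idleWeight)
      {
        return lagWeight * normalizedLag + idleWeight * predictedIdleCost;
      }

      /** Pick the candidate task count with the lowest cost. */
      static int pickTaskCount(int[] candidateTaskCounts, double[] normalizedLag, double[] predictedIdleCost,
                               double lagWeight, double idleWeight)
      {
        int best = candidateTaskCounts[0];
        double bestCost = Double.MAX_VALUE;
        for (int i = 0; i < candidateTaskCounts.length; i++) {
          double c = cost(normalizedLag[i], predictedIdleCost[i], lagWeight, idleWeight);
          if (c < bestCost) {
            bestCost = c;
            best = candidateTaskCounts[i];
          }
        }
        return best;
      }
    }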

Example: High Lag with High Idle Scenario (lag > 1,000 AND idle > 0.6)

Behavior: More tasks = less idle (inverted mode - tasks are idle despite backlog)

This inverted mode only applies when both conditions are met:

  • High lag (> 1,000) indicating a significant backlog
  • High idle (> 0.6) indicating tasks are underutilized

This unusual combination suggests something is preventing tasks from making progress (an I/O bottleneck, rate limiting, etc.). Adding more workers may help process the backlog.

Note: If lag is high but idle is low or in ideal range, normal mode applies (tasks are already busy processing the backlog).

Starting from 10 tasks with a 0.8 idle ratio (above the ideal range, so inverted mode applies), the candidate evaluations look as follows (a runnable sketch follows the table):

Tasks | Task Ratio | Scale Factor | Inverse Factor | Predicted Idle | In Ideal Range?
------|------------|--------------|----------------|----------------|----------------
  2   |    0.2x    |     0.32     |      3.13      | 1.00 (clamped) | NO (above 0.6)
  5   |    0.5x    |     0.45     |      2.22      | 1.00 (clamped) | NO (above 0.6)
 10   |    1.0x    |     0.63     |      1.59      | 1.00 (clamped) | NO (above 0.6)
 13   |    1.3x    |     0.76     |      1.32      | 1.00 (clamped) | NO (above 0.6)
 17   |    1.7x    |     0.90     |      1.11      | 0.89           | NO (above 0.6)
 25   |    2.5x    |     1.13     |      0.88      | 0.70           | NO (above 0.6)
 50   |    5.0x    |     1.63     |      0.61      | 0.49           | YES
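
A runnable sketch that approximately reproduces the predictions above. The inverse factor and clamping follow the table directly (inverseFactor = 1 / scaleFactor, predictedIdle = min(1, currentIdle × inverseFactor)); the assumed scale-factor curve, 0.63 × log2(1 + taskRatio), matches the table for ratios ≥ 1, while the sub-1.0 ratios in the table appear to use a slightly different shape (their predicted idle is clamped to 1.00 either way):

    public final class InvertedModeSketch
    {
      public static void main(String[] args)
      {
        final int currentTasks = 10;
        final double currentIdle = 0.8;
        final int[] candidates = {2, 5, 10, 13, 17, 25, 50};

        for (int candidate : candidates) {
          double taskRatio = (double) candidate / currentTasks;
          double scaleFactor = 0.63 * (Math.log(1 + taskRatio) / Math.log(2)); // assumed curve
          double inverseFactor = 1.0 / scaleFactor;                            // per the table
          double predictedIdle = Math.min(1.0, currentIdle * inverseFactor);   // clamp to 1.0
          boolean inIdealRange = predictedIdle >= 0.2 && predictedIdle <= 0.6;
          System.out.printf("tasks=%d ratio=%.1f scale=%.2f inverse=%.2f idle=%.2f ideal=%b%n",
                            candidate, taskRatio, scaleFactor, inverseFactor, predictedIdle, inIdealRange);
        }
      }
    }

Under these assumptions, only the 50-task candidate lands in the ideal [0.2, 0.6] idle range, matching the table.
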
Key Files Changed

Core Implementation:

  • WeightedCostFunction.java - Cost function and adaptive bounds
  • CostBasedAutoScaler.java - Autoscaler orchestration
  • CostMetrics.java - Metrics data class

Tests:

  • WeightedCostFunctionTest.java - Comprehensive unit tests
  • CostBasedAutoScalerTest.java - Integration tests

Additionally, we add reading the poll-idle ratio average from the task /rowStats endpoint.


This PR has:

  • [x] been self-reviewed.
    • [ ] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [x] added documentation for new or modified features or behaviors.
  • [ ] a release note entry in the PR description.
  • [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [x] added or updated version, license, or notice information in licenses.yaml
  • [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [x] added integration tests.
  • [ ] been tested in a test Druid cluster.

Fly-Style · Dec 05 '25 22:12

While I think this will be very useful, the primary issue we've run into with the current scaler is that it needs to shut down tasks in order to scale (which causes a lot of lag during this process). https://github.com/apache/druid/pull/18466 was working on a way to fix this.

I think this will help the scaler be smarter on each scale, but each scale action still costs a lot to do.

jtuglu1 · Dec 06 '25 00:12

@jtuglu1 thanks for your input, appreciate it! The aim is to make a more capable / tunable autoscaler, while in parallel we will make the improvements proposed in #18466.

Fly-Style · Dec 08 '25 12:12

@kfaraz thanks for the review. I addressed most of the comments in https://github.com/apache/druid/pull/18819/commits/e5b40c7a0220f5e1bdeb9f559dfafcf91239ca44, except the comment about using the correct metric for the poll-idle ratio. Unfortunately, the initial plan of measuring polls from the consumer was not correct; more details in this comment. I simply did not know it measures a slightly different thing. :(

I will add a separate endpoint to fetch the correct metrics from all tasks and calculate the average, with subsequent data normalization, in a separate commit.

cc @cryptoe

Fly-Style · Dec 10 '25 15:12

At most, maybe keep just one config, scaleActionPeriod, that can be specified as an ISO period (e.g. PT1M) or something (mostly since you would be using this in embedded tests). The other configs don't really add any value. They are legacy configs from the lag-based auto-scaler which we might as well avoid adding to the new strategy.

I don't fully agree with this. At the very least, we use config.getScaleActionStartDelayMillis() internally when doing red/black deployments where supervisors can get paused. It's better in our case to put a delay after resubmitting the supervisor, otherwise we end up over-scaling after a deployment. Similarly, we update the specs frequently to add new or update existing columns. Putting a cooldown after submission allows the scaler to adjust accurately to the lag rather than getting into a scaling loop and becoming way over-scaled (in supervisors with 500+ tasks this is an issue). I agree the rest are not too useful in practice.

jtuglu1 · Dec 15 '25 17:12

Thanks for sharing the insight on config.getScaleActionStartDelayMillis(), @jtuglu1! Can you share some typical values that you use for this config, and how it compares to the scale action period?

kfaraz · Dec 15 '25 19:12

Thanks for sharing the insight on config.getScaleActionStartDelayMillis(), @jtuglu1! Can you share some typical values that you use for this config, and how it compares to the scale action period?

We typically set a submit (start) delay of 20-30 minutes after suspension/re-submit. This gives the scaler a chance to recover at its current task count before scaling (because oftentimes, scaling too frequently will disrupt lag more than it helps). Instead, we opt to try to let the lag recover within 20 minutes, and if there's a sustained decrease in read throughput (e.g., due to a new column) or an increase in write throughput, we allow the scaler to scale.

The scale action period is typically much smaller, maybe 5-10 minutes.

jtuglu1 · Dec 15 '25 19:12

@Fly-Style, I have merged this PR. Please address the open comments in a follow-up PR.

kfaraz · Dec 17 '25 05:12