druid icon indicating copy to clipboard operation
druid copied to clipboard

Segment loading and balancing improvements

Open kfaraz opened this issue 3 years ago • 0 comments

Motivation

Segment loading is one of the more complicated pieces of Druid operations and is prone to failures and race conditions. Due to a lack of clean logs and metrics, it is often very difficult to debug an affected cluster.

Identified issues and proposed changes

Discussed below are a few issues with the current implementation and possible fixes that can help improve the coordinator operation.

1. Balancing always causes over-replication!

Problem: While moving a segment from server A to B, the load succeeds on B but the drop on A is never invoked. This happens because the server inventory view maintained by the coordinator may still not be updated when the drop callback comes. Under normal circumstances, this is not noticeable because load rules quickly drop the over-replicated segments. But if load rules get stuck for some reason, the number of over-replicated segments keeps increasing thereby overloading historicals.

over_replication

Proposed fix:

  1. Do not check the server inventory to determine if the drop should be invoked.
  2. Use the HTTP status (applicable only to HttpLoadQueuePeon) received from the change request to determine if the load was successful or not.
  3. Provide a success as well as a failure callback to the LoadQueuePeon.

2. Changes to replicationThrottleLimit

Definition:

The maximum number of segments that can be replicated at one time.

The main aim of throttling replication is to avoid a stampede when a server suddenly disappears. It is possible that the server is temporarily invisible to the coordinator due to ephemeral network issues. In such scenarios, the load queue should not be completely filled up with replicas which might appear soon after.

Problem 1: The current implementation considers replication on a cluster level, rather than a tier level. Ideally, if load rules determine that a segment should be available on a tier, the segment should be quickly loaded there. Relying on data existing on other tiers kind of defeats the purpose of tiering.

Proposed fix:

  1. Do not consider the first copy of a segment on a tier as a replica and do not subject it to replication throttling.

At the start of every run, a tier is considered eligible for replication only if there is no pending replication in that tier. Based on the replicationThrottleLimit, we add items to the replication queue during the course of RunRules.

Problem 2: While rules are executing, the replication queue might free up as replicas get successfully loaded. The rules misinterpret this situation as the replicationThrottleLimit not having been met and queue up more replicas. In some situations, this can go on for a very long time increasing the duration of a now stale coordinator run cycle.

Proposed fix:

  1. Clarify the definition of the replicationThrottleLimit as The maximum number of segment that can be queued up for replication in one coordinator cycle.
  2. Determine replicationThrottleLimit at the start of the run for each tier. Compare the number of segments assigned for replication in this run rather than the current queue size against the replicationThrottleLimit to determine if the limit has been reached.

3. Drops happen only after target replication is met on every tier

Problem: The target replication level on a new tier might be high and would be subject to replication throttling. Drop of an unneeded replica should not have to wait on this.

Proposed fix: Drop over-replicated segments as soon as a single replica of the segment is available on every required tier.

4. Only loaded segments count towards over-replication

Changes in item (2) above ensure that segments assigned freshly to a tier would not be subject to replication throttling anymore. This implies that when a server disappears temporarily (due to intermittent network issues), segments whose only copy in this tier was on the disappeared server would quickly be reassigned to other servers. It's possible that the server then reappears soon after.

In the current implementation, all the items in the load queue would finish loading and only then would the rules realise that there is an over-replication. This could take several cycles to happen and would unnecessarily block other important load operations.

Proposed fix:

  1. Consider a segment as over-replicated if loaded + loading > required in any tier.
  2. While dropping over-replicated segments, look first in the loading queues. If found, cancel the loading operation. Otherwise, drop one of the loaded segments.

5. Segments being balanced may count towards over-replication

As a consequence of the fix in item (4) above, segments that are in the loading queue for balancing might be viewed as over-replicated.

Proposed fix:

  1. Maintain a marker to distinguish between "moving" segments and "loading" segments.
  2. Do not consider "moving" segments while computing replication level or while cancelling load operations to fix over-replication.
  3. For all other purposes, continue to treat "moving" and "loading" segments as the same.

6. Historicals can get overassigned

During the course of a coordinator run, the available space on a server is not updated even when a load completes. This is because a snapshot of the cluster is taken at the start of the run and the coordinator duties use this snapshot to make assignment decisions.

Proposed fix:

  1. Maintain the current size and max size of every server inside the HttpLoadQueuePeon.
  2. Refresh this value at the start of every run.
  3. HttpLoadQueuePeon already maintains the size of items currently in queue.
  4. As soon as an item in the load/drop queue succeeds, update the current size.
  5. Add a method boolean canLoad(DataSegment segment) which checks both the current size, the queue size and the maxSegmentsInNodeLoadingQueue to determine if a segment can be loaded.

Currently, both the LoadRule and BalanceSegments have to account for the space available on a server and its loading queue to determine if a segment can be assigned to a server. The new method canLoad would simplify this logic. Hopefully, we would also be able to phase out maxSegmentsInNodeLoadingQueue after this.

7. Changes to cachingCost balancer strategy

Problem: There are some ways in which cachingCost differs from cost balancer strategy.

  1. It does not subtract the cost of segments about to be dropped.
  2. It has a correction factor of maxSize / availableSize which is supposed to work for heterogeneous historicals but causes issues with homogeneous historicals.

Proposed fix:

  1. Subtract the cost of segments marked to be dropped.
  2. Remove the correction factor from cachingCost. This means that clusters with heterogeneous historical sizes in the same tier should use diskNormalized and should avoid both cost and cachingCost strategies.

8. Balancing skips unused segments but not overshadowed segments

Problem: The duty MarkAsUnusedOvershadowedSegments marks overshadowed segments as unused. But unfortunately, the BalanceSegments is unable to use this information as it works off of an older snapshot.

Proposed fix: This can be easily remedied by updating the list of used segments maintained by the CoordinatorRuntimeParams whenever a segment is marked as unused.

9. Logging and metrics

Load rules currently flood the logs with cryptic info which hardly provides any assistance while debugging a stuck load.

We should clean up the logs and provide verbose info only on debug or trace levels. Some new metrics should also be added to track the following info:

  1. Number of items currently in the replication queue.
  2. Number of cancelled load operations.
  3. Average time taken for cost computation of each segment.

Current work status

I am working on the changes for the above fixes and will publish the PRs soon.

Further improvements

Alongwith these, there are other areas that require improvement to make segment loading more robust and reliable.

  1. A single rule run can sometimes take a lot of time (upto hours). This is highly undesirable as the rules would be using a stale metadata snapshot and keep making incorrect decisions. Ideally, the coordinator should make short and quick decisions and keep dynamically correcting itself. Most of this time is spent in cost computation, as can be easily verified by using a RandomBalancerStrategy. An in-depth comparison of cost strategies and identification of possible improvements would go a long way to improve balancing and loading.

  2. The cache used by cachingCost balancer strategy is constructed freshly at the start of every coordinator cycle. The advantage that cachingCost seems to offer over cost is that you would need to compute the cost for a segment only against the relevant interval buckets rather than all the segments. But the cost of all the segments is anyway computed at the start while building the cache. Since the number of segments for which cost is to be computed is typically much lower than the total number of segments, this does not offer any real advantage. An improvement could be to maybe reuse the cache across coordinator runs if possible or atleast populate the cache lazily for interval buckets when they are needed.

  3. The replicationThrottleLimit considers the number of replicas rather than their size to throttle replication. Ideally, this throttling should try to ensure that only a certain percentage (based on size) of the segments that each historical of a tier currently houses are considered for replication in each run. This value can be determined for each tier at the start of the run. Using this approach, we should be able to eventually phase out this config.

kfaraz avatar Aug 09 '22 04:08 kfaraz