
Segment Rebalance Race

Open jtuglu1 opened this issue 2 months ago • 6 comments

I've identified a race in segment movement which causes temporary segment unavailability (and thus partial query results).

Segment load/drop callbacks are racing with prepareCurrentServers.

Consider the following scenario: Coordinator is moving segment S from server A to server B. S has 1x replication.

Basically, the load/drop callbacks can fire between the start and end of prepareCurrentServers() in a way that leaves you with:

SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}

Essentially:

T0[Coordinator]: enter prepareCurrentServers()
T1[Coordinator]: Server[B] completed request[MOVE_TO] on segment[S] with status[SUCCESS]
T2[Coordinator]: Dropping segment [S] from server[A]
T3[A]: Completely removing segment[S] in [30,000]ms.
T4[Coordinator]: Server[A] completed request[DROP] on segment[S] with status[SUCCESS].
T5[Coordinator]: exit prepareCurrentServers()
T6[Coordinator]: enter prepareCluster()
T7[Coordinator]: exit prepareCluster()
T8[Coordinator]: enter initReplicaCounts()
T9[Coordinator]: exit initReplicaCounts()
T10[Coordinator]: Segment S replica count is SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}
T11[Coordinator]: Dropping segment [S] from server[B]

I think what's happening is that the server loop in prepareCurrentServers() reads the servers in a state where the LOAD has persisted in the server view (2x loaded) but the DROP has not yet materialized in the view. This causes loaded=2. I expected the in-flight DROP (since it hasn't materialized in the view) to be picked up from the queuedSegments of the load queue peons (and show up as dropping=1). However, the DROP callback returns, and clears the entry from the old queuedSegments, before prepareCluster() has a chance to copy the queued action over to the new queuedSegments, so we lose that important bit of information. Hence, you are left in a weird state with a "valid" queue but an invalid load state. In other words, I think we need to somehow synchronize the callbacks with prepareCurrentServers() and prepareCluster().
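To make the suspected interleaving concrete, here is a minimal single-threaded sketch that replays one ordering consistent with the timeline above. It is not Druid code: the class SnapshotRaceSketch, the inventoryView set, and the queuedActions map are hypothetical stand-ins for the server view and the peons' queuedSegments, and the two "reads" only approximate what prepareCurrentServers() and prepareCluster() are described as doing.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, heavily simplified model of the Coordinator's state; not Druid code. */
class SnapshotRaceSketch {
    /** Replays one possible interleaving and returns {loaded, dropping} for segment S. */
    static int[] replay() {
        // Servers the inventory view reports as serving S.
        Set<String> inventoryView = new HashSet<>();
        // In-flight action per server, standing in for queuedSegments.
        Map<String, String> queuedActions = new HashMap<>();
        inventoryView.add("A");
        queuedActions.put("B", "MOVE_TO");

        // T1: the load on B completes and its queue entry is cleared.
        inventoryView.add("B");
        queuedActions.remove("B");
        // T2: the Coordinator queues the drop on A.
        queuedActions.put("A", "DROP");

        // First read (prepareCurrentServers in the issue): A and B both look loaded.
        Set<String> loadedSnapshot = new HashSet<>(inventoryView);

        // T4: the DROP callback fires and clears A's queue entry.
        queuedActions.remove("A");
        inventoryView.remove("A");

        // Second read (prepareCluster in the issue): the DROP is already gone.
        Map<String, String> queueSnapshot = new HashMap<>(queuedActions);

        int loaded = loadedSnapshot.size();
        int dropping = 0;
        for (String action : queueSnapshot.values()) {
            if ("DROP".equals(action)) {
                dropping++;
            }
        }
        return new int[] {loaded, dropping};
    }
}
```

Under these assumptions, replay() yields loaded=2 and dropping=0, which matches the SegmentReplicaCount seen at T10: neither read is wrong on its own, but together they describe a state that never existed.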

Affected Version

All recent Druid versions.

jtuglu1 avatar Nov 20 '25 06:11 jtuglu1

@jtuglu1 , thanks for reporting this!

IIUC, there are two parts to the problem.

A

The issue is that there is a momentary lapse in time between when server B loads the segment (and it appears as loaded in accounting) and when the callback for dropping on server A occurs.

I think this may be true but more from the Broker's perspective i.e. it is possible that the Broker sees the drop from A and then the load on B, causing the segment to be unavailable for a brief period. Since the Coordinator is not aware of the inventory of the individual Brokers, there isn't much we can do here.

Generally, the Coordinator's own inventory is meant to serve as a proxy for what the Brokers see. And if Coordinator's inventory does not have S on B, then Coordinator wouldn't decide to remove the segment from B anyway. On the contrary, it might try to load S somewhere (A or B or maybe even C) leading to over-replication in the next cycle and then another drop from somewhere.

But I assume this is not the case discussed here.

Solution

Even though this is not the bug reported in this ticket, there is room for improvement (or at least an alternate strategy).

We could delay the drop from A until Coordinator is sure that B has loaded S (this would save the extra work which sometimes ends up nullifying the move from A to B by reloading the segment on A 😅 ).

  • An easy way to do that would be to leave S in segmentsMarkedToDrop of peon A and do nothing in the callback of SegmentLoadQueueManager.
  • When S shows up on B and we recognize it as over-replicated (in a later duty run), we prioritize drop of the extra S from A (since it is already marked to drop from A).
  • We might also need to tweak the calculation of replicas so that movingFrom segments also count towards over-replication.
  • I suppose the drawback is that segment balancing would now seem to be slow as the segments would stick around on A until the next coordinator cycle (typically 1 minute).
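A rough sketch of the selection step in the bullets above, assuming the drop is deferred to a later duty run. The class DeferredDropSketch and the method chooseDrops are hypothetical and only illustrate the idea of preferring servers already marked to drop once the segment is seen as over-replicated; they are not part of Druid.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper illustrating solution A; not Druid code. */
class DeferredDropSketch {
    /**
     * Picks servers to drop from when a segment is over-replicated,
     * preferring servers that were already marked to drop during a move.
     */
    static List<String> chooseDrops(List<String> loadedOn, Set<String> markedToDrop, int required) {
        int excess = loadedOn.size() - required;
        List<String> drops = new ArrayList<>();
        if (excess <= 0) {
            // The destination has not shown up as loaded yet, so keep the source.
            return drops;
        }
        // First pass: servers already marked to drop (the move source).
        for (String server : loadedOn) {
            if (drops.size() < excess && markedToDrop.contains(server)) {
                drops.add(server);
            }
        }
        // Second pass: any other server, only if excess replicas remain.
        for (String server : loadedOn) {
            if (drops.size() < excess && !markedToDrop.contains(server)) {
                drops.add(server);
            }
        }
        return drops;
    }
}
```

With S loaded only on A, nothing is dropped. Once S shows up on both A and B with A marked, A is chosen, so the move can never end with the drop landing on B.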

B

Because the projectedReplicas (calculated here) is: loadedNotDropping + loading - max(0, moveFrom - moveTo) = 2 (both A/B are loaded at this point) + 0 - max(0, 1 - 1) = 2, this causes the drop to be subsequently scheduled on B.
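The arithmetic quoted above can be checked with a few lines. This is only a sketch of the formula as written in this comment; the class and method names are hypothetical and not taken from the Druid source.

```java
/** Hypothetical helper that evaluates the formula quoted in the comment; not Druid code. */
class ProjectedReplicasSketch {
    static int projected(int loadedNotDropping, int loading, int moveFrom, int moveTo) {
        // loadedNotDropping + loading - max(0, moveFrom - moveTo)
        return loadedNotDropping + loading - Math.max(0, moveFrom - moveTo);
    }
}
```

For example, projected(2, 0, 1, 1) and projected(2, 0, 0, 0) both give 2, so with required=1 one replica looks surplus. Had the pending drop on A been visible (loadedNotDropping=1), projected(1, 0, 0, 0) would give 1 and no further drop would be scheduled.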

This can probably play out as follows:

  • T1: Duty run starts
  • T1: Initialize ServerHolder for A
  • T2: SegmentLoadQueueManager calls peonA.unmarkSegmentToDrop(s)
  • T1: Considers segment S to have status loaded on A
  • T2: SegmentLoadQueueManager calls peonA.dropSegment(s)
  • T1: Runs rules assuming S is loaded on both A and B

This seems closer to what you are witnessing. Just to clarify though, the moveFrom and moveTo counts would both be 0 in this case.

Solution:

I think we can fix this one by:

  • Using a synchronized(lock) inside HttpLoadQueuePeon.getSegmentsMarkedToDrop()
  • Not calling peonA.unmarkSegmentToDrop() explicitly in SegmentLoadQueueManager#L128.
  • Instead, perform segmentsMarkedToDrop.remove(s) inside HttpLoadQueuePeon.dropSegment() inside the synchronized block.
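The three bullets above could look roughly like this. SketchPeon is a hypothetical stand-in, not the real HttpLoadQueuePeon, and segments are modelled as plain strings; the only point is that unmarking and queueing the drop happen under the same lock that readers take.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a load queue peon; not the real HttpLoadQueuePeon. */
class SketchPeon {
    private final Object lock = new Object();
    private final Set<String> segmentsMarkedToDrop = new HashSet<>();
    private final Set<String> segmentsToDrop = new HashSet<>();

    void markSegmentToDrop(String segmentId) {
        synchronized (lock) {
            segmentsMarkedToDrop.add(segmentId);
        }
    }

    /** Unmarks the segment and queues the drop in one critical section. */
    void dropSegment(String segmentId) {
        synchronized (lock) {
            segmentsMarkedToDrop.remove(segmentId);
            segmentsToDrop.add(segmentId);
        }
    }

    /** Returns a copy taken under the lock, so callers never see a half-applied update. */
    Set<String> getSegmentsMarkedToDrop() {
        synchronized (lock) {
            return new HashSet<>(segmentsMarkedToDrop);
        }
    }

    /** True if the segment is either marked to drop or already queued for drop. */
    boolean isDropMarkedOrQueued(String segmentId) {
        synchronized (lock) {
            return segmentsMarkedToDrop.contains(segmentId) || segmentsToDrop.contains(segmentId);
        }
    }
}
```

With this shape, a reader holding the lock sees the segment as either marked or queued for drop, never as plainly loaded in the gap between unmarkSegmentToDrop() and dropSegment().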

@jtuglu1 , please let me know if either of these solutions works for the issue that you have encountered. I think solution B is something we should do anyway as it makes the whole flow more thread-safe.

kfaraz avatar Nov 20 '25 12:11 kfaraz

But I assume this is not the case discussed here.

Correct. That issue is outlined here.

Just to clarify though, the moveFrom and moveTo counts would both be 0 in this case.

Hmm, I'm not sure about this. Printing the server B action in the load callback routine actually yields MOVE_TO (basically, the action has not yet been removed from the queuedSegments). This leads me to believe that server A is also still MOVE_FROM (since we have not yet scheduled the drop), since otherwise we would not get a zero result from that computation.

I think we can fix this one by:

While I think this fixes one potential problem, I'm not sure it fully fixes the issue, because the following question is still open:

  • When exactly does server B go from MOVE_TO to the null action state (e.g. this line) relative to the load callback being issued? If we want to keep the current accounting method, we need to make sure that B's action state change and A's state change from MOVE_FROM to DROP are atomic. This way, the accounting is not affected.

Even if we synchronize the drop logic above, the accounting is set up such that if the states of servers A and B are not updated atomically (A goes to DROP, B goes to null, and loaded increases by 1), we risk invalid accounting and a race.
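As a sketch of what "atomic" would mean here, assuming a single lock guards both the state transitions and the read used for accounting. AtomicMoveSketch and its methods are hypothetical and are not how Druid currently tracks this state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-server state for one segment; not Druid code. */
class AtomicMoveSketch {
    private final Object lock = new Object();
    // Action per server: "MOVE_TO", "MOVE_FROM", "DROP", or absent for no action.
    private final Map<String, String> actions = new HashMap<>();
    private int loaded;

    AtomicMoveSketch(String from, String to) {
        actions.put(from, "MOVE_FROM");
        actions.put(to, "MOVE_TO");
        loaded = 1; // only the source serves the segment at first
    }

    /** Load callback: all three updates happen in one critical section. */
    void onLoadComplete(String from, String to) {
        synchronized (lock) {
            actions.remove(to);        // B: MOVE_TO -> no action
            actions.put(from, "DROP"); // A: MOVE_FROM -> DROP
            loaded++;                  // B now counts as loaded
        }
    }

    /** Returns {loaded, dropping} as one consistent view. */
    int[] counts() {
        synchronized (lock) {
            int dropping = 0;
            for (String action : actions.values()) {
                if ("DROP".equals(action)) {
                    dropping++;
                }
            }
            return new int[] {loaded, dropping};
        }
    }
}
```

A reader then sees either loaded=1, dropping=0 (before the callback) or loaded=2, dropping=1 (after it), and never the loaded=2, dropping=0 combination from the logs.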

jtuglu1 avatar Nov 20 '25 17:11 jtuglu1

Printing the server B action in the load callback routine actually yields MOVE_TO (basically the action has not been removed yet from the queuedSegments)

This might be true, but ServerHolder for B is not relevant in the next duty run since each run of the duty uses a fresh set of ServerHolder instances (which is a snapshot of the time the duty run starts). This is why the callback does not use the ServerHolder and refers to the peons directly (peon objects remain in scope as long as the underlying server is visible to the Coordinator), since the ServerHolder would have a stale snapshot state by the time the callback actually gets run.

The race condition that you have encountered, is it happening within the same duty run or across two duty runs?

kfaraz avatar Nov 21 '25 04:11 kfaraz

The race condition that you have encountered, is it happening within the same duty run or across two duty runs?

I haven't examined all occurrences (there are quite a few), but from the ~random~ sample that I've looked at:

  • Move initiated in HistoricalManagementDuties run 1 (load on B initiated)
  • HistoricalManagementDuties run 1 ends
  • HistoricalManagementDuties run 2 starts
  • Load callback on B, and yes, B's ServerHolder is still MOVE_TO (serverB.getActionOnSegment(segment)).
  • Drop race happens.
  • HistoricalManagementDuties run 2 ends.

jtuglu1 avatar Nov 21 '25 05:11 jtuglu1

I think I've root-caused at least one race:

Basically the issue is:

  1. Duty group starts
  2. build() is called once (callsite)
  3. build() is called again (callsite)

Basically, in cases where the load callback happens after the first build() but before duty completion, you get your race.

jtuglu1 avatar Nov 21 '25 06:11 jtuglu1

I had some time to do a bit more digging and I think there's something a bit different going on in some cases. The callbacks are racing with prepareCurrentServers. Basically, the load/drop callbacks can happen between the start/end of this function in a way where you get a SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}.

Essentially:

T0[Coordinator]: enter prepareCurrentServers()
T1[Coordinator]: Server[B] completed request[MOVE_TO] on segment[S] with status[SUCCESS]
T2[Coordinator]: Dropping segment [S] from server[A]
T3[A]: Completely removing segment[S] in [30,000]ms.
T4[Coordinator]: Server[A] completed request[DROP] on segment[S] with status[SUCCESS].
T5[Coordinator]: exit prepareCurrentServers()
T6[Coordinator]: enter prepareCluster()
T7[Coordinator]: exit prepareCluster()
T8[Coordinator]: enter initReplicaCounts()
T9[Coordinator]: exit initReplicaCounts()
T10[Coordinator]: Segment S replica count is SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}
T11[Coordinator]: Dropping segment [S] from server[B]

I think the segment replica counts not being updated after every duty is fine. I think what's happening is that the server loop in prepareCurrentServers reads the servers in a state where the LOAD has persisted in the server view (2x loaded) but the DROP has not yet materialized in the view. I thought the in-flight DROP would get picked up from the queuedSegments of the load queue peon (and be reflected in dropping). However, the DROP callback returns, and clears the entry from the old queuedSegments, before prepareCluster has a chance to copy the queued action over to the new queuedSegments, so that information is lost. Hence, you are left in a weird state with a "valid" queue but an invalid load state. In other words, I think we need to somehow synchronize the callbacks with prepareCurrentServers and prepareCluster.

jtuglu1 avatar Dec 02 '25 22:12 jtuglu1