Segment Rebalance Race
I've identified a race in segment movement which causes temporary segment unavailability (and thus partial query results).
Segment load/drop callbacks are racing with prepareCurrentServers.
Consider the following scenario: Coordinator is moving segment S from server A to server B. S has 1x replication.
Basically, the load/drop callbacks can happen between the start/end of this function in a way where you get a
SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}.
Essentially:
T0[Coordinator]: enter prepareCurrentServers()
T1[Coordinator]: Server[B] completed request[MOVE_TO] on segment[S] with status[SUCCESS]
T2[Coordinator]: Dropping segment [S] from server[A]
T3[A]: Completely removing segment[S] in [30,000]ms.
T4[Coordinator]: Server[A] completed request[DROP] on segment[S] with status[SUCCESS].
T5[Coordinator]: exit prepareCurrentServers()
T6[Coordinator]: enter prepareCluster()
T7[Coordinator]: exit prepareCluster()
T8[Coordinator]: enter initReplicaCounts()
T9[Coordinator]: exit initReplicaCounts()
T10[Coordinator]: Segment S replica count is SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}
T11[Coordinator]: Dropping segment [S] from server[B]
I think what's happening is that the server loop in prepareCurrentServers() reads the servers in a state where the LOAD has already persisted in the server view (2x loaded) but the DROP has not yet materialized in the view. This is why loaded=2. I had then expected the in-flight DROP (since it hasn't materialized in the view) to be picked up from the queuedSegments of the load queue peons and show up as dropping=1. However, the DROP callback returns, and clears the entry from the old queuedSegments of the load queue peons, before prepareCluster() has a chance to copy the queued action over to the new queuedSegments, so we lose that important bit of information. You are therefore left in a weird state with a "valid" queue but an invalid load state. In other words, I think we need to somehow synchronize callbacks with prepareCurrentServers() and prepareCluster().
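To make that suggestion concrete, here is a minimal sketch of what such synchronization could look like. The types and names below are simplified stand-ins, not actual Druid code: the idea is simply that the snapshot phase and the load/drop callbacks share one lock, so a callback cannot clear a queued action between the server-view read and the queue copy.

```java
// Minimal sketch (simplified stand-in types, NOT actual Druid code) of synchronizing
// load/drop callbacks with the snapshot phase.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SnapshotCallbackSyncSketch
{
  private final Object snapshotLock = new Object();
  // Hypothetical stand-in for a peon's queuedSegments (segmentId -> action)
  private final Map<String, String> queuedSegments = new ConcurrentHashMap<>();

  // Duty side: build the snapshot under the lock
  ClusterSnapshot prepareSnapshot()
  {
    synchronized (snapshotLock) {
      ClusterSnapshot snapshot = readServerView();   // ~ prepareCurrentServers()
      snapshot.copyQueuedActions(queuedSegments);    // ~ prepareCluster()
      return snapshot;
    }
  }

  // Callback side: completion of a LOAD/DROP request on some server
  void onRequestCompleted(String segmentId)
  {
    synchronized (snapshotLock) {
      // Cleared only while no snapshot is being assembled, so a snapshot never sees
      // the queue entry vanish between its two phases.
      queuedSegments.remove(segmentId);
    }
  }

  // --- placeholders so the sketch is self-contained ---
  static class ClusterSnapshot
  {
    void copyQueuedActions(Map<String, String> queued) { /* copy into new queuedSegments */ }
  }

  private ClusterSnapshot readServerView()
  {
    return new ClusterSnapshot();
  }
}
```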
Affected Version
All recent Druid versions.
@jtuglu1 , thanks for reporting this!
IIUC, there are two parts to the problem.
A
> The issue is that there is a momentary lapse in time between when server B loads the segment (and it appears as loaded in accounting) and when the callback for dropping on server A occurs.
I think this may be true but more from the Broker's perspective i.e. it is possible that the Broker sees the drop from A and then the load on B, causing the segment to be unavailable for a brief period. Since the Coordinator is not aware of the inventory of the individual Brokers, there isn't much we can do here.
Generally, the Coordinator's own inventory is meant to serve as a proxy for what the Brokers see. And if Coordinator's inventory does not have S on B, then Coordinator wouldn't decide to remove the segment from B anyway. On the contrary, it might try to load S somewhere (A or B or maybe even C) leading to over-replication in the next cycle and then another drop from somewhere.
But I assume this is not the case discussed here.
Solution
Even though this is not the bug reported in this ticket, there is room for improvement (or at least an alternate strategy).
We could delay the drop from A until Coordinator is sure that B has loaded S (this would save the extra work which sometimes ends up nullifying the move from A to B by reloading the segment on A 😅 ).
- An easy way to do that would be to leave S in `segmentsMarkedToDrop` of peon A and do nothing in the callback of `SegmentLoadQueueManager` (see the sketch after this list).
- When S shows up on B and we recognize it as over-replicated (in a later duty run), we prioritize drop of the extra S from A (since it is already marked to drop from A).
- We might also need to tweak the calculation of replicas so that `movingFrom` segments also count towards over-replication.
- I suppose the drawback is that segment balancing would now seem to be slow, as the segments would stick around on A until the next Coordinator cycle (typically 1 minute).
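If it helps, here is a rough sketch of that "delayed drop" idea. It uses simplified stand-in types rather than the real peon and duty classes: the MOVE callback deliberately leaves S marked to drop on A, and a later duty run resolves the over-replication by preferring the server that is already marked.

```java
// Rough sketch of the delayed-drop idea (simplified stand-ins, not the real
// HttpLoadQueuePeon / duty classes).
import java.util.HashSet;
import java.util.Set;

class DelayedDropSketch
{
  // Hypothetical stand-in for peon A's segmentsMarkedToDrop
  private final Set<String> segmentsMarkedToDropOnA = new HashSet<>();

  // MOVE_TO callback from the load queue manager: intentionally does nothing,
  // so S stays marked to drop on A
  void onMoveToCompleted(String segmentId)
  {
  }

  // A later duty run sees S loaded on both A and B (over-replicated)
  void runDuty(String segmentId, boolean loadedOnB)
  {
    if (loadedOnB && segmentsMarkedToDropOnA.remove(segmentId)) {
      // Prioritize dropping the replica that was already marked to drop
      issueDropToServerA(segmentId);
    }
  }

  private void issueDropToServerA(String segmentId)
  {
    // send a DROP request for segmentId to server A
  }
}
```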
B
Because the `projectedReplicas` (calculated here) comes out to `loadedNotDropping + loading - max(0, moveFrom - moveTo)` = 2 (both A and B appear loaded at this point) + 0 - max(0, 1 - 1) = 2, the drop subsequently gets scheduled on B.
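For clarity, here is the same arithmetic spelled out as a tiny runnable snippet; the values are taken from the scenario above and nothing beyond the quoted formula is assumed.

```java
// Tiny check of the projectedReplicas arithmetic above (values from this scenario)
class ProjectedReplicasCheck
{
  public static void main(String[] args)
  {
    int loadedNotDropping = 2;   // S appears loaded on both A and B
    int loading = 0;
    int moveFrom = 1;            // moving from A ...
    int moveTo = 1;              // ... to B
    int projectedReplicas = loadedNotDropping + loading - Math.max(0, moveFrom - moveTo);
    System.out.println(projectedReplicas);   // 2, exceeding required = 1, so B gets a drop
  }
}
```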
This can probably play out as follows:
- T1: Duty run starts
- T1: Initialize `ServerHolder` for A
- T2: `SegmentLoadQueueManager` calls `peonA.unmarkSegmentToDrop(s)`
- T1: Considers segment S to have status `loaded` on A
- T2: `SegmentLoadQueueManager` calls `peonA.dropSegment(s)`
- T1: Runs rules assuming S is loaded on both A and B
This seems closer to what you are witnessing. Just to clarify though, the moveFrom and moveTo counts would both be 0 in this case.
Solution:
I think we can fix this one by:
- Using a `synchronized(lock)` inside `HttpLoadQueuePeon.getSegmentsMarkedToDrop()`
- Not calling `peonA.unmarkSegmentToDrop()` explicitly in `SegmentLoadQueueManager#L128`.
- Instead, performing `segmentsMarkedToDrop.remove(s)` inside `HttpLoadQueuePeon.dropSegment()`, inside the synchronized block (see the sketch after this list).
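A minimal sketch of that change, written against a simplified stand-in rather than the real `HttpLoadQueuePeon`: reads of `segmentsMarkedToDrop` and the unmark-plus-queue-drop step share one lock, so the manager no longer needs to call `unmarkSegmentToDrop()` itself.

```java
// Simplified stand-in for the peon, NOT the actual HttpLoadQueuePeon
import java.util.HashSet;
import java.util.Set;

class LoadQueuePeonSketch
{
  private final Object lock = new Object();
  private final Set<String> segmentsMarkedToDrop = new HashSet<>();
  private final Set<String> segmentsToDrop = new HashSet<>();

  Set<String> getSegmentsMarkedToDrop()
  {
    synchronized (lock) {
      // Return a copy so callers never observe a half-updated set
      return new HashSet<>(segmentsMarkedToDrop);
    }
  }

  void markSegmentToDrop(String segmentId)
  {
    synchronized (lock) {
      segmentsMarkedToDrop.add(segmentId);
    }
  }

  void dropSegment(String segmentId)
  {
    synchronized (lock) {
      // Unmarking and queueing the drop now happen atomically
      segmentsMarkedToDrop.remove(segmentId);
      segmentsToDrop.add(segmentId);
    }
  }
}
```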
@jtuglu1 , please let me know if either of these solutions works for the issue that you have encountered. I think solution B is something we should do anyway as it makes the whole flow more thread-safe.
> But I assume this is not the case discussed here.
Correct. That issue is outlined here.
> Just to clarify though, the moveFrom and moveTo counts would both be 0 in this case.
Hmm, I'm not sure about this. Printing the server B action in the load callback routine actually yields MOVE_TO (basically the action has not yet been removed from the queuedSegments). This would lead me to believe that server A is also MOVE_FROM (since we have not yet scheduled the drop), since otherwise we would not get a zero result from that computation.
> I think we can fix this one by:
While I think this fixes one potential problem, I'm not sure this fully fixes the issue. This is because I think there's some vagueness to the following question(s):
- When exactly server B goes from `MOVE_TO` to `null` action state (e.g. this line) w.r.t. the load callback being issued. If we want to keep the current accounting method, we need to make sure B's action state change and A's state change from `MOVE_FROM` to `DROP` are atomic. This way, the accounting is not affected.
Even if we synchronize the drop logic above, the accounting is set up such that if the server A/B states are not updated atomically (A goes to DROP, B goes to null, and loaded increases by 1), we risk invalid accounting and a race.
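For illustration, here is a sketch of what "atomic" would mean here. The types are hypothetical, not the actual accounting code: B's MOVE_TO-to-null transition and A's MOVE_FROM-to-DROP transition flip in one step, and any accounting path reads both states under the same lock, so it can only ever see the "before" pair or the "after" pair.

```java
// Illustration only (hypothetical types, not the actual Coordinator accounting)
class AtomicMoveCompletionSketch
{
  enum Action { MOVE_TO, MOVE_FROM, DROP, NONE }

  private final Object stateLock = new Object();
  private Action actionOnB = Action.MOVE_TO;
  private Action actionOnA = Action.MOVE_FROM;

  // Called when B reports the MOVE_TO as successful
  void onMoveToSucceeded()
  {
    synchronized (stateLock) {
      actionOnB = Action.NONE;   // B now counts as plainly loaded ...
      actionOnA = Action.DROP;   // ... and A counts as dropping, in the same step
    }
  }

  // Accounting reads both states under the same lock, so it observes either
  // (MOVE_TO on B, MOVE_FROM on A) or (NONE on B, DROP on A), never a mix
  Action[] readBothStates()
  {
    synchronized (stateLock) {
      return new Action[]{actionOnB, actionOnA};
    }
  }
}
```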
> Printing the server B action in the load callback routine actually yields MOVE_TO (basically the action has not been removed yet from the queuedSegments)
This might be true, but ServerHolder for B is not relevant in the next duty run since each run of the duty uses a fresh set of ServerHolder instances (which is a snapshot of the time the duty run starts). This is why the callback does not use the ServerHolder and refers to the peons directly (peon objects remain in scope as long as the underlying server is visible to the Coordinator), since the ServerHolder would have a stale snapshot state by the time the callback actually gets run.
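A simplified illustration of that distinction, using hypothetical types rather than Druid's actual classes: the holder is rebuilt for every duty run as a frozen snapshot, while the peon outlives runs and is what the callback mutates.

```java
// Hypothetical simplified types, NOT Druid's ServerHolder / LoadQueuePeon
class HolderVsPeonSketch
{
  static class Peon
  {
    // long-lived; holds the live load/drop queue for one server
  }

  static class Holder
  {
    final Peon peon;
    final long snapshotTakenAtMillis;

    Holder(Peon peon)
    {
      this.peon = peon;
      this.snapshotTakenAtMillis = System.currentTimeMillis();  // frozen at duty start
    }
  }

  void dutyRun(Peon peon)
  {
    Holder holder = new Holder(peon);   // fresh snapshot, discarded when the run ends
    // ... balancing decisions are made against 'holder' ...
  }

  void loadCallback(Peon peon)
  {
    // mutate the live peon; any Holder captured earlier is already stale
  }
}
```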
The race condition that you have encountered, is it happening within the same duty run or across two duty runs?
> The race condition that you have encountered, is it happening within the same duty run or across two duty runs?
I haven't examined all occurrences (there are quite a few), but from the ~~random~~ sample that I've looked at:
- Move initiated in HistoricalManagementDuties run 1 (load on B initiated)
- HistoricalManagementDuties run 1 ends
- HistoricalManagementDuties run 2 starts
- Load callback on B, and yes, B's `ServerHolder` is still `MOVE_TO` (`serverB.getActionOnSegment(segment)`).
- Drop race happens.
- HistoricalManagementDuties run 2 ends.
I think I've root-caused at least one race:
Basically the issue is:
- Duty group starts
- `build()` is called once (callsite)
- This time `segmentAssigner != null || loadQueueManager == null` is false.
- `build()` is called again (callsite)
- This time `segmentAssigner != null || loadQueueManager == null` is true, so it critically DOESN'T update the `serverManager` for the `RunRules` (since the parameters are returned and passed to the next rule chain in the duty group).
Basically, in cases where the load callback happens after the first `build()` but before the duty completes, you get your race.
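As a hedged illustration of that flow (a hypothetical builder, not the actual Druid params builder), the guard in `build()` would look roughly like this, which is why the second call leaves the server snapshot untouched; only the condition itself is quoted from the behavior described above.

```java
// Hypothetical builder, NOT the actual Druid runtime-params builder
class ParamsBuilderSketch
{
  private Object segmentAssigner;    // built from a server snapshot
  private Object loadQueueManager;   // provided once per duty group

  ParamsBuilderSketch build()
  {
    if (segmentAssigner != null || loadQueueManager == null) {
      // Already initialized (or nothing to build): the server snapshot is NOT refreshed,
      // so later duties in the group keep the state captured before the callback fired
      return this;
    }
    segmentAssigner = snapshotServersAndCreateAssigner();
    return this;
  }

  private Object snapshotServersAndCreateAssigner()
  {
    return new Object();   // stand-in for reading the current server view
  }
}
```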
I had some time to do a bit more digging and I think there's something a bit different going on in some cases. The callbacks are racing with prepareCurrentServers. Basically, the load/drop callbacks can happen between the start/end of this function in a way where you get a SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}.
Essentially:
T0[Coordinator]: enter prepareCurrentServers()
T1[Coordinator]: Server[B] completed request[MOVE_TO] on segment[S] with status[SUCCESS]
T2[Coordinator]: Dropping segment [S] from server[A]
T3[A]: Completely removing segment[S] in [30,000]ms.
T4[Coordinator]: Server[A] completed request[DROP] on segment[S] with status[SUCCESS].
T5[Coordinator]: exit prepareCurrentServers()
T6[Coordinator]: enter prepareCluster()
T7[Coordinator]: exit prepareCluster()
T8[Coordinator]: enter initReplicaCounts()
T9[Coordinator]: exit initReplicaCounts()
T10[Coordinator]: Segment S replica count is SegmentReplicaCount{requiredAndLoadable=1, required=1, loaded=2, loadedNonHistorical=0, loading=0, dropping=0, movingTo=0, movingFrom=0}
T11[Coordinator]: Dropping segment [S] from server[B]
I think the segment replica counts not being updated after every duty is fine. I think what's happening is that the server loop in prepareCurrentServers reads the servers in a state where the LOAD has persisted in the server view (2x loaded) but the DROP has not yet materialized in the view. I thought the in-flight DROP would get picked up from the queuedSegments of the load queue peon (and counted towards dropping), but the DROP callback returns, and clears the entry from the old queuedSegments, before prepareCluster has a chance to copy the queued action over to the new queuedSegments. Hence, you are left in a weird state with a "valid" queue but an invalid load state. In other words, I think we need to somehow synchronize callbacks with prepareCurrentServers and prepareCluster.