[SPARK-47702][CORE] Remove Shuffle service endpoint from the locations list when RDD block is removed from a node.
What changes were proposed in this pull request?
Credit to @maheshk114 for the initial investigation and the fix.
This PR fixes a bug where the shuffle service's ID is kept in the block location list when an RDD block is removed from a node. Before this change, StorageLevel.NONE was used to report the block removal, which caused the block manager master to ignore the location update for the shuffle service's ID (for details, see the method BlockManagerMasterEndpoint#updateBlockInfo() and keep in mind that StorageLevel.NONE.useDisk is false). After this change, only the replication count is set to 0 to report the block removal, so StorageLevel#isValid is still false but storageLevel.useDisk stays true; this way the shuffle service's ID is removed from the block location list.
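To make the difference between the two notifications concrete, here is a minimal, self-contained sketch of the decision described above. It is a simplified model, not the code from BlockManagerMasterEndpoint#updateBlockInfo(): the names Level, LocationModel, NoneLevel, DiskOnly and update are hypothetical, and the validity rule (some storage in use and replication > 0) is an assumption made for illustration.

```scala
// Simplified stand-in for Spark's StorageLevel (hypothetical, not the real class).
final case class Level(useDisk: Boolean, useMemory: Boolean, replication: Int) {
  // Assumed validity rule: some storage is used and at least one replica exists.
  def isValid: Boolean = (useMemory || useDisk) && replication > 0
}

object LocationModel {
  // Models StorageLevel.NONE: nothing is stored, so useDisk is false.
  val NoneLevel: Level = Level(useDisk = false, useMemory = false, replication = 1)
  // Models a disk-backed level with a single replica.
  val DiskOnly: Level = Level(useDisk = true, useMemory = false, replication = 1)

  // Hypothetical model of a location update for one RDD block.
  // The executor entry always follows isValid; the shuffle service entry
  // is only touched when the reported level has useDisk set.
  def update(
      locations: Set[String],
      executorId: String,
      shuffleServiceId: String,
      level: Level): Set[String] = {
    val afterExecutor =
      if (level.isValid) locations + executorId else locations - executorId
    if (level.useDisk) {
      if (level.isValid) afterExecutor + shuffleServiceId
      else afterExecutor - shuffleServiceId
    } else {
      // useDisk is false: the shuffle service entry is left untouched.
      afterExecutor
    }
  }
}
```

In this model, reporting a removal with LocationModel.NoneLevel drops only the executor entry and leaves the shuffle service entry behind, while reporting it with LocationModel.DiskOnly.copy(replication = 0) drops both, which mirrors the behavior change described in this PR.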
Why are the changes needed?
If the block location is not updated properly, tasks fail with a fetch failed exception. The tasks try to read the RDD blocks from a node through the external shuffle service, and the read fails if the node has already been decommissioned.
WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 (TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
at scala.Option.orElse(Option.scala:447)
at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a new unit test.
Was this patch authored or co-authored using generative AI tooling?
No
This is an updated version of https://github.com/apache/spark/pull/45836
cc @mridulm
cc @LuciferYang
Merged into master and branch-3.5. Thanks @attilapiros
@attilapiros I have reverted the merge on branch-3.5 because it was causing compilation failures. If you have the time, please submit an independent PR for branch-3.5. Thanks ~
@LuciferYang
Opened a new PR for branch-3.5: https://github.com/apache/spark/pull/48357