
[SPARK-47702][CORE] Remove Shuffle service endpoint from the locations list when RDD block is removed from a node.

Open attilapiros opened this issue 1 year ago • 2 comments

What changes were proposed in this pull request?

Credit to @maheshk114 for the initial investigation and the fix.

This PR fixes a bug where the shuffle service's ID is kept in the block location list when an RDD block is removed from a node. Before this change, StorageLevel.NONE was used to notify the master about the block removal, which causes the block manager master to skip updating the locations for the shuffle service's ID (for details, see BlockManagerMasterEndpoint#updateBlockInfo() and keep in mind that StorageLevel.NONE.useDisk is false). After this change, only the replication count is set to 0 to notify about the block removal, so StorageLevel#isValid is still false but storageLevel.useDisk stays true. This way the shuffle service's ID is removed from the block location list.
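The effect of the two removal notifications can be illustrated with a simplified, hypothetical model of the location bookkeeping. It assumes, as the description says, that the executor's ID is added or removed based on `isValid` and that the shuffle service's ID is only touched when `useDisk` is true. `Level`, `Location`, `updateRddBlockLocations`, `afterRemoval`, the `shuffleServiceRddFetchEnabled` flag and the executor port are illustrative stand-ins, not Spark's actual `StorageLevel`, `BlockManagerId` or `updateBlockInfo` code.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the StorageLevel fields that matter here.
final case class Level(useDisk: Boolean, useMemory: Boolean, replication: Int) {
  // Assumed to match StorageLevel#isValid: stored somewhere and replication > 0.
  def isValid: Boolean = (useMemory || useDisk) && replication > 0
}

object Level {
  // Modeled on StorageLevel.NONE: neither disk nor memory, so useDisk is false.
  val NONE: Level = Level(useDisk = false, useMemory = false, replication = 1)
}

// Hypothetical stand-in for BlockManagerId (executorId, host, port).
final case class Location(executorId: String, host: String, port: Int)

// Simplified model of how the master updates an RDD block's locations.
def updateRddBlockLocations(
    locations: mutable.Set[Location],
    executor: Location,
    shuffleService: Location,
    level: Level,
    shuffleServiceRddFetchEnabled: Boolean): Unit = {
  // The executor's own entry follows isValid.
  if (level.isValid) locations += executor else locations -= executor
  // The shuffle service entry is only updated when the level says "on disk".
  if (level.useDisk && shuffleServiceRddFetchEnabled) {
    if (level.isValid) locations += shuffleService else locations -= shuffleService
  }
}

val exec = Location("4", "vm-92303839", 40000) // hypothetical executor port
val ess = Location("4", "vm-92303839", 7337)   // shuffle service entry from the log
val diskOnly = Level(useDisk = true, useMemory = false, replication = 1)

// Store the block on disk, then notify its removal with the given level.
def afterRemoval(removal: Level): Set[Location] = {
  val locs = mutable.Set.empty[Location]
  updateRddBlockLocations(locs, exec, ess, diskOnly, shuffleServiceRddFetchEnabled = true)
  updateRddBlockLocations(locs, exec, ess, removal, shuffleServiceRddFetchEnabled = true)
  locs.toSet
}

println(s"removal with NONE:          ${afterRemoval(Level.NONE)}")
println(s"removal with replication 0: ${afterRemoval(diskOnly.copy(replication = 0))}")
```

In this model, notifying with `Level.NONE` leaves the shuffle service entry behind (the stale location that later causes the fetch failure), while keeping `useDisk = true` and setting the replication to 0 removes both entries.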

Why are the changes needed?

If the block location is not updated properly, tasks fail with a fetch failed exception: they try to read the RDD blocks from a node through the external shuffle service, and the read fails if that node has already been decommissioned.

WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 (TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added a new UT.

Was this patch authored or co-authored using generative AI tooling?

No

attilapiros • Aug 15 '24 22:08

This is an updated version of https://github.com/apache/spark/pull/45836

attilapiros • Aug 15 '24 22:08

cc @mridulm

attilapiros • Aug 15 '24 22:08

cc @LuciferYang

attilapiros • Oct 01 '24 23:10

Merged into master and branch-3.5. Thanks @attilapiros

LuciferYang • Oct 04 '24 15:10

@attilapiros I have reverted the merge on branch-3.5 because it was causing compilation failures. If you have the time, please submit an independent pr for branch-3.5. Thanks ~

LuciferYang • Oct 04 '24 17:10

@LuciferYang

Opened a new PR for branch-3.5: https://github.com/apache/spark/pull/48357

attilapiros • Oct 04 '24 21:10