
Disk damage causes failure

Open Lobo2008 opened this issue 3 years ago • 10 comments

Hi, RSS_1 runs with -rootDir /data_01/rss-data/ ... RSS_i runs with -rootDir /data_i/rss-data/. When a disk, e.g. /data_03, is damaged, the Spark app fails.

The code shows that spark.shuffle.rss.replicas (default=1) is supposed to avoid issues like this, so why does the app still fail?

Will decreasing spark.shuffle.rss.serverRatio (default=20) and increasing spark.shuffle.rss.replicas (e.g. to 2) work? Or what should I do when one or some of these disks are damaged?

# exception
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 104, DataBlockSyncWriteClient 32 [/xxxx:50682 -> rss03.xxxx.net/xxxx:12222 (rss03.xxxx.net)], SocketException (Connection reset)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
	at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
	at 
 // code of the configs
  val replicas: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.replicas")
      .doc("number of replicas for replicated shuffle client.")
      .intConf
      .createWithDefault(1)
  val serverRatio: ConfigEntry[Int] =
    ConfigBuilder("spark.shuffle.rss.serverRatio")
      .doc("how many executors mapping to one shuffle server.")
      .intConf
      .createWithDefault(20)
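
For reference, this is roughly how I would set these two options on the Spark side; the keys and defaults (1 and 20) come from the config definitions above, the concrete values here are only illustrative:

 // example: setting the configs via SparkConf (values illustrative, not recommendations)
 import org.apache.spark.SparkConf

 val conf = new SparkConf()
   .set("spark.shuffle.rss.replicas", "2")     // write each shuffle block to 2 servers
   .set("spark.shuffle.rss.serverRatio", "10") // map roughly 10 executors to one shuffle server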

Lobo2008 avatar Oct 28 '22 06:10 Lobo2008

Would you try spark.shuffle.rss.replicas=2?

hiboyang avatar Oct 30 '22 06:10 hiboyang

Would you try spark.shuffle.rss.replicas=2?

Thanks @hiboyang, it works! It seems replicas=1 actually means just the original data itself, with no extra replication; replicas=2 is fine.

Lobo2008 avatar Oct 31 '22 02:10 Lobo2008

Hi @hiboyang, I set replicas=2 but another exception is thrown:

When mapper-A sends data to StreamServer5 and a replica to StreamServer3:

  • if I kill StreamServer5, mapper-A uses StreamServer3 as expected;
  • but if the rootDir belonging to StreamServer5 suddenly becomes read-only (touch: setting times of ‘/data05/rss-data/’: Read-only file system), then since StreamServer5 is still alive, mapper-A keeps retrying StreamServer5 for another 3 attempts until the task fails.


Lobo2008 avatar Nov 03 '22 07:11 Lobo2008

Hi @Lobo2008, this is good testing! It looks like RSS needs to support this scenario where the rootDir becomes unavailable. The RSS client should mark that server as failed in that case and switch to another server if spark.shuffle.rss.replicas is larger than 1. This should be a TODO item for RSS.
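
Roughly, the client-side behavior I have in mind would look like the sketch below. This is not the actual ReplicatedWriteClient code, just an illustration with hypothetical names: on a write error (a network failure, or a disk/rootDir failure surfacing as a broken connection), mark that server's client as failed and keep writing to the remaining replicas.

 // hypothetical sketch, not the real RSS implementation
 trait SingleServerWriter { def write(block: Array[Byte]): Unit }

 class ReplicatedWriterSketch(clients: Seq[SingleServerWriter]) {
   private val failed = scala.collection.mutable.Set.empty[SingleServerWriter]

   def writeDataBlock(block: Array[Byte]): Unit = {
     val active = clients.filterNot(failed.contains)
     if (active.isEmpty) throw new RuntimeException("no healthy shuffle server left")
     active.foreach { client =>
       try client.write(block)
       catch {
         case e: Exception =>
           failed += client                          // mark this server as failed
           if (failed.size == clients.size) throw e  // only fail the task when every replica is gone
       }
     }
   }
 }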

hiboyang avatar Nov 03 '22 17:11 hiboyang

By the way @Lobo2008, I want to double check: would you expand the “+details” for the first block of exceptions to see whether there is more of a clue?

[Screenshot: Screen Shot 2022-11-03 at 10 53 00 AM]

hiboyang avatar Nov 03 '22 17:11 hiboyang

Hi @hiboyang, I'm posting 2 apps with different exceptions; both failed because of StreamServer5.

application-1 failed on job10-Stage14; 26 tasks (including retry attempts) failed for the same reason, the only difference being hit exception writing heading|data bytes xxx:

# hit exception writing heading bytes xxx
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 124, DataBlockSyncWriteClient 106 [/10.203.89.173:46914 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
	at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
	at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
	at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
	at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
	at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
	at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
	at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
	at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:131)
	... 22 more


# hit exception writing data yyy
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing data 279, DataBlockSyncWriteClient 279 [/10.203.70.110:53497 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:141)
	at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
	at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
	at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
	at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
	at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
	at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
	at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
	at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
	at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at com.uber.rss.util.ByteBufUtils.readBytesToStream(ByteBufUtils.java:73)
	at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:139)
	... 22 more

application-2 failed on job3-stage4; 1 task failed on all 4 attempts:

# one of the four
com.uber.rss.exceptions.RssFinishUploadException
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
	at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
	at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at com.uber.rss.clients.MultiServerSyncWriteClient.finishUpload(MultiServerSyncWriteClient.java:131)
	at com.uber.rss.clients.LazyWriteClient.finishUpload(LazyWriteClient.java:116)
	at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:123)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.uber.rss.exceptions.RssFinishUploadException: Failed to finish upload to server 15, DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], RssNetworkException (writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net.net)], SocketException (Broken pipe)). If the network is good, this error may indicate your shuffle data exceeds the server side limit. This shuffle client has written 77 bytes.
	at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:165)
	at com.uber.rss.clients.ShuffleDataSyncWriteClientBase.finishUpload(ShuffleDataSyncWriteClientBase.java:89)
	at com.uber.rss.clients.ServerIdAwareSyncWriteClient.finishUpload(ServerIdAwareSyncWriteClient.java:78)
	at com.uber.rss.clients.ReplicatedWriteClient.lambda$finishUpload$3(ReplicatedWriteClient.java:88)
	at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
	at com.uber.rss.clients.ReplicatedWriteClient.finishUpload(ReplicatedWriteClient.java:88)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.uber.rss.exceptions.RssNetworkException: writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
	at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:234)
	at com.uber.rss.clients.ClientBase.writeControlMessageNotWaitResponseStatus(ClientBase.java:248)
	at com.uber.rss.clients.ClientBase.writeControlMessageAndWaitResponseStatus(ClientBase.java:252)
	at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:159)
	... 14 more
Caused by: java.net.SocketException: Broken pipe
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
	at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:228)
	... 17 more

Lobo2008 avatar Nov 04 '22 03:11 Lobo2008

Thanks @Lobo2008 for the debugging info! I checked the source code again. The code in RSS is supposed to try another server if it hits an error with one server, including disk write failures. It seems that part is not working as expected. Let's see whether the Uber folks have an environment to test and debug this.

hiboyang avatar Nov 04 '22 17:11 hiboyang

Thanks @Lobo2008 for the debugging info! I checked the source code again. The code in RSS is supposed to try another server if it hits an error with one server, including disk write failures. It seems that part is not working as expected. Let's see whether the Uber folks have an environment to test and debug this.

Thanks @hiboyang, one more question:

We have 55 StreamServers, and set spark.shuffle.rss.serverRatio=5. If application-i uses 300 executors, that means selectedServerCount = 300/5 = 60.

Since 60 > 55, this app will select all 55 StreamServers, so no matter how Spark itself retries the job, stage, or task, this app is destined to fail if one disk crashes. Is this right?
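
Back-of-the-envelope check of the numbers above (not the actual selection code, just the arithmetic as I understand it):

 // spark.shuffle.rss.serverRatio = 5, 300 executors, 55 StreamServers
 val requestedServers = 300 / 5                         // = 60
 val selectedServers  = math.min(requestedServers, 55)  // capped at 55, i.e. every server is used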

Lobo2008 avatar Nov 07 '22 03:11 Lobo2008

Yes, if that replicas setting does not work for you.

Another option: you could use the spark.shuffle.rss.excludeHosts setting to exclude the server with the bad disk.
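
For example (the exact value format is my assumption, please double check against the RSS config docs):

 // assumed usage of excludeHosts; the value format (comma-separated host names) is a guess
 val conf = new org.apache.spark.SparkConf()
   .set("spark.shuffle.rss.excludeHosts", "rss05.MY_IP.net")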

hiboyang avatar Nov 09 '22 02:11 hiboyang

Yes, if that replicas setting does not work for you.

Another option: you could use the spark.shuffle.rss.excludeHosts setting to exclude the server with the bad disk.

Thanks, but we may not need this setting; for now we're going to run a monitoring service that kills the RSS process when a disk crashes.
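
(For reference, the monitor can be as simple as periodically writing a probe file into each rootDir and killing the local StreamServer when the probe fails; the sketch below is purely illustrative, and the kill command is an assumption about how the server process is launched.)

 // purely illustrative rootDir probe; the default path and the kill command are assumptions
 import java.nio.file.{Files, Paths, StandardOpenOption}
 import scala.sys.process._

 object RootDirMonitorSketch {
   def diskWritable(rootDir: String): Boolean =
     try {
       val probe = Paths.get(rootDir, ".rss-disk-probe")
       Files.write(probe, Array[Byte](0),
         StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)
       Files.deleteIfExists(probe)
       true
     } catch { case _: Exception => false }

   def main(args: Array[String]): Unit = {
     val rootDir = args.headOption.getOrElse("/data_01/rss-data/")
     while (true) {
       if (!diskWritable(rootDir)) {
         // hypothetical kill; adapt to however the StreamServer process is started
         "pkill -f com.uber.rss.StreamServer".!
       }
       Thread.sleep(60 * 1000L)
     }
   }
 }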

Lobo2008 avatar Nov 11 '22 10:11 Lobo2008