Disk damage causes failure
Hi, RSS_1 runs with -rootDir /data_01/rss-data/ ... RSS_i runs with -rootDir /data_i/rss-data/ .
When a disk, e.g. /data_03, is damaged, the Spark app fails.
The code shows that spark.shuffle.rss.replicas (default=1) is meant to avoid issues like this, but why does the app still fail?
Would decreasing spark.shuffle.rss.serverRatio (default=20) and increasing spark.shuffle.rss.replicas (e.g. to 2) work?
Or what should I do when one or more of these disks are damaged?
# exception
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 104, DataBlockSyncWriteClient 32 [/xxxx:50682 -> rss03.xxxx.net/xxxx:12222 (rss03.xxxx.net)], SocketException (Connection reset)
at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
at
// code of the configs
val replicas: ConfigEntry[Int] =
  ConfigBuilder("spark.shuffle.rss.replicas")
    .doc("number of replicas for replicated shuffle client.")
    .intConf
    .createWithDefault(1)

val serverRatio: ConfigEntry[Int] =
  ConfigBuilder("spark.shuffle.rss.serverRatio")
    .doc("how many executors mapping to one shuffle server.")
    .intConf
    .createWithDefault(20)
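For reference, a minimal sketch of how the two settings from the question could be combined, assuming they are plain Spark confs passed at submit time (the app name is made up for illustration):

// sketch: lower serverRatio plus replicas=2, set on the SparkSession builder
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("rss-replication-sketch")
  .config("spark.shuffle.rss.replicas", "2")    // write every shuffle block to 2 servers
  .config("spark.shuffle.rss.serverRatio", "5") // roughly one shuffle server per 5 executors
  .getOrCreate()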
Would you try spark.shuffle.rss.replicas=2?
Thanks @hiboyang, it works! It seems replicas=1 actually means just the original data itself with no extra replication; replicas=2 is OK.
Hi @hiboyang, I set replicas=2, but another exception is thrown.
When mapper-A sends data to StreamServer5 and a replica to StreamServer3:
- if I kill StreamServer5, mapper-A uses StreamServer3 as expected;
- but if the rootDir belonging to StreamServer5 suddenly becomes read-only (touch: setting times of ‘/data05/rss-data/’: Read-only file system), then since StreamServer5 is still alive, mapper-A keeps retrying against StreamServer5 for another 3 attempts until it fails.

Hi @Lobo2008, this is good testing! It looks like RSS needs to support this scenario where the rootDir becomes unavailable. The RSS client should mark that server as failed in that case and switch to another server if spark.shuffle.rss.replicas is larger than 1. This should be a TODO item for RSS.
By the way @Lobo2008, I want to double check: would you expand the details of the first block of exceptions, to see whether there is more of a clue?
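A rough sketch of that intended failover behavior, as I read it (an illustration, not the actual RSS client code; the types and names here are made up): on a write error, mark the replica as failed instead of retrying it, and only fail the task when no replica remains.

final case class Replica(id: String, var failed: Boolean = false)

def writeWithFailover(block: Array[Byte],
                      replicas: Seq[Replica],
                      send: (Replica, Array[Byte]) => Unit): Unit = {
  replicas.filterNot(_.failed).foreach { r =>
    try send(r, block)
    catch { case _: java.io.IOException => r.failed = true } // e.g. read-only rootDir on that server
  }
  if (replicas.forall(_.failed))
    throw new RuntimeException("all replicas failed for this shuffle block")
}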
Hi @hiboyang, I'm posting two apps with different exceptions; both failed because of StreamServer5.
application-1 failed on job10-stage14, where 26 tasks (including retry attempts) failed for the same reason;
the only difference between them is "hit exception writing heading bytes xxx" vs. "hit exception writing data yyy":
# hit exception writing heading bytes xxx
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing heading bytes 124, DataBlockSyncWriteClient 106 [/10.203.89.173:46914 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:133)
at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:131)
... 22 more
# hit exception writing data yyy
com.uber.rss.exceptions.RssNetworkException: writeRowGroup: hit exception writing data 279, DataBlockSyncWriteClient 279 [/10.203.70.110:53497 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:141)
at com.uber.rss.clients.PlainShuffleDataSyncWriteClient.writeDataBlock(PlainShuffleDataSyncWriteClient.java:40)
at com.uber.rss.clients.ServerIdAwareSyncWriteClient.writeDataBlock(ServerIdAwareSyncWriteClient.java:73)
at com.uber.rss.clients.ReplicatedWriteClient.lambda$writeDataBlock$2(ReplicatedWriteClient.java:82)
at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
at com.uber.rss.clients.ReplicatedWriteClient.writeDataBlock(ReplicatedWriteClient.java:78)
at com.uber.rss.clients.MultiServerSyncWriteClient.writeDataBlock(MultiServerSyncWriteClient.java:124)
at com.uber.rss.clients.LazyWriteClient.writeDataBlock(LazyWriteClient.java:99)
at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:166)
at org.apache.spark.shuffle.RssShuffleWriter$$anonfun$sendDataBlocks$1.apply(RssShuffleWriter.scala:161)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.shuffle.RssShuffleWriter.sendDataBlocks(RssShuffleWriter.scala:161)
at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:119)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at com.uber.rss.util.ByteBufUtils.readBytesToStream(ByteBufUtils.java:73)
at com.uber.rss.clients.DataBlockSyncWriteClient.writeData(DataBlockSyncWriteClient.java:139)
... 22 more
application-2 failed on job3-stage4, where 1 task failed on all 4 attempts:
# one of the four
com.uber.rss.exceptions.RssFinishUploadException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at com.uber.rss.clients.MultiServerSyncWriteClient.finishUpload(MultiServerSyncWriteClient.java:131)
at com.uber.rss.clients.LazyWriteClient.finishUpload(LazyWriteClient.java:116)
at org.apache.spark.shuffle.RssShuffleWriter.write(RssShuffleWriter.scala:123)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:415)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1403)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:421)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.uber.rss.exceptions.RssFinishUploadException: Failed to finish upload to server 15, DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], RssNetworkException (writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net.net)], SocketException (Broken pipe)). If the network is good, this error may indicate your shuffle data exceeds the server side limit. This shuffle client has written 77 bytes.
at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:165)
at com.uber.rss.clients.ShuffleDataSyncWriteClientBase.finishUpload(ShuffleDataSyncWriteClientBase.java:89)
at com.uber.rss.clients.ServerIdAwareSyncWriteClient.finishUpload(ServerIdAwareSyncWriteClient.java:78)
at com.uber.rss.clients.ReplicatedWriteClient.lambda$finishUpload$3(ReplicatedWriteClient.java:88)
at com.uber.rss.clients.ReplicatedWriteClient.runAllActiveClients(ReplicatedWriteClient.java:154)
at com.uber.rss.clients.ReplicatedWriteClient.finishUpload(ReplicatedWriteClient.java:88)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.uber.rss.exceptions.RssNetworkException: writeMessageLengthAndContent failed: DataBlockSyncWriteClient 144 [/10.203.35.101:16558 -> rss05.MY_IP.net/10.203.54.83:12202 (rss05.MY_IP.net)], SocketException (Broken pipe)
at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:234)
at com.uber.rss.clients.ClientBase.writeControlMessageNotWaitResponseStatus(ClientBase.java:248)
at com.uber.rss.clients.ClientBase.writeControlMessageAndWaitResponseStatus(ClientBase.java:252)
at com.uber.rss.clients.DataBlockSyncWriteClient.finishUpload(DataBlockSyncWriteClient.java:159)
... 14 more
Caused by: java.net.SocketException: Broken pipe
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:141)
at com.uber.rss.clients.ClientBase.writeMessageLengthAndContent(ClientBase.java:228)
... 17 more
Thanks @Lobo2008 for the debugging info! I checked the source code again. The code in RSS is supposed to try another server if it hits an error with one server, including a disk write failure. It seems that part is not working as expected. Let's see whether the Uber folks have an environment to test and debug this.
Thanks @hiboyang, one more question:
We have 55 StreamServers and set spark.shuffle.rss.serverRatio=5.
If application-i uses 300 executors, that means selectedServerCount = 300 / 5 = 60.
Since 60 > 55, this app will select all 55 StreamServers, so no matter how Spark itself retries the job, stage, or task, the app is destined to fail if one disk crashes. Is this right?
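For reference, a minimal sketch of the selection arithmetic just described (an illustration, not the actual RSS code): the number of servers an app asks for is executors / serverRatio, capped at the cluster size.

val totalServers = 55
val executors    = 300
val serverRatio  = 5
val requested    = executors / serverRatio           // 300 / 5 = 60
val selected     = math.min(requested, totalServers) // capped at 55 -> every server is used
println(s"selectedServerCount = $selected")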
Yes, if that replicas setting does not work for you.
Another option: you could use the spark.shuffle.rss.excludeHosts setting to exclude the server with the bad disk.
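A hedged sketch of what that might look like at submit time, assuming spark.shuffle.rss.excludeHosts takes a comma-separated host list (an assumption; check the value format against the RSS config code):

// sketch: exclude the host whose disk went read-only
val spark = org.apache.spark.sql.SparkSession.builder()
  .appName("rss-exclude-bad-host")
  .config("spark.shuffle.rss.excludeHosts", "rss05.MY_IP.net") // server with the read-only /data05
  .getOrCreate()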
Thanks, but we may not need this setting; for now we're going to run a monitoring service that kills the RSS server if its disk crashes.
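A hypothetical sketch of that kind of monitor (the path, the probe interval, and the kill command are all assumptions, not part of RSS): probe the rootDir for writability and stop the local StreamServer process when the probe fails, so clients fail over instead of retrying a dead disk.

import java.nio.file.{Files, Paths}
import scala.sys.process._

object RssDiskMonitor {
  def main(args: Array[String]): Unit = {
    val rootDir = Paths.get("/data05/rss-data")
    while (true) {
      val healthy =
        try {
          val probe = Files.createTempFile(rootDir, "rss-probe-", ".tmp")
          Files.delete(probe)
          true
        } catch { case _: Exception => false } // read-only or failed disk
      if (!healthy) {
        "pkill -f com.uber.rss.StreamServer".! // assumed process name for the RSS server
      }
      Thread.sleep(60000) // probe once a minute
    }
  }
}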