
Why this cannot support speculation

Open boneanxs opened this issue 5 years ago • 5 comments

Is there a Spark issue I can follow for this random task attempt ID problem? I have tested many cases but cannot reproduce it. Thanks in advance. https://github.com/uber/RemoteShuffleService/blob/7220c23694e0175e01719621707680a2718173cf/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala#L79

boneanxs avatar Nov 24 '20 03:11 boneanxs

It may already work with speculation in the latest version. Do you see any issues when running with speculation?

hiboyang avatar Jan 08 '21 06:01 hiboyang

Speculation is not supported at this time. Turning on speculation may cause data duplication. When will Spark speculation be supported?
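To make the duplication concern concrete, here is a minimal sketch in Java. It assumes a shuffle server that simply appends whatever each task attempt sends. The class and method names (`SpeculationDedupSketch`, `readAll`, `readCommitted`) are hypothetical and are not part of RemoteShuffleService. The filter by committed attempt ID is one generic way to handle this, not necessarily what RSS does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names come from RemoteShuffleService.
final class SpeculationDedupSketch {

    // One shuffle record as a server might see it: the map task that produced it,
    // the attempt of that task that wrote it, and the payload.
    static final class ShuffleRecord {
        final int mapId;
        final long taskAttemptId;
        final String value;

        ShuffleRecord(int mapId, long taskAttemptId, String value) {
            this.mapId = mapId;
            this.taskAttemptId = taskAttemptId;
            this.value = value;
        }
    }

    // Naive read: returns every record from every attempt, so a speculative
    // second attempt of the same map task shows up as duplicated data.
    static List<String> readAll(List<ShuffleRecord> records) {
        List<String> out = new ArrayList<>();
        for (ShuffleRecord r : records) {
            out.add(r.value);
        }
        return out;
    }

    // Attempt-aware read: keeps only records written by the attempt that was
    // recorded as committed for each map task, and drops the rest.
    static List<String> readCommitted(List<ShuffleRecord> records,
                                      Map<Integer, Long> committedAttempts) {
        List<String> out = new ArrayList<>();
        for (ShuffleRecord r : records) {
            Long committed = committedAttempts.get(r.mapId);
            if (committed != null && committed == r.taskAttemptId) {
                out.add(r.value);
            }
        }
        return out;
    }
}
```

With two attempts of map task 0 both writing the same record, `readAll` returns that record twice, while `readCommitted` returns it once as long as exactly one attempt per map task is marked committed.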

shikuiye avatar May 18 '21 14:05 shikuiye

@hiboyang Is speculation now implemented in Uber's internal RSS?

shikuiye avatar May 18 '21 15:05 shikuiye

It may already work with speculation in the latest version. Do you see any issues when running with speculation?

Hi @hiboyang, it seems speculation is still not supported. The app always throws an exception when I set spark.shuffle.service.enabled=true

Spark 2.4.5 + RSS master branch

22/09/27 16:55:25 ERROR FileFormatWriter: Aborting job 06b0682a-c848-4b0e-a78c-754774e500ff.
org.apache.spark.SparkException: Adaptive execution failed due to stage materialization failures.
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.org$apache$spark$sql$execution$adaptive$AdaptiveSparkPlanExec$$cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:540)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:169)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:156)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:156)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:142)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:776)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:142)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecute(AdaptiveSparkPlanExec.scala:258)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:163)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:159)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:159)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:163)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:159)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:159)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:79)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:126)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at main.scala.MyQuery$.outputDF(MyQuery.scala:42)
	at main.scala.MyQuery$.executeQuery(MyQuery.scala:57)
	at main.scala.MyQuery$.main(MyQuery.scala:105)
	at main.scala.MyQuery.main(MyQuery.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:852)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.SparkException: Early failed query stage found: ShuffleQueryStage 0
+- Exchange hashpartitioning(l_returnflag#51, l_linestatus#52, 401), true
   +- *(1) HashAggregate(keys=[l_returnflag#51, l_linestatus#52], functions=[partial_sum(l_quantity#47), partial_sum(l_extendedprice#48), partial_sum(UDF(l_extendedprice#48, l_discount#49)), partial_sum(UDF(UDF(l_extendedprice#48, l_discount#49), l_tax#50)), partial_avg(l_quantity#47), partial_avg(l_extendedprice#48), partial_avg(l_discount#49), partial_count(1)], output=[l_returnflag#51, l_linestatus#52, sum#262, sum#263, sum#264, sum#265, sum#266, count#267L, sum#268, count#269L, sum#270, count#271L, count#272L])
      +- *(1) Project [l_quantity#47, l_extendedprice#48, l_discount#49, l_tax#50, l_returnflag#51, l_linestatus#52]
         +- *(1) Filter (isnotnull(l_shipdate#53) && (l_shipdate#53 <= 1998-09-02))
            +- *(1) SerializeFromObject [assertnotnull(input[0, main.scala.Lineitem, true]).l_orderkey AS l_orderkey#43L, assertnotnull(input[0, main.scala.Lineitem, true]).l_partkey AS l_partkey#44L, assertnotnull(input[0, main.scala.Lineitem, true]).l_suppkey AS l_suppkey#45L, assertnotnull(input[0, main.scala.Lineitem, true]).l_linenumber AS l_linenumber#46L, assertnotnull(input[0, main.scala.Lineitem, true]).l_quantity AS l_quantity#47, assertnotnull(input[0, main.scala.Lineitem, true]).l_extendedprice AS l_extendedprice#48, assertnotnull(input[0, main.scala.Lineitem, true]).l_discount AS l_discount#49, assertnotnull(input[0, main.scala.Lineitem, true]).l_tax AS l_tax#50, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_returnflag, true, false) AS l_returnflag#51, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_linestatus, true, false) AS l_linestatus#52, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipdate, true, false) AS l_shipdate#53, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_commitdate, true, false) AS l_commitdate#54, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_receiptdate, true, false) AS l_receiptdate#55, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipinstruct, true, false) AS l_shipinstruct#56, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_shipmode, true, false) AS l_shipmode#57, staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, main.scala.Lineitem, true]).l_comment, true, false) AS l_comment#58]
               +- Scan[obj#42]

	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:167)
	... 51 more
Caused by: com.uber.rss.exceptions.RssException: Do not support speculation in Remote Shuffle Service
	at org.apache.spark.shuffle.RssShuffleManager.registerShuffle(RssShuffleManager.scala:83)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:93)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:294)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency$lzycompute(ShuffleExchangeExec.scala:77)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.shuffleDependency(ShuffleExchangeExec.scala:76)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:66)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:62)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:148)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec$$anonfun$materialize$1.apply(QueryStageExec.scala:77)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec$$anonfun$materialize$1.apply(QueryStageExec.scala:77)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:187)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
	at org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:76)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$getFinalPhysicalPlan$1$$anonfun$apply$4.apply(AdaptiveSparkPlanExec.scala:158)
	... 51 more

Lobo2008 avatar Sep 27 '22 09:09 Lobo2008

This is caused by this line of code: https://github.com/uber/RemoteShuffleService/blob/607358d97dc17e1d4e8d29b145e43248ac9ef6dd/src/main/scala/org/apache/spark/shuffle/RssShuffleManager.scala#L83

Please feel free to remove it and submit a PR. It was added in a very early version of RSS, and it should now be safe to remove.
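For readers who have not opened the link, a fail-fast guard of this kind might look roughly like the sketch below. This is an assumption-laden illustration, not the actual code at the linked line. It assumes the guard keys on the standard Spark property `spark.speculation`, it uses a plain `Map` in place of `SparkConf`, and it throws `IllegalStateException` instead of `RssException` so that it stays self-contained. The class name `SpeculationGuardSketch` is hypothetical. Only the error message is taken from the stack trace above.

```java
import java.util.Map;

// Hypothetical sketch; not the real RssShuffleManager code.
final class SpeculationGuardSketch {

    // Real Spark property name; Spark treats it as false when unset.
    static final String SPECULATION_KEY = "spark.speculation";

    // Returns true only when the property is explicitly set to "true".
    static boolean speculationEnabled(Map<String, String> conf) {
        return Boolean.parseBoolean(conf.getOrDefault(SPECULATION_KEY, "false"));
    }

    // Mirrors the behaviour seen in the stack trace: refuse to register
    // a shuffle when speculation is turned on.
    static void checkBeforeRegister(Map<String, String> conf) {
        if (speculationEnabled(conf)) {
            throw new IllegalStateException(
                "Do not support speculation in Remote Shuffle Service");
        }
    }
}
```

Removing the guard would correspond to deleting the `checkBeforeRegister` call, after which shuffle registration would proceed regardless of the speculation setting.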

hiboyang avatar Sep 28 '22 01:09 hiboyang