PageRank causes java.util.NoSuchElementException
When running PageRank on a cluster, I sometimes hit a NoSuchElementException that originates somewhere in VertexSetRDD. Full stack trace and command below. The line numbers may be slightly off due to debugging printlns.
Command:
/root/graphx/run-example org.apache.spark.graph.Analytics spark://ec2-54-224-159-106.compute-1.amazonaws.com:7077 pagerank hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt --numIter=10 --numEPart=128
Stack Trace:
java.util.NoSuchElementException: End of stream
at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:314)
at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:313)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:64)
at org.apache.spark.graph.VertexSetRDD.compute(VertexSetRDD.scala:149)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Interestingly, https://github.com/amplab/graphx/pull/51 seems to fix the issue.
simple code is less buggy :)
Hmm, is that error in the master branch? Those line numbers don't seem to line up. I guess for now it makes sense to merge #51 since it is simpler.
I'm running into this issue in the latest version built off of #132
The code I'm running is pretty simple:
val input = sc.sequenceFile[VectorWritable, VectorWritable](inputPath, classOf[VectorWritable], classOf[VectorWritable])
// not even parsing the vectors, just making some big graph
val edges = input.flatMap {
  case (vec1, vec2) =>
    Seq(Edge(Random.nextLong(), Random.nextLong(), 1), Edge(Random.nextLong(), Random.nextLong(), 1))
}
val g = Graph.fromEdges(edges, 1)
val cc = ConnectedComponents.run(g)
cc.vertices.count()
The error I see is this:
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Lost TID 400 (task 4.0:0)
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Loss was due to java.util.NoSuchElementException
java.util.NoSuchElementException: End of stream
at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:34)
at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:33)
at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:242)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
at org.apache.spark.scheduler.Task.run(Task.scala:53)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
This is running on a Hadoop 2.2.0 cluster with YARN 2.2.0. I've previously run against the same data set with Bagel and it works great.
To make it even simpler, you can change the input to something like val input = sc.parallelize(1 to 100000000, 400), adjust the flatMap accordingly, and see the same error.
edit: Also, oddly, I can run it with, say, 11 splits on 10 workers and it goes through. I see the same End of Stream error in the logs, but it just fails those tasks and keeps going.
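For reference, the simplified reproduction described above could be assembled like this. This is an untested sketch: the object name Repro is hypothetical, the imports assume the pre-merge GraphX package layout (org.apache.spark.graph) used elsewhere in this thread, and the master URL is taken from the command line.

```scala
import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.graph._
import org.apache.spark.graph.algorithms.ConnectedComponents

// Hypothetical driver object for the simplified repro (no SequenceFile input needed).
object Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(args(0), "NoSuchElementRepro")
    // Replace the SequenceFile input with a plain range, keeping 400 partitions.
    val input = sc.parallelize(1 to 100000000, 400)
    // As in the original snippet, emit two random edges per input element.
    val edges = input.flatMap { _ =>
      Seq(Edge(Random.nextLong(), Random.nextLong(), 1),
          Edge(Random.nextLong(), Random.nextLong(), 1))
    }
    val g = Graph.fromEdges(edges, 1)
    val cc = ConnectedComponents.run(g)
    cc.vertices.count() // this count is where the End of stream error surfaces
  }
}
```

The key point is that the failure does not depend on the SequenceFile data at all; any sufficiently large randomly generated edge set over enough partitions should trigger it.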
Thanks for the report. I haven't yet been able to reproduce this. How many cores does Spark have in your configuration?
@ankurdave I've tried it with 4-8 cores. I can try with 1 if you think it would help pin down what's going on.
edit: Just to be clear, I mean 10-20 worker nodes with 4-8 cores each. (the machines have 24 cores each though).