
PageRank causes java.util.NoSuchElementException

dcrankshaw opened this issue 12 years ago • 7 comments

When running PageRank on a cluster, sometimes I hit a NoSuchElementException that's caused somewhere in VertexSetRDD. Full stack trace and command below. The line numbers may be slightly off due to debugging printlns.

Command:

/root/graphx/run-example org.apache.spark.graph.Analytics spark://ec2-54-224-159-106.compute-1.amazonaws.com:7077 pagerank hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt --numIter=10 --numEPart=128
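For reference, the same run expressed directly against the graph API would look roughly like the sketch below. It is written against the current org.apache.spark.graphx API (GraphLoader.edgeListFile, staticPageRank) rather than the org.apache.spark.graph package this repo uses, and the PageRankRepro object name is made up, so treat it as an approximation of what the command above does rather than the exact code path:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.GraphLoader

    // Hypothetical driver; the calls below are from the current
    // org.apache.spark.graphx package, not this repo's org.apache.spark.graph.
    object PageRankRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("PageRankRepro"))
        // Load the LiveJournal edge list into 128 edge partitions (--numEPart=128)
        val graph = GraphLoader.edgeListFile(
          sc,
          "hdfs://ec2-54-224-159-106.compute-1.amazonaws.com:9000/soc-LiveJournal1.txt",
          numEdgePartitions = 128)
        // Run 10 fixed PageRank iterations (--numIter=10)
        val ranks = graph.staticPageRank(10).vertices
        println(s"ranked ${ranks.count()} vertices")
        sc.stop()
      }
    }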

Stack Trace:

java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:314)
    at org.apache.spark.graph.VertexSetRDD$$anonfun$8.apply(VertexSetRDD.scala:313)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:84)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.ZippedRDD.compute(ZippedRDD.scala:64)
    at org.apache.spark.graph.VertexSetRDD.compute(VertexSetRDD.scala:149)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
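For context on the exception itself: Spark's NextIterator throws NoSuchElementException("End of stream") when next() is called after the underlying stream is exhausted, so the trace suggests the zipped-partition code in VertexSetRDD is pulling more elements from one iterator than its partner partition actually contains. A minimal, GraphX-free sketch of that failure mode (plain Scala iterators report a different message, but the mechanism is the same):

    // Plain Scala iterators standing in for the two zipped partition iterators.
    val left  = Iterator((1L, 1.0), (2L, 1.0), (3L, 1.0))
    val right = Iterator((1L, 1.0), (2L, 1.0)) // one element short

    // Consuming `right` once per element of `left` without checking right.hasNext:
    // this throws java.util.NoSuchElementException ("next on empty iterator" here;
    // Spark's NextIterator reports "End of stream" in the same situation).
    val zipped = left.map { case (vid, attr) => (vid, attr + right.next()._2) }
    zipped.foreach(println)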

dcrankshaw avatar Nov 07 '13 06:11 dcrankshaw

Interestingly, https://github.com/amplab/graphx/pull/51 seems to fix the issue.

dcrankshaw avatar Nov 07 '13 06:11 dcrankshaw

simple code is less buggy :)

rxin avatar Nov 07 '13 06:11 rxin

Hmm, is that error in the master branch? Those line numbers don't seem to line up. I guess for now maybe it makes sense to merge #51 since it is simpler.

jegonzal avatar Nov 07 '13 16:11 jegonzal

I'm running into this issue in the latest version built off of #132

The code I'm running is pretty simple:

    val input = sc.sequenceFile[VectorWritable, VectorWritable](inputPath, classOf[VectorWritable], classOf[VectorWritable])
    // not even parsing the vectors, just making some big graph
    val edges = input.flatMap {
      case (vec1, vec2) =>
        Seq(Edge(Random.nextLong(), Random.nextLong(), 1), Edge(Random.nextLong(), Random.nextLong(), 1))
    }
    val g = Graph.fromEdges(edges, 1)
    val cc = ConnectedComponents.run(g)
    cc.vertices.count()

The error I see is this:

14/01/09 10:38:46 WARN scheduler.TaskSetManager: Lost TID 400 (task 4.0:0)
14/01/09 10:38:46 WARN scheduler.TaskSetManager: Loss was due to java.util.NoSuchElementException
java.util.NoSuchElementException: End of stream
    at org.apache.spark.util.NextIterator.next(NextIterator.scala:83)
    at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:29)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:34)
    at org.apache.spark.graph.impl.RoutingTable$$anonfun$1.apply(RoutingTable.scala:33)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.RDD$$anonfun$1.apply(RDD.scala:449)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:34)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:242)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:231)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

This is running on a Hadoop 2.2.0 cluster with YARN 2.2.0. I've previously run against the same data set with Bagel and it works great.

ccsevers avatar Jan 09 '14 17:01 ccsevers

Just to make it simpler, you can change the input to something like val input = sc.parallelize(1 to 100000000, 400), change the flatMap input appropriately, and see the same error.
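A self-contained version of that simplification might look roughly like the sketch below. This is only a sketch: it uses the current org.apache.spark.graphx package and graph.connectedComponents(), whereas the snippet above uses this repo's org.apache.spark.graph package and ConnectedComponents.run, and the CCRepro object name is made up:

    import scala.util.Random
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx.{Edge, Graph}

    object CCRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("CCRepro"))
        // Synthetic input in place of the sequence file: 400 partitions, as suggested above
        val input = sc.parallelize(1 to 100000000, 400)
        // Two random edges per input element, mirroring the flatMap in the original snippet
        val edges = input.flatMap { _ =>
          Seq(Edge(Random.nextLong(), Random.nextLong(), 1),
              Edge(Random.nextLong(), Random.nextLong(), 1))
        }
        val g = Graph.fromEdges(edges, 1)
        // connectedComponents() is the GraphOps equivalent of ConnectedComponents.run(g)
        val cc = g.connectedComponents()
        println(cc.vertices.count())
        sc.stop()
      }
    }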

edit: Also, oddly, I can run it with, say, 11 splits on 10 workers and it goes through. I see the same End of Stream error in the logs, but it just fails those tasks and keeps going.
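That would be consistent with ordinary task retry: a task that hits the exception gets rescheduled, and the stage only fails once the retry limit is exhausted. A minimal sketch, assuming the standard spark.task.maxFailures setting (default 4) is what governs this, nothing GraphX-specific:

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the per-task retry limit so an occasional "End of stream" task
    // failure is retried rather than failing the whole stage.
    val conf = new SparkConf()
      .setAppName("cc-repro")
      .set("spark.task.maxFailures", "8")
    val sc = new SparkContext(conf)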

ccsevers avatar Jan 09 '14 18:01 ccsevers

Thanks for the report. I haven't yet been able to reproduce this. How many cores does Spark have in your configuration?

ankurdave avatar Jan 11 '14 05:01 ankurdave

@ankurdave I've tried it with 4-8 cores. I can try with 1 if you think it would help pin down what's going on.

edit: Just to be clear, I mean 10-20 worker nodes with 4-8 cores each. (the machines have 24 cores each though).

ccsevers avatar Jan 12 '14 19:01 ccsevers