node2vec
Setting Directed Flag Causes NullPointerException
Hi,
The Spark implementation crashes whenever the directed flag is set to true. I ran it with both the karate.edgelist example and a dummy two-edge graph. The exception is always raised at the same location inside `initTransitionProb`, after the graph has been loaded:
java.lang.NullPointerException
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:69)
at Node2vec$$anonfun$initTransitionProb$2.apply(Node2vec.scala:68)
at org.apache.spark.graphx.impl.VertexPartitionBaseOps.map(VertexPartitionBaseOps.scala:61)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at org.apache.spark.graphx.impl.GraphImpl$$anonfun$5.apply(GraphImpl.scala:129)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1038)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:89)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Cheers.
Reason
I hit the same error when running `val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors)` in `GraphOps.initTransitionProb`. It happens because the `nodeAttr` object is null.
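For context, `setupAlias`/`drawAlias` implement Walker's alias method, which node2vec uses for O(1) weighted sampling of the next neighbor in a walk. The sketch below is a minimal pure-Scala reimplementation of that standard construction; the names mirror the repo's `GraphOps`, but this is illustrative code, not the repo's.

```scala
import scala.collection.mutable
import scala.util.Random

// Build alias tables (j, q) for O(1) sampling proportional to edge weight.
// Calling this with a null neighbors array is exactly what triggers the NPE.
def setupAlias(neighbors: Array[(Long, Double)]): (Array[Int], Array[Double]) = {
  val k = neighbors.length
  val j = Array.fill(k)(0)
  val q = Array.fill(k)(0.0)
  val smaller = mutable.Stack[Int]()
  val larger = mutable.Stack[Int]()
  val total = neighbors.map(_._2).sum
  for (((_, w), i) <- neighbors.zipWithIndex) {
    q(i) = k * w / total // scaled probability; 1.0 means "exactly fair"
    if (q(i) < 1.0) smaller.push(i) else larger.push(i)
  }
  while (smaller.nonEmpty && larger.nonEmpty) {
    val small = smaller.pop()
    val large = larger.pop()
    j(small) = large                       // small's leftover mass aliases to large
    q(large) = q(large) + q(small) - 1.0   // large donates mass to fill small's bin
    if (q(large) < 1.0) smaller.push(large) else larger.push(large)
  }
  (j, q)
}

// Draw an index into the neighbors array in O(1).
def drawAlias(j: Array[Int], q: Array[Double], rnd: Random = new Random): Int = {
  val i = rnd.nextInt(j.length)
  if (rnd.nextDouble() < q(i)) i else j(i)
}

val (j, q) = setupAlias(Array((10L, 1.0), (20L, 1.0), (30L, 2.0)))
println(drawAlias(j, q)) // an index in 0..2, node 30 drawn twice as often
```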
Details
- Some dst nodes should appear in the `node2attr` object, but the following code never adds them:
val node2attr = triplets.map { case (src, dst, weight) =>
  (src, Array((dst, weight)))
}.reduceByKey(_ ++ _).map { case (srcId, neighbors: Array[(Long, Double)]) =>
  var neighbors_ : Array[(Long, Double)] = neighbors.groupBy(_._1).map { case (group, traversable) =>
    traversable.head
  }.toArray
  if (neighbors_.length > bcMaxDegree.value) {
    neighbors_ = neighbors.sortWith { case (left, right) => left._2 > right._2 }.slice(0, bcMaxDegree.value)
  }
  (srcId, NodeAttr(neighbors = neighbors_)) // only src ids ever become keys
}
- Then, when the graph object is created by `val graph = Graph(indexedNodes, indexedEdges)`, the missing dst nodes mentioned above are created with GraphX's default vertex attribute. Their format is [vertexId, null] instead of [vertexId, NodeAttr], so the error follows.
Solutions
To solve the problem, two methods need small modifications. The details are shown below:
- GraphOps.initTransitionProb
val graph = Graph(indexedNodes, indexedEdges).mapVertices[NodeAttr] { case (vertexId, nodeAttr) =>
  if (nodeAttr != null) { // added: guard against the default null attribute
    val (j, q) = GraphOps.setupAlias(nodeAttr.neighbors)
    val nextNodeIndex = GraphOps.drawAlias(j, q)
    nodeAttr.path = Array(vertexId, nodeAttr.neighbors(nextNodeIndex)._1)
    nodeAttr
  } else {
    NodeAttr() // added: create an empty attribute for dst-only nodes
  }
}
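If you prefer to avoid explicit null comparisons, the same guard can be expressed with `Option`. This is a sketch using a simplified `NodeAttr` with only `neighbors` and `path` fields (the real class has more members), and it sets the path to the first neighbor rather than an alias-sampled one:

```scala
// Simplified stand-in for the real class (illustrative only).
case class NodeAttr(var neighbors: Array[(Long, Double)] = Array.empty,
                    var path: Array[Long] = Array.empty)

// Same guard as the null check in initTransitionProb, written with Option.
def initAttr(vertexId: Long, nodeAttr: NodeAttr): NodeAttr =
  Option(nodeAttr) match {
    case Some(attr) if attr.neighbors.nonEmpty =>
      // the real code would run setupAlias / drawAlias here to pick a neighbor
      attr.path = Array(vertexId, attr.neighbors.head._1)
      attr
    case _ =>
      NodeAttr() // dst-only vertex: give it an empty attribute instead of null
  }

assert(initAttr(3L, null).path.isEmpty) // a default-null vertex now survives
```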
- Node2Vec.randomWalk
// add: .filter(x => x._2.path.nonEmpty)
val examples = g.vertices.filter(x => x._2.path.nonEmpty).cache
...
// add the condition: attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty
iter.map { case (edge, (attr, pathBuffer)) =>
  try {
    if (pathBuffer != null && pathBuffer.nonEmpty && attr.dstNeighbors != null && attr.dstNeighbors.nonEmpty) {
      val nextNodeIndex = GraphOps.drawAlias(attr.J, attr.q)
      val nextNodeId = attr.dstNeighbors(nextNodeIndex)
      s"$pathBuffer\t$nextNodeId"
    } else {
      pathBuffer // added: keep the walk unchanged when there is nothing to extend
    }
  } catch {
    case e: Exception => throw new RuntimeException(e.getMessage)
  }
}
Hope this can help you! Good luck!