2017-01-09 3 views
1

Я пытаюсь выполнить некоторую передачу сообщений на графике для вычисления рекурсивных функций. Я получаю сообщение об ошибке, когда я определяю график, вершинами которого являются вывод aggregateMessages. Код для контекстаGraphX ​​VertexRDD NullPointerException

> val newGraph = Graph(newVertices, edges) 

newGraph: org.apache.spark.graphx.Graph[List[Double],Int] = [email protected] 

//This is the RDD that causes the problem 
> val result = newGraph.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

result: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[1990] at RDD at VertexRDD.scala:57 

> result.take(1) 
res121: Array[(org.apache.spark.graphx.VertexId, List[Double])] = Array((1944425548,List(0.0, 0.0, 137.0, 292793.0))) 

до сих пор без проблем, но когда я пытаюсь

> val newGraph2 = Graph(result, edges) 

newGraph2: org.apache.spark.graphx.Graph[List[Double],Int] = [email protected] 

> val result2 = newGraph2.aggregateMessages[List[Double]](
    {triplet => triplet.sendToDst(triplet.srcAttr)}, 
    {(a,b) => a.zip(b).map { case (x, y) => x + y }}, 
    {TripletFields.Src}) 

> result2.count 

я получаю следующее (обрезается) Ошибка:

result2: org.apache.spark.graphx.VertexRDD[List[Double]] = VertexRDDImpl[2009] at RDD at VertexRDD.scala:57 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4839.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4839.0 (TID 735, 10.0.2.15): java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
... 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
... 
Caused by: java.lang.NullPointerException 
    at $anonfun$2.apply(<console>:62) 
    at $anonfun$2.apply(<console>:62) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.send(EdgePartition.scala:536) 
    at org.apache.spark.graphx.impl.AggregatingEdgeContext.sendToDst(EdgePartition.scala:531) 
    at $anonfun$1.apply(<console>:61) 
    at $anonfun$1.apply(<console>:61) 
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237) 
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
    at org.apache.spark.scheduler.Task.run(Task.scala:85) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    ... 3 more 

Я не думаю, что это тип ошибки несоответствия, потому что aggregateMessages возвращает VertexRDD, любые идеи, почему я получаю эту проблему?

ответ

1

Не все узлы на графике возвращаются aggregateMessages, только те, которые получают сообщение. Исключение NullPointerException вызвано ребрами в графе, указывающем на эти узлы, и отсутствием значения узла по умолчанию в определении графика.

Смежные вопросы