2017-01-03 2 views
4

Я пытаюсь вычислить сумму значений узлов в графе искрового графа. Короче говоря, граф - это дерево, а верхний узел (корень) должен суммировать всех детей и их детей. Мой график на самом деле дерево, которое выглядит, как это и ожидается суммируется значение должно быть 1850:Суммирование агрегации Spark GraphX ​​

         +----+ 
        +---------------> | VertexID 14 
        |    | | Value: 1000 
       +---+--+   +----+ 
    +------------>  | VertexId 11 
    |   |  | Value:  +----+ 
    |   +------+ Sum of 14 & 24 | VertexId 24 
+---++    +--------------> | Value: 550 
| | VertexId 20     +----+ 
| | Value: 
+----++Sum of 11 & 911 
     | 
     |   +-----+ 
     +----------->  | VertexId 911 
        |  | Value: 300 
        +-----+ 

Первый удар в этом выглядит следующим образом:

val vertices: RDD[(VertexId, Int)] = 
     sc.parallelize(Array((20L, 0) 
     , (11L, 0) 
     , (14L, 1000) 
     , (24L, 550) 
     , (911L, 300) 
    )) 

    //note that the last value in the edge is for factor (positive or negative) 
    val edges: RDD[Edge[Int]] = 
     sc.parallelize(Array(
     Edge(14L, 11L, 1), 
     Edge(24L, 11L, 1), 
     Edge(11L, 20L, 1), 
     Edge(911L, 20L, 1) 
    )) 

    val dataItemGraph = Graph(vertices, edges) 


    val sum: VertexRDD[(Int, BigDecimal, Int)] = dataItemGraph.aggregateMessages[(Int, BigDecimal, Int)](
     sendMsg = { triplet => triplet.sendToDst(1, triplet.srcAttr, 1) }, 
     mergeMsg = { (a, b) => (a._1, a._2 * a._3 + b._2 * b._3, 1) } 
    ) 

    sum.collect.foreach(println) 

Это возвращает следующее :

(20,(1,300,1)) 
(11,(1,1550,1)) 

Он выполняет сумму для вершины 11, но не свертывается с корневым узлом (вершина 20). Что мне не хватает или есть лучший способ сделать это? Конечно, дерево может иметь произвольный размер, и каждая вершина может иметь произвольное число дочерних ребер.

+0

Теперь мне интересно, является ли Pregel правильным методом использования. – will

ответ

2

Учитывая график направлен (как в вас, например, кажется, что) это должно быть возможно написать программу Преголя, что делает то, что вы просите:

val result = 
dataItemGraph.pregel(0, activeDirection = EdgeDirection.Out)(
    (_, vd, msg) => msg + vd, 
    t => Iterator((t.dstId, t.srcAttr)), 
    (x, y) => x + y 
) 

result.vertices.collect().foreach(println) 

// Output is: 
// (24,550) 
// (20,1850) 
// (14,1000) 
// (11,1550) 
// (911,300) 

Я использую EdgeDirection.Out так что сообщения отправляются только снизу вверх (иначе мы попали бы в бесконечный цикл).

Смежные вопросы