2015-12-30 2 views
0

Пусть иметь простой график, как:как вычислить вершину сходства с соседями в Graphx

val users = sc.parallelize(Array(
       (1L, Seq("M", 2014, 40376, null, "N", 1, "Rajastan")), 
       (2L, Seq("M", 2009, 20231, null, "N", 1, "Rajastan")), 
       (3L, Seq("F", 2016, 40376, null, "N", 1, "Rajastan")) 
      ))         
val edges = sc.parallelize(Array(
       Edge(1L, 2L, ""), 
       Edge(1L, 3L, ""), 
       Edge(2L, 3L, ""))) 
val graph = Graph(users, edges) 

я хотел бы, чтобы вычислить, сколько каждая вершина похожа на своих соседей по каждому параметру.

Идеальный выход (РД или DataFrame) проведут следующие результаты:

1L: 0.5, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0 
2L: 0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0 
3L: 0.0, 0.5, 0.5, 1.0, 1.0, 1.0, 1.0 

Например, первое значение 1L означает, что на 2 соседей, только 1 одних и то же значение ...

Я играю с aggregateMessage просто подсчитать, сколько соседей имеют сходное значение атрибута, но без толка до сих пор:

val result = graph.aggregateMessages[(Int, Seq[Any])](
    // build the message 
    sendMsg = { 
     // map function 
     triplet => 
     // send message to destination vertex 
     triplet.sendToDst(1, triplet.srcAttr) 
     // send message to source vertex 
     triplet.sendToSrc(1, triplet.dstAttr) 
    }, // trying to count neighbors with similar property 
    { case ((cnt1, sender), (cnt2, receiver)) => 
     val prop1 = if(sender(0) == receiver(0)) 1d else 0d 
     val prop2 = if(Math.abs(sender(1).asInstanceOf[Int] - receiver(1).asInstanceOf[Int])<3) 1d else 0d 
     val prop3 = if(sender(2) == receiver(2)) 1d else 0d 
     val prop4 = if(sender(3) == receiver(3)) 1d else 0d 
     val prop5 = if(sender(4) == receiver(4)) 1d else 0d 
     val prop6 = if(sender(5) == receiver(5)) 1d else 0d 
     val prop7 = if(sender(6) == receiver(6)) 1d else 0d 
     (cnt1 + cnt2, Seq(prop1, prop2, prop3, prop4, prop5, prop6, prop7)) 
    } 
) 

это дает мне правильную окрестность размер для каждой вершины, но не суммированием значений справа:

//> (1,(2,List(0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0))) 
//| (2,(2,List(0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))) 
//| (3,(2,List(1.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0))) 

ответ

1

Это не суммируется значения, поскольку нет суммы в вашем коде. Более того, ваша логика ошибочна. mergeMsg принимает сообщения не (message, current). Попробуйте что-то вроде этого:

import breeze.linalg.DenseVector 

def compareAttrs(xs: Seq[Any], ys: Seq[Any]) = 
    DenseVector(xs.zip(ys).map{ case (x, y) => if (x == y) 1L else 0L}.toArray) 

val result = graph.aggregateMessages[(Long, DenseVector[Long])](
    triplet => { 
    val comparedAttrs = compareAttrs(triplet.dstAttr, triplet.srcAttr) 
    triplet.sendToDst(1L, comparedAttrs) 
    triplet.sendToSrc(1L, comparedAttrs) 
    }, 
    { case ((cnt1, v1), (cnt2, v2)) => (cnt1 + cnt2, v1 + v2) } 
) 

result.mapValues(kv => (kv._2.map(_.toDouble)/kv._1.toDouble)).collect 
// Array(
// (1,DenseVector(0.5, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0)), 
// (2,DenseVector(0.5, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0)), 
// (3,DenseVector(0.0, 0.0, 0.5, 1.0, 1.0, 1.0, 1.0))) 
Смежные вопросы