2016-03-29 2 views
1

Я хочу найти непрямые узлы, подключенные к определенному узлу. Я попытался с помощью класса связных компонент графа, как показано ниже ...Как найти непрямые узлы, подключенные к определенному узлу в Spark Graphx

graph.connectedComponents 

Однако он дает для всех graph..but я хочу для конкретного узла.

Я пробовал делать, как показано ниже.

graph.edges.filter(_.srcId == x).map(_.dstId) 

Это дает прямые узлы определенного узла, и я должен рекурсивно это использовать только с помощью операций RDD. Может ли кто-нибудь помочь в этом?

ответ

1

попробовать что-то вроде этого:

graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
    graph.collectNeighborIds(EdgeDirection.Either) 
).flatMap{t => t._2._2}.collect.toSet 

Если вы хотите пойти глубже, чем это, я хотел бы использовать что-то вроде Pregel API. По сути, он позволяет вам повторно отправлять сообщения с узла на узел и агрегировать результаты.

Edit: Pregel Решение

я, наконец, получил итерации, чтобы остановить самостоятельно. Редактирование ниже. Учитывая этот график:

graph.vertices.collect 
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))  

graph.edges.collect 
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0)) 

Мы будем отправлять сообщения типа Array[Long] - массив из всех VertexIds подключенных узлов. Сообщения будут идти вверх по течению - dst отправит src его VertexId вместе со всем остальным нисходящим потоком VertexIds. Если восходящий узел уже знает о соединении, сообщение не будет отправлено. В конце концов, каждый узел знает о каждом подключенном узле и больше сообщений не будет отправлено.

Сначала мы определяем наш vprog. Согласно документации:

определенного пользователем вершинной программы, которая работает на каждую вершину и получает входящее сообщение и вычисляет новое значение вершины. На первой итерации программа вершин вызывается во всех вершинах и передается сообщение по умолчанию. При последующих итерациях программа вершин вызывается только на тех вершинах, которые получают сообщения.

def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = { 
    (orig ++ newly).toSet.toArray 
} 

Тогда мы определим наш sendMsg - отредактирован: сменил src & dst

питающих пользовательскую функцию, которая применяется к из краев вершин, полученных сообщений в текущей итерации

def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = { 
    if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) { 
    Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray)) 
    } else Iterator.empty } 

Следующая наша mergeMsg:

прилагаемую пользовательская функция, которая принимает два входящие сообщения типа А и объединяет их в одно сообщение типа А. Эта функция должна быть коммутативной и ассоциативной и в идеале размер A не должно увеличить.

К сожалению, мы будем нарушать правила в последнем предложении выше:

def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = { 
    (a ++ b).toSet.toArray 
} 

Тогда мы бежим pregel - отредактирован: удалены maxIterations, по умолчанию Int.MaxValue

val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg) 

И вы можете посмотреть на результаты:

result.vertices.collect 
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9))) 
+0

Ну. Конечно, это не простая проблема. Есть API-интерфейс aggregateMessages, на который вы должны обратить внимание, и алгоритм под названием «Pregel», который вы могли бы использовать. По сути, он позволяет вам повторно отправлять сообщения с узла на узел и агрегировать результаты. Трудно объяснить в комментарии, но представьте, что в первой итерации алгоритма Node 7 отправил (8,9) в Node 2, а Node 8 отправил (10,11) на узел 7. Затем в вторая итерация, узел 7 отправит (10,11) в узел 2. Таким образом, две итерации Pregel у вас есть ваш ответ. –

+0

Уверенный Дэвид .. я буду работать с этим ... и обновит вас ... Спасибо – Devndra

+0

Итак, отлично - теперь принимайте и повышайте мой ответ !! –

Смежные вопросы