попробовать что-то вроде этого:
graph.edges.filter(_.srcId == x).map(e => (e.dstId, null)).join(
graph.collectNeighborIds(EdgeDirection.Either)
).flatMap{t => t._2._2}.collect.toSet
Если вы хотите пойти глубже, чем это, я хотел бы использовать что-то вроде Pregel API. По сути, он позволяет вам повторно отправлять сообщения с узла на узел и агрегировать результаты.
Edit: Pregel Решение
я, наконец, получил итерации, чтобы остановить самостоятельно. Редактирование ниже. Учитывая этот график:
graph.vertices.collect
res46: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array()), (8,Array()), (1,Array()), (9,Array()), (5,Array()), (6,Array()), (2,Array()), (3,Array()), (7,Array()))
graph.edges.collect
res47: Array[org.apache.spark.graphx.Edge[Double]] = Array(Edge(1,2,0.0), Edge(2,3,0.0), Edge(3,4,0.0), Edge(5,6,0.0), Edge(6,7,0.0), Edge(7,8,0.0), Edge(8,9,0.0), Edge(4,2,0.0), Edge(6,9,0.0), Edge(7,9,0.0))
Мы будем отправлять сообщения типа Array[Long]
- массив из всех VertexIds
подключенных узлов. Сообщения будут идти вверх по течению - dst
отправит src
его VertexId
вместе со всем остальным нисходящим потоком VertexIds
. Если восходящий узел уже знает о соединении, сообщение не будет отправлено. В конце концов, каждый узел знает о каждом подключенном узле и больше сообщений не будет отправлено.
Сначала мы определяем наш vprog
. Согласно документации:
определенного пользователем вершинной программы, которая работает на каждую вершину и получает входящее сообщение и вычисляет новое значение вершины. На первой итерации программа вершин вызывается во всех вершинах и передается сообщение по умолчанию. При последующих итерациях программа вершин вызывается только на тех вершинах, которые получают сообщения.
def vprog(id: VertexId, orig: Array[Long], newly: Array[Long]) : Array[Long] = {
(orig ++ newly).toSet.toArray
}
Тогда мы определим наш sendMsg
- отредактирован: сменил src
& dst
питающих пользовательскую функцию, которая применяется к из краев вершин, полученных сообщений в текущей итерации
def sendMsg(trip: EdgeTriplet[Array[Long],Double]) : Iterator[(VertexId, Array[Long])] = {
if (trip.srcAttr.intersect(trip.dstAttr ++ Array(trip.dstId)).length != (trip.dstAttr ++ Array(trip.dstId)).toSet.size) {
Iterator((trip.srcId, (Array(trip.dstId) ++ trip.dstAttr).toSet.toArray))
} else Iterator.empty }
Следующая наша mergeMsg
:
прилагаемую пользовательская функция, которая принимает два входящие сообщения типа А и объединяет их в одно сообщение типа А. Эта функция должна быть коммутативной и ассоциативной и в идеале размер A не должно увеличить.
К сожалению, мы будем нарушать правила в последнем предложении выше:
def mergeMsg(a: Array[Long], b: Array[Long]) : Array[Long] = {
(a ++ b).toSet.toArray
}
Тогда мы бежим pregel
- отредактирован: удалены maxIterations
, по умолчанию Int.MaxValue
val result = graph.pregel(Array[Long]())(vprog, sendMsg, mergeMsg)
И вы можете посмотреть на результаты:
result.vertices.collect
res48: Array[(org.apache.spark.graphx.VertexId, Array[Long])] = Array((4,Array(4, 2, 3)), (8,Array(8, 9)), (1,Array(1, 2, 3, 4)), (9,Array(9)), (5,Array(5, 6, 9, 7, 8)), (6,Array(6, 7, 9, 8)), (2,Array(2, 3, 4)), (3,Array(3, 4, 2)), (7,Array(7, 8, 9)))
Ну. Конечно, это не простая проблема. Есть API-интерфейс aggregateMessages, на который вы должны обратить внимание, и алгоритм под названием «Pregel», который вы могли бы использовать. По сути, он позволяет вам повторно отправлять сообщения с узла на узел и агрегировать результаты. Трудно объяснить в комментарии, но представьте, что в первой итерации алгоритма Node 7 отправил (8,9) в Node 2, а Node 8 отправил (10,11) на узел 7. Затем в вторая итерация, узел 7 отправит (10,11) в узел 2. Таким образом, две итерации Pregel у вас есть ваш ответ. –
Уверенный Дэвид .. я буду работать с этим ... и обновит вас ... Спасибо – Devndra
Итак, отлично - теперь принимайте и повышайте мой ответ !! –