Как вы используете Apache Спарк, вы можете использовать встроенный в Graphx компонент, чтобы сделать работу для вас.
import org.apache.spark.graphx._
def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y)
val data = sc.parallelize(List(
"1\t5\t3",
"3\t9\t30",
"7\t10\t12",
"10\t7\t13"
))
val prep = data.map(x => x.split("\t").map(_.toLong).toList)
val vertex = prep
.flatMap(x => x)
.map(x => x -> s"ID=$x")
val edges = prep
.map(x => cross(x, x))
.flatMap(x => x)
.map(x => new Edge(x._1, x._2, "likes"))
val graph = Graph(vertex, edges)
val linked = graph
.connectedComponents
.vertices
.map(_.swap)
.groupByKey
linked.take(10).foreach(println)
напечатает следующий результат:
(1,CompactBuffer(30, 3, 9, 1, 5))
(7,CompactBuffer(7, 10, 12, 13))
Крест просто создает перекрестное произведение двух списков, чтобы мы могли создать ребра между всеми вершинами.
Функция connectedComponents будет проходить через график и находить все вершины, которые разделяют ребро и создают новый график, где каждая вершина является кортежем идентификатора вершин Id -> «Первичный» идентификатор вершины.
Итак:
graph.connectedComponents.vertices.take(10).foreach(println)
распечатать бы из
(30,1)
(1,1)
(3,1)
(5,1)
(7,7)
(9,1)
(10,7)
(12,7)
(13,7)
Как вы можете видеть, 1 и 7 были выбраны в качестве «первичного Vertex» и связан со всеми подключенными Вершины в первом графике , Таким образом, простой своп и группа объединят все связанные идентификаторы.
Аналогично: HTTP: // StackOverflow.com/questions/40240409/apache-spark-rdd-replacement/40256149 # 40256149 – maasg