2016-11-17 3 views
4

Учитывая файл, заполненный строками идентификаторов, например.Объединение/объединение строк идентификаторов в искровом

i1, i2, i5 
i3, i4 
i2, i6, i7 
i4, i8 
i9, i3 

Как вы можете присоединиться к ним, связав те же ID? Итак, для примера выше строка 1 связана с строкой 3 через i2, а строка 2 связана с строками 4 и 5 через i4 и i3 соответственно. Это даст вам следующие (дубликаты удалены)

i1, i2, i5, i6, i7 
i3, i4, i8, i9 

Я мог бы сделать это с помощью цикла по строкам, но было интересно, как вы бы об этом в функциональном образом?

+2

Аналогично: HTTP: // StackOverflow.com/questions/40240409/apache-spark-rdd-replacement/40256149 # 40256149 – maasg

ответ

1

Как вы используете Apache Спарк, вы можете использовать встроенный в Graphx компонент, чтобы сделать работу для вас.

import org.apache.spark.graphx._ 

def cross[Y](xs: Traversable[Y], ys: Traversable[Y]) = for { x <- xs; y <- ys } yield (x, y) 

val data = sc.parallelize(List(
    "1\t5\t3", 
    "3\t9\t30", 
    "7\t10\t12", 
    "10\t7\t13" 
)) 

val prep = data.map(x => x.split("\t").map(_.toLong).toList) 

val vertex = prep 
    .flatMap(x => x) 
    .map(x => x -> s"ID=$x") 

val edges = prep 
    .map(x => cross(x, x)) 
    .flatMap(x => x) 
    .map(x => new Edge(x._1, x._2, "likes")) 

val graph = Graph(vertex, edges) 
val linked = graph 
    .connectedComponents 
    .vertices 
    .map(_.swap) 
    .groupByKey 

linked.take(10).foreach(println) 

напечатает следующий результат:

(1,CompactBuffer(30, 3, 9, 1, 5)) 
(7,CompactBuffer(7, 10, 12, 13)) 

Крест просто создает перекрестное произведение двух списков, чтобы мы могли создать ребра между всеми вершинами.

Функция connectedComponents будет проходить через график и находить все вершины, которые разделяют ребро и создают новый график, где каждая вершина является кортежем идентификатора вершин Id -> «Первичный» идентификатор вершины.

Итак:

graph.connectedComponents.vertices.take(10).foreach(println) 

распечатать бы из

(30,1) 
(1,1) 
(3,1) 
(5,1) 
(7,7) 
(9,1) 
(10,7) 
(12,7) 
(13,7) 

Как вы можете видеть, 1 и 7 были выбраны в качестве «первичного Vertex» и связан со всеми подключенными Вершины в первом графике , Таким образом, простой своп и группа объединят все связанные идентификаторы.

0

Вместо того чтобы использовать O(n * n) решение, которое перебирает все строки, мы можем использовать это решение, которое O(n * k) где к Ваш номер ID. Нравится так:

val input = ...//I will assume your input is an RDD[List] 

val idArray = Array(id1, id2, id3, id4, id5, id6, id6)//Array containing all IDs 
val result = sc.parallelize(idArray, k).map(x => (x, x)) 
input = input.map(x => (x(0), if(x.length > 0) x.slice(1, x.length) else null)) 

//If you can afford to persist it would help greatly: 
result.persist 
input.persist 

//We can make this loop smaller if k is large and your lists are small 
//by setting the upper bound of the range to the length of the longest list. 
//I'll leave this decision up to you. 
for (i <- 0 to k){ 
    result = result.cogroup(input) 
    input = input.map((t: (x, y)) => (y(0), if(y.length > 0) y.slice(1, y.length) else null)) 
} 
result.map((t: (x, y)) => y.distinct)//we want distinct lists in output 

result.unpersist 
input.unpersist 
0

Так что это, вероятно, не оптимально, но я полагал, что это будет стоить независимо. Он предполагает, что ваш входной файл достаточно мал, чтобы его можно было сохранить в памяти (потому что это все ванила Scala).

Я решил решить эту проблему, обработав данные идентификаторы как смежности на графике, а затем используя BFS, чтобы перечислить все подключенные компоненты.

/* Input, can be read from file easily by splitting on ", " */ 
val lines = List(List("i1", "i2", "i5"), 
    List("i3", "i4"), 
    List("i2", "i6", "i7"), 
    List("i4", "i8"), 
    List("i9", "i3")) 

/* finds all sequential pairs */ 
val pairs = lines.flatMap(x => x.dropRight(1).zip(x.drop(1))) 

/* create an empty adjacency map: id -> (Set of adjacent vertices) */ 
val defMap = Map[String, Set[String]]().withDefaultValue(Set[String]()) 

/* populate the default map with the actual (symmetric) adjacencies */ 
val adjMap = pairs.foldLeft{defMap}(
    (acc, x) => acc + (x._1 -> (acc(x._1) + x._2)) + (x._2 -> (acc(x._2) + x._1))) 

/* BFS algo on map representation of graph */ 
def mapBFS(adjMap: Map[String, Set[String]]): List[List[String]] = 
{ 
    val v = adjMap.keys 
    var globalVisits = List[String]() 
    def BFS_r(elems: List[String], visited: List[List[String]]): List[String] = 
    { 
     val newNeighbors = elems.flatMap(adjMap(_)).filterNot(visited.flatten.contains).distinct 
     if (newNeighbors.isEmpty) 
      visited.flatten 
     else 
      BFS_r(newNeighbors, newNeighbors :: visited) 
    } 
    v.flatMap(x =>{ 
     if (globalVisits.contains(x)) 
      None 
     else 
     { 
      val vi: List[String] = BFS_r(List(x), List(List(x))) 
      globalVisits = globalVisits ++ vi 
      Some(vi) 
     } 
    }).toList 
} 
mapBFS(adjMap).foreach{println} 

Что дает следующий вывод:

List(i7, i1, i6, i2, i5) 
List(i8, i4, i3, i9) 
+0

Я разместил это решение на [Обзор кода] (http://codereview.stackexchange.com/questions/147446/print-connected-components-scala), если у вас есть мысли о том, как улучшить. –

1

код, который работает с искрой 2.0+

val spark: SparkSession = SparkSession.builder.master("local").getOrCreate; 
val df = spark.sparkContext.parallelize(
    List(
    "i1, i2, i5", 
    "i3, i4", 
    "i2, i6, i7", 
    "i4, i8") 
) 

//Group lines with tokens count (determing by the last occurence of comma) 
val rddGroupByTokensCount = df.map(row => (row.lastIndexOf(','), row.split(", "))) 
    .groupBy(_._1) 

//Now gather all the token to single place with flatMap and drop duplicates 
val rddUniqueTokens = rddGroupByTokensCount.map(_._2.flatMap(_._2).toSet) 

//print grouped unique tokens by the count in each line 
rddUniqueTokens.collect().map(println) 

Выход:

Set(i5, i1, i7, i2, i6) 
Set(i3, i4, i8) 
Смежные вопросы