2014-12-01 2 views
2

Как я могу объединить элементы в Spark RDD при пользовательском состоянии?слияния элементов в искровом RDD при настраиваемом состоянии

Предположим, что есть RDD [Seq [Int]], где некоторые Seq [Int] в этом RDD содержат перекрывающиеся элементы. Задача состоит в том, чтобы объединить все перекрывающиеся Seq [Int] в этом RDD и сохранить результат в новом RDD.

Например, предположим, что RDD [Seq [Int]] = [[1,2,3], [2,4,5], [1,2], [7,8,9]], результат должен быть [[1,2,3,4,5], [7,8,9]].

Поскольку RDD [Seq [Int]] очень большой, я не могу это сделать в программе драйвера. Можно ли это сделать, используя распределенные groupBy/map/reduce и т. Д.?

+0

@maasg Я думаю, что эта проблема может быть преобразована в вычислениях всех подключенных график, образованных элементами в RDD [Seq [Int]], так как условие слияния (два Seq [Int] имеет перекрывающиеся целые числа) обозначает соединение между двумя Seq [Int] –

+0

Действительно, это была бы идея. Я вижу, что вы новичок в SO. Добро пожаловать !. Это часть «правил дома», чтобы показать свои собственные усилия в решении проблемы, отсюда и предыдущий комментарий. – maasg

ответ

1

Наконец-то это удалось решить самостоятельно.

Эта проблема может быть преобразована в вычисление всех связанных компонентов, образованных элементами в RDD [Seq [Int]], так как условие слияния (два Seq [Int] имеют перекрывающиеся целые числа) обозначает связность между двумя Seq [Int].

Основная идея заключается в:

  1. Дайте каждый элемент в RDD [Seq [Int]] уникальный ключ (.zipWithUniqueId)
  2. группы целых чисел в Seq [Int] с помощью генерируемого ключа, таким образом, целых чисел которые появляются в несколько Seq [Int] будет иметь соответствующую клавишу сгруппирована вместе
  3. Генерировать график RDD, где края являются ключевым паром из одной и тех же групп в шаге 2
  4. Использование Graphx для вычисления компонент связности, и присоединиться к Результаты

    val sets = Seq(Seq(1,2,3,4), Seq(4,5), Seq(1,2,3), Seq(6,7,8), Seq(9,10), Seq(7,9)) 
    val rddSets = sc.parallelize(sets) 
           .zipWithUniqueId 
           .map(x => (x._2, x._1)).cache() 
    val edges = rddSets.flatMap(s => s._2.map(i => (i, s._1))) 
            .groupByKey.flatMap(g => { 
             var first = g._2.head 
             for (v <- g._2.drop(1)) yield { 
             val pair = (first, v) 
             first = v 
             pair 
             } 
            }).flatMap(e => Seq((e._1, e._2), (e._2, e._1))) 
    
    val vertices = Graph.fromEdgeTuples[Long](edges, defaultValue = 0) 
            .connectedComponents.vertices 
    rddSets.join(vertices).map(x => (x._2._2, x._2._1)) 
         .reduceByKey((s1, s2) => s1.union(s2).distinct) 
         .collect().foreach(x => println (x._2.toString())) 
    
Смежные вопросы