2015-11-14 2 views
1

У меня есть 2 RDD, каждый из которых представляет собой набор строк, содержащих дубликаты. Я хочу найти пересечение двух множеств сохранение дубликатов. Пример:Spark: Как эффективно иметь пересечения, сохраняющие дубликаты (в Scala)?

RDD1 : a, b, b, c, c, c, c

RDD2 : a, a, b, c, c

Пересечение Я хочу есть множество a, b, c, c т.е. пересечение будет содержать каждый элемент минимальное количество раз, что он присутствует в обоих наборах.

По умолчанию intersection трансформация не сохраняет дубликаты AFAIK. Есть ли способ для эффективно вычислить пересечение, используя некоторые другие преобразования и/или преобразование пересечения? Я стараюсь избегать этого алгоритмически, что вряд ли будет столь же эффективным, как это делает Spark. (Для заинтересованных, я пытаюсь вычислить Jaccard bag similarity для набора файлов).

ответ

3

Заимствование немного от реализации intersection, вы могли бы сделать что-то вроде:

(val rdd1 = sc.parallelize(Seq("a", "b", "b", "c", "c", "c", "c"))) 
(val rdd2 = sc.parallelize(Seq("a", "a", "b", "c", "c"))) 

val cogrouped = rdd1.map(k => (k, null)).cogroup(rdd2.map(k => (k, null))) 
val groupSize = cogrouped.map { case (key, (buf1, buf2)) => (key, math.min(buf1.size, buf2.size)) } 
val finalSet = groupSize.flatMap { case (key, size) => List.fill(size)(key) } 

(finalSet.collect = Array(a, b, c, c)) 

Это работает, потому что cogroup будет поддерживать повторяющиеся вхождения значений пары для каждой группы (в данном случае, все ваши обнуляет). Также обратите внимание, что здесь мы больше не перетасовываем, чем с первоначальным использованием intersection.

+0

Отлично - спасибо! Есть небольшая синтаксическая ошибка - отредактировано 'iter1' =>' buf1' и 'iter2 => buf2', чтобы заставить его работать. – TCSGrad

+0

Кроме того, у меня есть запрос. Когда вы используете 'parallelize' явно vs, ожидаете, что Spark будет обрабатывать его внутренне через RDD? – TCSGrad

+0

Вы правы - извините. Метод 'parallelize()' фактически загружает существующую коллекцию в RDD по сравнению с методом 'textFile()', который загружает файлы с диска в RDD. –

Смежные вопросы