2016-01-18 4 views
2

В настоящее время я пишу программу, в которой я решаю, следует ли использовать groupByKey, за которым следует объединение или просто соединение.Дублирование значений при использовании join() in spark

По существу, у меня есть одно RDD со многими значениями на ключ и другое RDD с одним значением для каждой клавиши, но это значение очень велико. Мой вопрос заключается в том, когда я присоединяюсь к этим значениям вместе, искривится, создавая много копий большого значения (по одному для каждого отдельного экземпляра меньшего значения) или будет испускать только одну копию большого значения, давая ссылки на все от первоначальных значений.

По сути, я бы такую ​​ситуацию:

val InvIndexes:RDD[(Int,InvertedIndex)] //InvertedIndex is very large 
val partitionedVectors:RDD[(Int, Vector)] 

val partitionedTasks:RDD[(Int, (Iterator[Vector], InvertedIndex))] = partitionedvectors.groupByKey().join(invIndexes) 


val similarities = partitionedTasks.map(//calculate similarities) 

Мой вопрос, если бы на самом деле быть больших различий пространство сложности между кодом до и делать это:

val InvIndexes:RDD[(Int,InvertedIndex)] 
val partitionedVectors:RDD[(Int, Vector)] 

val partitionedTasks:RDD[(Int, (Vector, InvertedIndex))] = partitionedvectors.join(invIndexes) 

val similarities = partitionedTasks.map(//calculate similarities) 

ответ

1

Технически говоря не должно быть никакого копирования. join в основном cogroup, а затем flatMapValues с вложенным пониманием for. Если предположить, что в качестве одного со-сгруппированных элемент выглядит следующим образом:

val pair = (1, (
    Seq(Vectors.dense(Array(1.0)), Vectors.dense(Array(2.0))), 
    Seq(Vectors.dense(Array(3.0)), Vectors.dense(Array(4.0))) 
)) 

последующие операции эквивалентно:

val result = pair match { 
    case (k, (xs, ys)) => xs.flatMap(x => ys.map(y => (k, (x, y)))) 
} 

и, как ожидается, векторы не копируются:

require(result(0)._2._1 eq result(1)._2._1) 

Пока данные обрабатываются в памяти или весь раздел сериализуется/десериализован (например, в collect) этот статус должен быть сохранен, но лично я не буду зависеть от этого. Даже если вы игнорируете детали реализации на низком уровне, простая перетасовка может потребовать полной копии.

+0

спасибо @ zero323. Итак, два вопроса: 1. Вы говорите, что данные не копируются только в том случае, если весь раздел достаточно мал, чтобы вписаться в память одного компьютера? Или вы говорите, что я должен беспокоиться о копировании объекта в будущие функции (в этом случае это не будет представлять серьезной проблемы, поскольку, как только я нахожу сходство, мне больше не понадобится перевернутый индекс) –

+0

2. Если вы считаете, что это метод был бы ненадежным, было бы лучше сделать что-то в соответствии с моим первым фрагментом кода, где я groupByKey, а затем удерживать InvertedIndex, пока я пересекаю итератор? –

+0

Лично я бы рассмотрел cogroup и flatMap. Вы можете напрямую использовать InvertedIndex и сохранить пространство, необходимое для хранения всех ссылок. – zero323

Смежные вопросы