2015-12-28 3 views
1

У меня есть большой RDD (rdd_1) и отфильтрованное подмножество (rdd_2). Я хочу присоединиться к rdd_1 и rdd_2 в другом поле.Spark broadcast vs join

Предположим, что записи имеют формат {'first_name': <>, 'last_name': <>}. Мы хотим найти все имена, имеющие ту же самую фамилию, что и все «джек».

names = sc.textfile(RAW_DATA) 
jack = names.filter(lambda v: v['first_name'] == 'jack') 

Вариант 1

jack_last_names = jack.map(operator.itergetter('last_name').distinct().collect() 
last_names_bc = sc.broadcast(set(jack_last_names)) 
final = names.filter(lambda v:v['last_name'] in last_names_bc.value) 

В настоящее время я вещать rdd_2 и фильтр rdd_1 им. Проблема в том, что для того, чтобы транслировать rdd_2, я должен сначала собрать() его в драйвере, и это приведет к тому, что у драйвера закончится нехватка памяти.

Есть ли способ транслировать RDD без первого сбора() на нем водителю?

Вариант 2

final = jack.keyBy(operator.itemgetter('last_name').join(names.keyBy(operator.itemgetter('last_name') 

Мой другой вариант rdd_1.join (rdd_2), но rdd_1 является слишком большим, чтобы перетасовать.

Когда мы запускаем rdd_1.join (rdd_2), выполняем ли как rdd_1, так и rdd_2 хэширование и перетасовку?

Спасибо!

ответ

2

Есть ли способ транслировать RDD без первого сбора() на нем водителю?

Нет, не существует и даже если бы это не решило вашу проблему.

  • не представляется возможным выполнить вложенное действие или преобразование
  • , если вы можете создать локальную переменную широковещательную без сбора вы столкнулись с той же проблемой, но на рабочих

Когда мы запускаем rdd_1 .join (rdd_2) делает и rdd_1 и rdd_2 get hash разбитым и перетасованным?

Технически в PySpark это потребует union с последующим groupByKey так это означает, что все данные должны быть перемешаны.

На практике я просто принимаю стоимость перетасовки движения. В общем случае невозможно написать какое-либо сложное приложение и полностью избежать перетасовки. Кроме того, он не дороже, чем broadcasting a similar amount of data или даже копирование данных в распределенную файловую систему с репликацией.