2016-01-22 2 views
1

У меня большой RDD, назовите его RDD1, который составляет приблизительно 300 миллионов строк после начального фильтра. То, что я хотел бы сделать, это взять идентификаторы из RDD1 и найти все другие экземпляры этого в другом большом наборе данных, называть его RDD2, который составляет приблизительно 3 миллиарда строк. RDD2 создается путем запроса таблицы паркета, которая хранится в Hive, а также RDD1. Количество уникальных идентификаторов от RDD1 составляет около 10 миллионов элементов.Фильтр большого RDD, итерации по другому большому RDD - pySpark

Мой подход заключается в том, чтобы в настоящее время собирать идентификаторы и передавать их, а затем фильтровать RDD2.

Мой вопрос: есть ли более эффективный способ сделать это? Или это лучшая практика?

У меня есть следующий код -

hiveContext = HiveContext(sc) 
RDD1 = hiveContext("select * from table_1") 
RDD2 = hiveContext.sql("select * from table_2") 

ids = RDD1.map(lambda x: x[0]).distinct() # This is approximately 10 million ids 
ids = sc.broadcast(set(ids.collect())) 

RDD2_filter = RDD2.rdd.filter(lambda x: x[0] in ids.value)) 
+0

Почему вы выбираете все строки из таблицы_1, только чтобы отбросить их с помощью операции «RDD1.map()»? Это не очень эффективно. Два оператора 'ids =' сбивают с толку. Почему вы собираете его на головной компьютер, только чтобы отправить его в качестве широковещательной переменной? Вы пробовали это без 'collect()' и 'sc.broadcast'? – vy32

+0

@ vy32 - Я использовал столбцы из RDD1 в последующих вычислениях, но мне нужны идентификаторы из него для запроса второй таблицы. Что касается коллекции, вы не можете транслировать RDD, вы получаете прекрасное исключение. – RDizzl3

+0

Исключение: похоже, что вы пытаетесь транслировать RDD или ссылаетесь на RDD из действия или преобразования. Драйверы и действия RDD могут быть вызваны только драйвером, а не внутри других преобразований; см. SPARK-5063. – RDizzl3

ответ

1

Что я хотел бы сделать, это принять 300 milion ids из RDD1, постройте фильтр цветения (Bloom filter), используйте это как широковещательную переменную для фильтрации RDD2, и вы получите RDD2Partial, который содержит все палитры ключевого значения для ключа, которые находятся в RDD1, плюс некоторые ложные срабатывания. Если вы ожидаете, что результат будет в пределах порядка миллионов, то вы сможете использовать обычные операции, такие как join, cogroup и т. Д. На RDD1 и RDD2Partial, чтобы получить точный результат без каких-либо проблем.

Таким образом, вы значительно сократите время операции соединения, если вы ожидаете, что результат будет иметь разумный размер, поскольку сложность остается неизменной. Вы можете получить некоторые разумные ускорения (например, 2-10x), даже если результат будет в пределах порядка сотен миллионов.

РЕДАКТИРОВАТЬ

Расцвет фильтр может быть собран эффективно, так как вы можете комбинировать биты, установленные на один элемент с битами, установленными другим элементом с OR, которая является ассоциативной и коммутативной.

+0

спасибо, мне очень нравится это решение! Это, вероятно, вопрос о нобе, но использует ли битаррей меньше памяти, чем набор в python? – RDizzl3

2

Я думаю, было бы лучше просто использовать один оператор SQL, чтобы сделать присоединиться:

RDD2_filter = hiveContext.sql("""select distinct t2.* 
           from table_1 t1 
           join table_2 t2 on t1.id = t2.id""") 
Смежные вопросы