У меня большой RDD, назовите его RDD1, который составляет приблизительно 300 миллионов строк после начального фильтра. То, что я хотел бы сделать, это взять идентификаторы из RDD1 и найти все другие экземпляры этого в другом большом наборе данных, называть его RDD2, который составляет приблизительно 3 миллиарда строк. RDD2 создается путем запроса таблицы паркета, которая хранится в Hive, а также RDD1. Количество уникальных идентификаторов от RDD1 составляет около 10 миллионов элементов.Фильтр большого RDD, итерации по другому большому RDD - pySpark
Мой подход заключается в том, чтобы в настоящее время собирать идентификаторы и передавать их, а затем фильтровать RDD2.
Мой вопрос: есть ли более эффективный способ сделать это? Или это лучшая практика?
У меня есть следующий код -
hiveContext = HiveContext(sc)
RDD1 = hiveContext("select * from table_1")
RDD2 = hiveContext.sql("select * from table_2")
ids = RDD1.map(lambda x: x[0]).distinct() # This is approximately 10 million ids
ids = sc.broadcast(set(ids.collect()))
RDD2_filter = RDD2.rdd.filter(lambda x: x[0] in ids.value))
Почему вы выбираете все строки из таблицы_1, только чтобы отбросить их с помощью операции «RDD1.map()»? Это не очень эффективно. Два оператора 'ids =' сбивают с толку. Почему вы собираете его на головной компьютер, только чтобы отправить его в качестве широковещательной переменной? Вы пробовали это без 'collect()' и 'sc.broadcast'? – vy32
@ vy32 - Я использовал столбцы из RDD1 в последующих вычислениях, но мне нужны идентификаторы из него для запроса второй таблицы. Что касается коллекции, вы не можете транслировать RDD, вы получаете прекрасное исключение. – RDizzl3
Исключение: похоже, что вы пытаетесь транслировать RDD или ссылаетесь на RDD из действия или преобразования. Драйверы и действия RDD могут быть вызваны только драйвером, а не внутри других преобразований; см. SPARK-5063. – RDizzl3