2016-11-30 2 views
1

Я хочу разделить dataframe на две dataframes и записать их в два отдельных паркетных файлы как этотКак написать несколько паркетных файлов параллельно в pyspark?

df = attachment_df.flatMap(process_attachment).toDF() 

large_df = df.filter(df.is_large_file == True) 
small_df = df.filter(df.is_large_file == False) 

(large_df.write 
    .mode("overwrite") 
    .parquet('large_dummy')) 

(small_df.write 
    .mode("overwrite") 
    .parquet('small_dummy')) 

Однако приведенный выше код будет писать в последовательном, и это выглядит как функция process_attachment вызывается дважды для каждого вложения , Я действительно хочу избежать дублирования вычислений, потому что очень дорого обрабатывать вложение.

Есть ли способ избежать дублирования обработки вложения и написать параллельно? Я не хочу писать в один файл паркета, используя раздел по столбцу is_large_file.

Спасибо,

ответ

2

Когда пишет искра, он пишет параллельно для каждого dataframe (на основе количества разделов). Таким образом, вы в основном выполняете последовательно две параллельные задачи (т. Е. Не должны иметь большого эффекта). Основная проблема заключается в том, что в настоящее время вы дважды перечитываете df.

Причина в том, что DAG рассчитывается отдельно для каждого действия (запись - это действие).

Если у вас достаточно памяти, вы можете улучшить это просто, выполнив df.cache() перед первой записью и df.unpersist после второй записи. Это позволит сохранить в памяти вычисление df, когда это возможно (то есть достаточно памяти).

Если у вас недостаточно памяти, а process_attachment действительно длинный, вы можете попытаться использовать persist с параметром MEMORY_AND_DISK, который бы разлил вычисления на диск, если он слишком велик (т. Е. Вы будете перезагружать с диска вместо пересчета).

Смежные вопросы