2016-09-15 2 views
2

Я использую pyspark для обработки данных 50Gb с использованием AWS EMR с ~ 15 m4.large ядрами.Spark coalesce vs collect, который быстрее?

Каждая строка данных содержит некоторую информацию в определенное время в день. Я использую следующий цикл for для извлечения и агрегирования информации за каждый час. Наконец, я получил данные, поскольку я хочу, чтобы мой результат сэкономил в один csv файл.

# daily_df is a empty pyspark DataFrame 
for hour in range(24): 
    hourly_df = df.filter(hourFilter("Time")).groupby("Animal").agg(mean("weights"), sum("is_male")) 
    daily_df = daily_df.union(hourly_df) 

Как мне известно, я должен выполнить следующие действия, чтобы заставить pyspark.sql.Dataframe объект для сохранения 1 CSV файлов (около 1 МБ), а не более 100 файлов:

daily_df.coalesce(1).write.csv("some_local.csv") 

Это кажется потребовалось около 70 минут, чтобы закончить этот прогресс, и мне интересно, могу ли я сделать это быстрее, используя метод collect()?

daily_df_pandas = daily_df.collect() 
daily_df_pandas.to_csv("some_local.csv") 

ответ

1

Оба coalesce(1) и collect довольно плохо в целом, но с ожидаемым размером выходного около 1 МБ это не имеет никакого значения. Это просто не должно быть узким местом здесь.

Одно простое усовершенствование является падение loop ->filter ->union и выполнить один агрегацию:

df.groupby(hour("Time"), col("Animal")).agg(mean("weights"), sum("is_male")) 

Если этого недостаточно, то, скорее всего, здесь речь идет конфигурация (хорошее место для начала может быть регулировка spark.sql.shuffle.partitions, если вы этого не сделаете).

+0

Я не очень хорошо знаком с 'spark.sql.shuffle.partitions'. Должен ли я увеличивать разделы или уменьшать? –

+0

Для этого нет очевидного ответа. Обычно увеличение по умолчанию (200) может решить некоторые проблемы, особенно если мощность выражения группировки высока. Если он низкий, и у вас небольшой кластер, это не поможет вам вообще. – zero323

+0

Я, наконец, понял, что узким местом является часть 'loop'->' filter'-> 'union'. Я следовал твоим советам ('groupby' hour и animal), что сократило время до 7 минут. Большое спасибо. –

1

Чтобы сохранить в качестве отдельного файла эти параметры

Вариант 1: coalesce (1) (без воспроизведения в случайном порядке данные по сети) или repartition (1) или collect может работать для небольших наборов данных, но большая данных- что он может не работать, как и ожидалось. Поскольку все данные будут перемещены в один раздел на одном узле

вариант 1 был бы прекрасен, если один исполнитель имеет больше ОЗУ для использования, чем драйвер.

Вариант 2: Другим вариантом будет FileUtil.copyMerge() - объединить выходы в один файл.

Вариант 3: после получения файлов деталей вы можете использовать HDFS getMerge команду

Теперь вы должны решить, основываясь на ваших требований ... которых один безопаснее/быстрее

также может иметь вид на Dataframe save after join is creating numerous part files

Смежные вопросы