2016-11-08 3 views
1

У меня есть большой (около 85 ГБ сжатый) gzipped файл из s3, который я пытаюсь обработать с помощью Spark на AWS EMR (прямо сейчас с m4.xlarge master экземпляром и двумя m4. 10xlarge ядра каждый с объемом EBS объемом 100 ГБ). Я знаю, что gzip является нерасщепляемым файловым форматом и I'veseenitsuggested что нужно перераспределить сжатый файл, потому что Spark изначально дает RDD с одним разделом. Тем не менее, после выполненияРабота с большим gzip-файлом в Spark

scala> val raw = spark.read.format("com.databricks.spark.csv"). 
    | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")). 
    | load("s3://path/to/file.gz"). 
    | repartition(sc.defaultParallelism * 3) 
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields 
scala> raw.count() 

и взгляд на интерфейс приложения Спарк, я все еще вижу только один активный исполнитель (остальные 14 мертвы) с одной задачей, и работа никогда не заканчивается (или, по крайней мере, я вы не дождались достаточно долго для этого).

  • Что здесь происходит? Может ли кто-нибудь помочь мне понять, как Spark работает в этом примере?
  • Должен ли я использовать другую конфигурацию кластера?
  • К сожалению, у меня нет контроля над режимом сжатия, но есть ли альтернативный способ борьбы с таким файлом?

ответ

3

Если формат файла не является разделяемым, то нет возможности избежать полного чтения файла на одном ядре. Чтобы распараллелить работу, вы должны знать, как назначать куски работы на разные компьютеры. В случае gzip предположим, что вы разделите его на 128M кусков. N-й кусок зависит от информации о позиции n-1-го куска, чтобы знать, как декомпрессировать, что зависит от n-2-го куска и так далее до первого.

Если вы хотите распараллеливать, вам необходимо сделать этот файл разделимым. Один из способов - распаковать его и обработать без сжатия, или вы можете разархивировать его, разделить на несколько файлов (один файл для каждой требуемой параллельной задачи) и gzip каждый файл.

+1

У меня создалось впечатление, что Spark распаковывает файл перед его переделкой. Разве это не так? Каковы четыре ссылки, о которых я говорил? – user4601931

+0

Да, Spark распаковывает файл сначала целиком (80G на одном ядре), прежде чем он сможет перетасовать его, чтобы увеличить параллельность. – Tim

+0

Хорошо, спасибо. Как вы думаете, мой кластер сможет справиться с этой задачей? Если это так, если я хочу распаковать весь файл, переделать его и затем продолжить обработку, вы думаете, что установка «spark.dynamicAllocation.enabled = true» гарантирует, что я получу одного исполнителя (с максимально возможной памятью) до выполнить декомпрессию, а затем больше исполнителей (с меньшим объемом памяти, но с несколькими ядрами) после выполнения обработки? – user4601931

Смежные вопросы