2016-07-01 2 views
2

Я пытаюсь создать общий файл для конечных пользователей, чтобы использовать их, чтобы не обрабатывать несколько источников с гораздо большими файлами. Для этого I: A) итерация по всем исходным папкам, удаление 12 полей, которые наиболее часто запрашиваются, разворачивание паркетных файлов в новом месте, где эти результаты расположены совместно. B) Я пытаюсь вернуться к файлам, созданным на шаге A, и повторно агрегировать их, группируя по 12 полям, чтобы уменьшить их до итоговой строки для каждой уникальной комбинации.Почему файлы Spark Parquet для совокупности больше оригинала?

Что я нахожу, так это то, что шаг A уменьшает полезную нагрузку 5: 1 (примерно 250 концертов становится 48,5 гигабайтами). Шаг B, однако, вместо дальнейшего уменьшения этого, увеличится на 50% за шаг А. Однако мои совпадения совпадают.

Использование Spark 1.5.2
Мой код, измененный только для замены имен полей полем1 ... поле12, чтобы сделать его более читаемым, ниже с результатами, которые я отметил.

Хотя я не обязательно ожидаю другого сокращения 5: 1, я не знаю, что я делаю неправильно, чтобы увеличить сторону хранилища для меньших строк с помощью той же схемы. Любой, кто может помочь мне понять, что я сделал неправильно?

Спасибо!

//for each eventName found in separate source folders, do the following: 
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size 
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table" 
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/") 
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results 
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12" 
//Use a wildcard to search all sub-folders created above 
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results") 
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/") 
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed 

//The parquet schemas created (both tables match): 
|-- field1: string (nullable = true) (10 characters) 
|-- field2: string (nullable = true) (15 characters) 
|-- field3: string (nullable = true) (50 characters max) 
|-- field4: string (nullable = true) (10 characters) 
|-- field5: string (nullable = true) (10 characters) 
|-- field6: string (nullable = true) (10 characters) 
|-- field7: string (nullable = true) (16 characters) 
|-- field8: string (nullable = true) (10 characters) 
|-- field9 string (nullable = true) (15 characters) 
|-- field10: string (nullable = true)(20 characters) 
|-- field11: string (nullable = true)(14 characters) 
|-- field12: string (nullable = true)(14 characters) 
|-- rCount: long (nullable = true) 
|-- dt: string (nullable = true) 

ответ

1

В общем столбчатых форматы хранения данных, как паркет очень чувствительны, когда речь идет о распределении данных (организации данных) и мощности отдельных столбцов. Чем более организованы данные, тем ниже мощность, тем эффективнее хранилище.

Агрегация, как тот, который вы применяете, должна перетасовывать данные. Когда вы проверите план выполнения, вы увидите, что он использует хэш-разделитель. Это означает, что после распределения агрегации может быть менее эффективным, чем распределение исходных данных. В то же время sum может уменьшить количество строк, но увеличить мощность для столбца rCount.

Вы можете попробовать различные инструменты, чтобы исправить для этого, но не все они доступны в Спарке 1.5.2:

  • Сортировать полный набор данные по столбцам, имеющим низкую мощность (довольно дорого из-за полную перетасовку) или sortWithinPartitions.
  • Использование partitionBy метода DataFrameWriter для разделения данных с использованием столбцов с низкой мощностью.
  • Использование bucketBy и sortBy методов DataFrameWriter (Spark 2.0.0+) для улучшения распределения данных с использованием bucketing и локальной сортировки.
+0

bucketBy кажется невозможным использовать с DataFrameWriter в Spark 2.0.0 – eliasah

Смежные вопросы