Я пытаюсь создать общий файл для конечных пользователей, чтобы использовать их, чтобы не обрабатывать несколько источников с гораздо большими файлами. Для этого I: A) итерация по всем исходным папкам, удаление 12 полей, которые наиболее часто запрашиваются, разворачивание паркетных файлов в новом месте, где эти результаты расположены совместно. B) Я пытаюсь вернуться к файлам, созданным на шаге A, и повторно агрегировать их, группируя по 12 полям, чтобы уменьшить их до итоговой строки для каждой уникальной комбинации.Почему файлы Spark Parquet для совокупности больше оригинала?
Что я нахожу, так это то, что шаг A уменьшает полезную нагрузку 5: 1 (примерно 250 концертов становится 48,5 гигабайтами). Шаг B, однако, вместо дальнейшего уменьшения этого, увеличится на 50% за шаг А. Однако мои совпадения совпадают.
Использование Spark 1.5.2
Мой код, измененный только для замены имен полей полем1 ... поле12, чтобы сделать его более читаемым, ниже с результатами, которые я отметил.
Хотя я не обязательно ожидаю другого сокращения 5: 1, я не знаю, что я делаю неправильно, чтобы увеличить сторону хранилища для меньших строк с помощью той же схемы. Любой, кто может помочь мне понять, что я сделал неправильно?
Спасибо!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9 string (nullable = true) (15 characters)
|-- field10: string (nullable = true)(20 characters)
|-- field11: string (nullable = true)(14 characters)
|-- field12: string (nullable = true)(14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)
bucketBy кажется невозможным использовать с DataFrameWriter в Spark 2.0.0 – eliasah