2016-06-05 3 views
3

Формат файла паркета чувствителен к порядку записей. Его столбчатое кодирование может создавать значительно меньшие файлы в зависимости от порядка сортировки. С другой стороны, сортировка терабайта входных записей очень дорого.Как принудительно сортировать память в Spark SQL?

Разделение на куски, предположим, 10 ГБ позволяет сохранять в памяти, генерируя почти как небольшие паркетные файлы, как если бы весь 1 ТБ был полностью отсортирован.

Возможно ли проинструктировать Spark SQL для сортировки сортировки до создания паркетного файла?

Другим вариантом использования было бы слияние многих небольших файлов Паркета в один, в то время как с использованием сортировки сортировки, прежде чем писать унифицированный файл Паркета.

ответ

1

Насколько я знаю, такой вариант недоступен из-под коробки в Spark < 2.0.0. Одна вещь, вы можете попробовать это совместить coalesce с улья SORT BY пункта до того письма, которое должно иметь подобный эффект:

val df: DataFrame = ??? 
val n: Int = ??? // 

df.coalesce(n) 
df.coalesce(n).registerTempTable("df") 
sqlContext.sql("SELECT * FROM df SORT BY foo, bar").write.parquet(...) 

или

df.coalesce(n).sortWithinPartitions($"foo", $"bar").write.parquet(...) 

Имейте в виду, что SORT BY не эквивалентно DataFrame.sort.

Спарка 2.0.0 представил sortBy и bucketBy методы, где последний один сортирует выход в каждом ведре заданных столбцов и should support Parquet:

val df: DataFrame = ??? 
val nBuckets: Int = ??? 

df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").saveAsTable(...) 

Примечание: Это похоже на работу только при сохранении файлов Паркета с saveAsTable, но не похоже, что он поддерживает паркетную запись напрямую (df.write.bucketBy(...).sortBy(...).parquet(...)) в spark-2.0.0-preview.

+0

Thanks zero323, Насколько я понял, sortWithinPartitions просто избегает окончательной сортировки слияния. Тем не менее, каждый раздел будет полностью сортировать свои данные и, если не поместится в память, будут использоваться файлы. Где я могу получить дополнительную информацию о bucketBy и версии 2.0.0 в целом? –

+0

'sortWithinPartitions' позволяет избежать как окончательного слияния, так и перетасовки, необходимой для выполнения полной сортировки. Насколько мне известно, Spark обычно не использует сортировку в памяти, потому что не предполагает, что данные для одного раздела помещаются в память. Я не уверен, хотя, если в игре есть какие-то специфические оптимизации SQL. Вы можете преобразовать Iterator в локальную структуру и отсортировать напрямую, используя 'rdd.mapPartitions'. Что касается 'sortBy', единственной ссылкой, которую я знаю, является [соответствующий JIRA] (https://issues.apache.org/jira/browse/SPARK-12539) и источник/тесты. – zero323

+0

'spark-2.2.0' не поддерживает' df.write.bucketBy (...). SortBy (...). Паркет (...) ', но вызывает правильное исключение. [SPARK-15718 SQL лучшее сообщение об ошибке для записи данных в квадратных скобках] (https://github.com/apache/spark/commit/f34aadc54ca1a9fd4236a928d342324b26fb3a12) – ruseel

Смежные вопросы