2016-10-20 5 views
7

Я использую Spark 1.6.0 и Scala.Как сохранить DataFrame как сжатый (gzipped) CSV?

Я хочу сохранить DataFrame как сжатый формат CSV.

Вот то, что я до сих пор (предположим, что у меня уже есть df и sc, как SparkContext):

//set the conf to the codec I want 
sc.getConf.set("spark.hadoop.mapred.output.compress", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK") 

df.write 
    .format("com.databricks.spark.csv") 
    .save(my_directory) 

Выход не в формате gz.

+0

Связанный вопрос о RDD: http://stackoverflow.com/questions/32231650/spark-rdd-saveastextfile-gzip –

ответ

4

На искровым CSV GitHub: https://github.com/databricks/spark-csv

можно прочитать:

codec: кодек сжатия, используемый при сохранении файла. Должно быть полное имя класса, реализующего org.apache.hadoop.io.compress.CompressionCodec или один из нечувствительных к регистру сокращений имен (bzip2, gzip, lz4 и snappy). По умолчанию отсутствует сжатие, когда кодек не указан.

В вашем случае, это должно работать: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

10

Этот код работает для Спарк 2.1, где .codec не доступен.

df.write 
    .format("com.databricks.spark.csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .save(my_directory) 

Для Спарк 2.2, вы можете использовать опцию df.write.csv(...,codec="gzip") описано здесь: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec

+1

Хотя этот код может ответить на вопрос, предоставляя дополнительный контекст относительно того, почему и/или как этот код отвечает на вопрос улучшает его долгосрочную ценность. – manniL

+0

В случае использования формата «json» сжатие не получается. – Disha

+0

Похоже, что аргумент ключевого слова был изменен на 'compression'. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec#pyspark.sql.DataFrameWriter.csv – volker238

4

С искрой 2.0+, это стало немного проще:

df.write.csv("path", compression="gzip") 

Вам не нужен внешний пакет Databricks CSV.

Писатель csv() поддерживает ряд удобных опций. Например:

  • sep: Установить символ разделителя.
  • quote: Как и как указывать значения.
  • header: Включить ли строку заголовка.

Есть также ряд других кодеков сжатия, которые можно использовать в дополнение к gzip:

  • bzip2
  • lz4
  • snappy
  • deflate

Полные Свечи документы для csv() писателя здесь: Python/Scala

+0

Благодарим за ссылку на документацию csv writer и не предоставляем только данные ответ! –

+0

@LaurensKoppenol - Ну, если честно, поддержка CSV, добавленная к исходному искры, первоначально начиналась как внешний CSV-пакет Databricks [связанный с] (https://github.com/databricks/spark-csv) в принятом ответе. :) Этот пакет доступен любому пользователю Spark, но, начиная с Spark 2.0, он больше не нужен. –

1

Чтобы написать CSV-файл с заголовками и переименовать файл часть-000 в .csv.gzip

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite") 
.option("header","true") 
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName) 

copyRename(tempLocationFileName, finalLocationFileName) 

def copyRename(srcPath: String, dstPath: String): Unit = { 
    val hadoopConfig = new Configuration() 
    val hdfs = FileSystem.get(hadoopConfig) 
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
    // the "true" setting deletes the source files once they are merged into the new output 
} 

Если вы не нужен заголовок, а затем установите значение false, и вам тоже не нужно будет делать коалесценцию. Это будет быстрее писать тоже.

Смежные вопросы