2015-08-11 3 views
9

Apache Spark's DataFrameReader.json() может обрабатывать файлы gzipped JSONlines автоматически, но, похоже, нет способа получить DataFrameWriter.json() для записи сжатых файлов JSONlines. Дополнительный сетевой ввод-вывод очень дорог в облаке.Spark: запись DataFrame в сжатом формате JSON

Есть ли способ обойти эту проблему?

+0

Вы обнаружили способ сжать выход json? Я также ищу решение. –

+0

Я еще не нашел способ сделать это. – Sim

ответ

11

Следующие решения используют pyspark, но я предполагаю, что код в Scala будет аналогичным.

Первый вариант, чтобы установить следующее, когда вы инициализирует свой SparkConf:

conf = SparkConf() 
conf.set("spark.hadoop.mapred.output.compress", "true") 
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK") 

С выше любого файла вы производите с помощью этого sparkContext автоматически сжимаются с помощью GZIP кода.

Второй вариант, если вы хотите сжать только выбранные файлы в вашем контексте. Допустим, «ДФ» Ваш dataframe и имя файла вашего назначения:

df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec") 
+0

Scala RDD API - это 'def saveAsTextFile (путь: String, codec: Class [_ <: CompressionCodec]), поэтому класс кода должен передаваться напрямую, а не как строка. – Sim

+0

Удовлетворение возможности избежать формата hadoopish при хранении данных в файле. Я не могу использовать каталог с файлами '_SUCCES' и' part- * '. Мне просто нужен конкретный одноименный файл ... – lisak

+0

Жаль о воскресении, но мне трудно поверить, что '' 'conf.set (" spark.hadoop.mapred.output.compression.codec "," true ") '' 'необходимо – tarzan

7

с искровым 2.X (и, возможно, раньше, я не проверял) есть более простой способ, чтобы написать сжатую JSON, который не требует изменение конфигурации:

val df: DataFrame = ... 
df.write.option("compression", "gzip").json("/foo/bar") 

Это также работает для CSV и для паркета, просто используйте .csv() и .parquet() вместо .json() для записи файла после установки опции сжатия.

Возможные кодеки: none, bzip2, deflate, gzip, lz4 и snappy.

Смежные вопросы