2015-10-16 2 views
7

, например, вследствие этогоКак сохранить искру DataFrame как csv на диске?

df.filter("project = 'en'").select("title","count").groupBy("title").sum() 

, который будет возвращать массив.

Как сохранить свечу DataFrame как csv на диске?

+1

btw это не возвращает массив, а DataFrame! [ссылка здесь] (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.GroupedData) – eliasah

+0

Если предоставленный ответ решает ваш вопрос, пожалуйста, примите его и поэтому мы можем классифицировать этот вопрос как разрешенный! – eliasah

ответ

13

Apache Spark не поддерживает собственный CSV-выход на диске.

У вас есть четыре доступных решений, хотя:

  1. Вы можете преобразовать ваш Dataframe в РДУ:

    def convertToReadableString(r : Row) = ??? 
    df.rdd.map{ convertToReadableString }.saveAsTextFile(filepath) 
    

    Это позволит создать папку путь_к_файл. Под путь к файлу, вы найдете Перегородки файлы (например, часть-000 *)

    Что я обычно делаю, если я хочу, чтобы добавить все разделы в большой CSV является

    cat filePath/part* > mycsvfile.csv 
    

    Некоторые будут использовать coalesce(1,false) создать один раздел из RDD. Обычно это плохая практика, так как она может подавить водителя, потянув все данные, которые вы собираете.

    Обратите внимание, что df.rdd вернет RDD[Row].

  2. Вы можете использовать Databricks искровым CSV library:

    • Спарк 1.4+:

      df.write.format("com.databricks.spark.csv").save(filepath) 
      
    • Спарк 1.3:

      df.save(filepath,"com.databricks.spark.csv") 
      
  3. Wit h Spark 2.xspark-csv упаковка не требуется, так как она включена в состав Spark.

    df.write.format("csv").save(filepath) 
    
  4. Вы можете конвертировать в локальной системе координат Панды данных и использовать to_csv метод (PySpark только).

Примечание: решения 1, 2 и 3 приведет файлы в формате CSV (part-*), порожденных базовой Hadoop API, которые зажигают вызовов при вызове save. У вас будет один файл part- для каждого раздела.

+1

Я думаю, что 'spark-csv' является предпочтительным решением. Нелегко создать правильную строку csv с нуля. Все диалекты и правильное экранирование могут быть довольно сложными. – zero323

+0

Я полностью согласен – eliasah

+1

В PySpark вы также можете конвертировать маленький стол в Pandas и сохранять локально. но это, вероятно, вопрос Скалы. – zero323

0

У меня была аналогичная проблема. Мне нужно было записать файл csv на драйвер, когда я подключался к кластеру в режиме клиента.

Я хотел повторно использовать тот же код синтаксического анализа CSV, что и Apache Spark, чтобы избежать потенциальных ошибок.

Я проверил код искра-csv и нашел код, ответственный за преобразование данных в исходный код csv RDD[String] в com.databricks.spark.csv.CsvSchemaRDD.

К сожалению, это жестко закодировано с помощью sc.textFile и конца соответствующего метода.

Я скопировал этот код и удалил последние строки с помощью sc.textFile и вместо этого вернул RDD.

Мой код:

/* 
    This is copypasta from com.databricks.spark.csv.CsvSchemaRDD 
    Spark's code has perfect method converting Dataframe -> raw csv RDD[String] 
    But in last lines of that method it's hardcoded against writing as text file - 
    for our case we need RDD. 
*/ 
object DataframeToRawCsvRDD { 

    val defaultCsvFormat = com.databricks.spark.csv.defaultCsvFormat 

    def apply(dataFrame: DataFrame, parameters: Map[String, String] = Map()) 
      (implicit ctx: ExecutionContext): RDD[String] = { 
    val delimiter = parameters.getOrElse("delimiter", ",") 
    val delimiterChar = if (delimiter.length == 1) { 
     delimiter.charAt(0) 
    } else { 
     throw new Exception("Delimiter cannot be more than one character.") 
    } 

    val escape = parameters.getOrElse("escape", null) 
    val escapeChar: Character = if (escape == null) { 
     null 
    } else if (escape.length == 1) { 
     escape.charAt(0) 
    } else { 
     throw new Exception("Escape character cannot be more than one character.") 
    } 

    val quote = parameters.getOrElse("quote", "\"") 
    val quoteChar: Character = if (quote == null) { 
     null 
    } else if (quote.length == 1) { 
     quote.charAt(0) 
    } else { 
     throw new Exception("Quotation cannot be more than one character.") 
    } 

    val quoteModeString = parameters.getOrElse("quoteMode", "MINIMAL") 
    val quoteMode: QuoteMode = if (quoteModeString == null) { 
     null 
    } else { 
     QuoteMode.valueOf(quoteModeString.toUpperCase) 
    } 

    val nullValue = parameters.getOrElse("nullValue", "null") 

    val csvFormat = defaultCsvFormat 
     .withDelimiter(delimiterChar) 
     .withQuote(quoteChar) 
     .withEscape(escapeChar) 
     .withQuoteMode(quoteMode) 
     .withSkipHeaderRecord(false) 
     .withNullString(nullValue) 

    val generateHeader = parameters.getOrElse("header", "false").toBoolean 
    val headerRdd = if (generateHeader) { 
     ctx.sparkContext.parallelize(Seq(
     csvFormat.format(dataFrame.columns.map(_.asInstanceOf[AnyRef]): _*) 
    )) 
    } else { 
     ctx.sparkContext.emptyRDD[String] 
    } 

    val rowsRdd = dataFrame.rdd.map(row => { 
     csvFormat.format(row.toSeq.map(_.asInstanceOf[AnyRef]): _*) 
    }) 

    headerRdd union rowsRdd 
    } 

} 
0

У меня был подобный вопрос, где я должен был сохранить содержимое dataframe в CSV файл имя, которое я определил. df.write("csv").save("<my-path>") создавал каталог, чем файл. Так что приходите к следующим решениям. Большая часть кода взята из следующих dataframe-to-csv с небольшими изменениями в логике.

def saveDfToCsv(df: DataFrame, tsvOutput: String, sep: String = ",", header: Boolean = false): Unit = { 
    val tmpParquetDir = "Posts.tmp.parquet" 

    df.repartition(1).write. 
     format("com.databricks.spark.csv"). 
     option("header", header.toString). 
     option("delimiter", sep). 
     save(tmpParquetDir) 

    val dir = new File(tmpParquetDir) 
    val newFileRgex = tmpParquetDir + File.separatorChar + ".part-00000.*.csv" 
    val tmpTsfFile = dir.listFiles.filter(_.toPath.toString.matches(newFileRgex))(0).toString 
    (new File(tmpTsvFile)).renameTo(new File(tsvOutput)) 

    dir.listFiles.foreach(f => f.delete) 
    dir.delete 
    } 
Смежные вопросы