2015-08-11 6 views
30

Эта команда работает с HiveQL:Как экспортировать данные из искрового SQL в CSV

insert overwrite directory '/data/home.csv' select * from testtable; 

Но Спарк SQL я получаю сообщение об ошибке с трассы org.apache.spark.sql.hive.HiveQl стека:

java.lang.RuntimeException: Unsupported language features in query: 
    insert overwrite directory '/data/home.csv' select * from testtable 

Пожалуйста, руководство мне написать экспорт в CSV-функцию в Spark SQL.

ответ

45

Вы можете использовать ниже заявление, чтобы записать содержимое dataframe в формате CSV df.write.csv("/data/home/csv")

Если вам нужно записать весь dataframe в один файл CSV, а затем использовать df.coalesce(1).write.csv("/data/home/sample.csv")

Для искры 1 .x, вы можете использовать spark-csv для записи результатов в CSV-файлы

Ниже Скале сниппет будет HEL р

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.write.format("com.databricks.spark.csv").save("/data/home/csv") 

Чтобы записать содержимое в один файл

import org.apache.spark.sql.hive.HiveContext 
// sc - existing spark context 
val sqlContext = new HiveContext(sc) 
val df = sqlContext.sql("SELECT * FROM testtable") 
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv") 
+0

Я попробовал объединительную вещь, которую вы упомянули. Он создает каталог по указанному пути с файлом «part» и файлом «_SUCCESS». Вы знаете способ получить только один файл? –

+0

Нет, я думаю, что нет никакого способа сделать это. – sag

1

Сообщение об ошибке предполагает, что это не поддерживается в языке запросов. Но вы можете сохранить DataFrame в любом формате, как обычно, через интерфейс RDD (df.rdd.saveAsTextFile). Или вы можете проверить https://github.com/databricks/spark-csv.

+0

Скала> df.write.format ("com.databricks.spark.csv"), за исключением ("/ данные/home.csv") :. 18: ошибка: значение записи не является членом org.apache.spark.sql.SchemaRDD Нужно ли мне снова создавать текущую банку с пакетом databricks? – shashankS

+0

'DataFrame.write' был добавлен в Apache Spark 1.4.0. –

8

Самый простой способ отобразить более РДУ в DataFrame и использовать mkString:

df.rdd.map(x=>x.mkString(",")) 

На Спарк 1.5 (или даже до этого) df.map(r=>r.mkString(",")) будет делать то же самое , если вы хотите, чтобы сбрасывание CSV вы можете использовать для этого apache commons lang. например вот код, который мы используем

def DfToTextFile(path: String, 
        df: DataFrame, 
        delimiter: String = ",", 
        csvEscape: Boolean = true, 
        partitions: Int = 1, 
        compress: Boolean = true, 
        header: Option[String] = None, 
        maxColumnLength: Option[Int] = None) = { 

    def trimColumnLength(c: String) = { 
     val col = maxColumnLength match { 
     case None => c 
     case Some(len: Int) => c.take(len) 
     } 
     if (csvEscape) StringEscapeUtils.escapeCsv(col) else col 
    } 
    def rowToString(r: Row) = { 
     val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters 
     st.split("~-~").map(trimColumnLength).mkString(delimiter) 
    } 

    def addHeader(r: RDD[String]) = { 
     val rdd = for (h <- header; 
        if partitions == 1; //headers only supported for single partitions 
        tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1) 
     rdd.getOrElse(r) 
    } 

    val rdd = df.map(rowToString).repartition(partitions) 
    val headerRdd = addHeader(rdd) 

    if (compress) 
     headerRdd.saveAsTextFile(path, classOf[GzipCodec]) 
    else 
     headerRdd.saveAsTextFile(path) 
    } 
+2

Хотя это самый простой ответ (и хороший), если текст имеет двойные кавычки, вам придется учитывать их. – devonlazarus

+0

Просто получить сообщение об ошибке после создания RDD для таблицы scala> df.rdd.map (x => x.mkString (",")); : 18: error: value rdd не является членом org.apache.spark.sql.SchemaRDD df.rdd.map (x => x.mkString (",")); – shashankS

22

Ответ на этот вопрос выше искровым CSV является правильным, но есть проблема - библиотека создает несколько файлов на основе кадра данных разделов. И это не то, что нам обычно нужно. Таким образом, вы можете объединить все разделы к одному:

df.coalesce(1). 
    write. 
    format("com.databricks.spark.csv"). 
    option("header", "true"). 
    save("myfile.csv") 

и переименовать вывод Lib (название «часть-00000») на желание файла.

Этот блог содержит более подробную информацию: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

+2

Должно ли df.repartition.write вместо df.write.repartition? –

+0

@ Cedric вы правы, спасибо! Передел первым! Ред. –

+2

Можно также добавить модель, если вы хотите продолжить запись в существующий файл. 'resultDF.repartition (1) .write.mode (" append "). format (" com.databricks.spark.csv "). option (" header "," true ") .save (" s3: // .. . ")' – Pramit

24

С искрой 2.Xspark-csv интегрирован в native datasource. Таким образом, необходимо утверждение упрощается (окон)

df.write 
    .option("header", "true") 
    .csv("file:///C:/out.csv") 

или UNIX

df.write 
    .option("header", "true") 
    .csv("/var/out.csv") 
+1

Это должен быть принятый ответ. –

+0

Привет всем, Есть ли способ заменить файл, поскольку он терпит неудачу, когда он пытается переписать файл. – user3341078

+0

Конечно! '.mode ("перезапись"). CSV ("/ Var/out.csv")' – Boern

0

С помощью искрового CSV можно записать в файл CSV.

val dfsql = sqlContext.sql("select * from tablename") 
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")` 
Смежные вопросы