Самый простой способ отобразить более РДУ в DataFrame и использовать mkString:
df.rdd.map(x=>x.mkString(","))
На Спарк 1.5 (или даже до этого) df.map(r=>r.mkString(","))
будет делать то же самое , если вы хотите, чтобы сбрасывание CSV вы можете использовать для этого apache commons lang. например вот код, который мы используем
def DfToTextFile(path: String,
df: DataFrame,
delimiter: String = ",",
csvEscape: Boolean = true,
partitions: Int = 1,
compress: Boolean = true,
header: Option[String] = None,
maxColumnLength: Option[Int] = None) = {
def trimColumnLength(c: String) = {
val col = maxColumnLength match {
case None => c
case Some(len: Int) => c.take(len)
}
if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
}
def rowToString(r: Row) = {
val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
st.split("~-~").map(trimColumnLength).mkString(delimiter)
}
def addHeader(r: RDD[String]) = {
val rdd = for (h <- header;
if partitions == 1; //headers only supported for single partitions
tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
rdd.getOrElse(r)
}
val rdd = df.map(rowToString).repartition(partitions)
val headerRdd = addHeader(rdd)
if (compress)
headerRdd.saveAsTextFile(path, classOf[GzipCodec])
else
headerRdd.saveAsTextFile(path)
}
Я попробовал объединительную вещь, которую вы упомянули. Он создает каталог по указанному пути с файлом «part» и файлом «_SUCCESS». Вы знаете способ получить только один файл? –
Нет, я думаю, что нет никакого способа сделать это. – sag