2016-10-16 3 views

ответ

5

Это возможно.

В RDD есть saveAsObjectFile и saveAsTextFile функции. Кортежи хранятся как (value1, value2), так что вы можете позже разобрать его.

Чтение может быть сделано с textFile функцией от SparkContext, а затем .map устранить ()

Итак: Версия 1:

rdd.saveAsTextFile ("hdfs:///test1/"); 
// later, in other program 
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => { 
    // here remove() and parse long/strings 
}) 

Версия 2:

rdd.saveAsObjectFile ("hdfs:///test1/"); 
// later, in other program - watch, you have tuples out of the box :) 
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String]) 
+0

VOW, то есть в чистом виде раствора :). Но как мы читаем, используя textFile, поскольку saveAsText создавал много разных файлов. – pythonic

+0

@pythonic См. Мое обновление - вы можете прочитать диапазон файлов. Каждая часть RDD сохраняется в файле 'part-XYZŹŻ', поэтому мы можем читать только каждый файл такого имени –

3

Я бы рекомендовал используйте DataFrame, если ваш RDD находится в табличном формате. кадр данных представляет собой таблицу или двухмерную структуру, подобную массиву, в которой каждый столбец содержит измерения для одной переменной, и каждая строка содержит один случай. DataFrame имеет дополнительные метаданные из-за его табличного формата, что позволяет Spark выполнять определенные оптимизации по завершенному запросу. , где RDD - это гибкий распределенный набор данных, который представляет собой большую часть абстракции черных ящиков или ячеек данных, которые невозможно оптимизировать. Однако вы можете перейти от DataFrame к RDD и наоборот, и вы можете перейти от RDD к DataFrame (если RDD находится в табличном формате) с помощью метода toDF.

Ниже приведен пример для создания/хранить DataFrame в CSV и паркета в формате HDFS,

val conf = { 
    new SparkConf() 
    .setAppName("Spark-HDFS-Read-Write") 
} 

val sqlContext = new SQLContext(sc) 

val sc = new SparkContext(conf) 

val hdfs = "hdfs:///" 
val df = Seq((1, "Name1")).toDF("id", "name") 

// Writing file in CSV format 
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv") 

// Writing file in PARQUET format 
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details") 

// Reading CSV files from HDFS 
val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv") 

// Reading PQRQUET files from HDFS 
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details") 
Смежные вопросы