2016-03-18 3 views
2

У меня есть поток данных в формате avro (json encoded), который необходимо сохранить в виде паркетных файлов. Я мог только это сделать,Spark avro to parquet

val df = sqc.read.json(jsonRDD).toDF() 

и написать df как паркет.

Здесь схема выведена из json. Но у меня уже есть файл avsc, и я не хочу, чтобы искра выводила схему из json.

И в этом случае файлы паркета хранят информацию о схеме как StructType, а не как avro.record.type. Есть ли способ сохранить информацию об авро-схеме.

SPARK - 1.4.1

ответ

2

Завершено, используя ответ на этот вопрос avro-schema-to-spark-structtype

def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = { 
    val dummyFIle = File.createTempFile("avro_dummy", "avro") 
    val datumWriter = new GenericDatumWriter[wuser]() 
    datumWriter.setSchema(avroSchema) 
    val writer = new DataFileWriter(datumWriter).create(avroSchema, dummyFIle) 
    writer.flush() 
    writer.close() 
    val df = sqc.read.format("com.databricks.spark.avro").load(dummyFIle.getAbsolutePath) 
    df.schema 
} 
0

можно программно Указание схемы

// The schema is encoded in a string 
val schemaString = "name age" 

// Import Row. 
import org.apache.spark.sql.Row; 

// Import Spark SQL data types 
import org.apache.spark.sql.types.{StructType,StructField,StringType}; 

// Generate the schema based on the string of schema 
val schema = 
    StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 

// Convert records of the RDD (people) to Rows. 
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) 

// Apply the schema to the RDD. 
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) 

см: http://spark.apache.org/docs/latest/sql-programming-guide.html

искровой Avro затем используют типы схем для определения Avro типов следующим образом

  • Spark Тип SQL -> Тип Avro
  • ByteType -> Int
  • ShortType -> Int
  • DecimalType -> строка
  • BinaryType -> байты
  • TimestampType -> длинный
  • StructType -> запись

Вы можете написать Avro записывается следующим образом:

import com.databricks.spark.avro._ 

val sqlContext = new SQLContext(sc) 

import sqlContext.implicits._ 

val df = Seq((2012, 8, "Batman", 9.8), 
     (2012, 8, "Hero", 8.7), 
     (2012, 7, "Robot", 5.5), 
     (2011, 7, "Git", 2.0)) 
     .toDF("year", "month", "title", "rating") 

df.write.partitionBy("year", "month").avro("/tmp/output") 
Смежные вопросы