2017-01-11 1 views
3

Есть ли простой способ преобразования заданного объекта Row в json?Как преобразовать строку в json в Spark 2 Scala

Нашел о преобразовании всей Dataframe к выходу JSon: Spark Row to JSON

Но я просто хочу, чтобы преобразовать одну строку в формате JSON. Вот псевдокод для того, что я пытаюсь сделать.

Точнее, я читаю json как входной сигнал в Dataframe. Я создаю новый вывод, который в основном основан на столбцах, но с одним полем json для всей информации, которая не вписывается в столбцы.

Мой вопрос, что это самый простой способ, чтобы написать эту функцию: convertRowToJson()

def convertRowToJson(row: Row): String = ??? 

def transformVenueTry(row: Row): Try[Venue] = { 
    Try({ 
    val name = row.getString(row.fieldIndex("name")) 
    val metadataRow = row.getStruct(row.fieldIndex("meta")) 
    val score: Double = calcScore(row) 
    val combinedRow: Row = metadataRow ++ ("score" -> score) 
    val jsonString: String = convertRowToJson(combinedRow) 
    Venue(name = name, json = jsonString) 
    }) 
} 

Решения Psidom в:

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

работает только если строка имеет только один уровень не с вложенной Row. Это схема:

StructType(
    StructField(indicator,StringType,true), 
    StructField(range, 
    StructType(
     StructField(currency_code,StringType,true), 
     StructField(maxrate,LongType,true), 
     StructField(minrate,LongType,true)),true)) 

Также попробовал Артем предложение, но это не компилировать:

def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = { 
    val sparkContext = sqlContext.sparkContext 
    import sparkContext._ 
    import sqlContext.implicits._ 
    import sqlContext._ 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataFrame = rowRDD.toDF() //XXX does not compile 
    dataFrame 
} 

ответ

1

Мне нужно прочитать вход json и произвести выход json. Большинство полей обрабатываются индивидуально, но несколько вспомогательных объектов json нужно просто сохранить.

Когда Spark считывает данные, он превращает запись в строку. Строка - подобная json структура. Это может быть преобразовано и записано в json.

Но мне нужно взять некоторые структуры sub json для строки, используемой в качестве нового поля.

Это может быть сделано следующим образом:

dataFrameWithJsonField = dataFrame.withColumn("address_json", to_json($"location.address")) 

location.address путь к югу от JSon объекта dataframe на основе входящего JSon. address_json - это имя столбца этого объекта, преобразованного в строчную версию json.

to_json выполнен в Spark 2.1.

Если генерация этого вывода json с использованием json4s address_json должна быть проанализирована в соответствии с представлением AST, иначе выход json будет иметь escape-страницу address_json.

1

По существу, вы можете иметь dataframe, который содержит только одну строку. Таким образом, вы можете попытаться отфильтровать исходный фрейм данных, а затем проанализировать его на json.

+0

Спасибо за ваше предложение. Я попытался подойти к вам: четкости row2DataFrame (строка: Строка, sqlContext: SQLContext): DataFrame = { вал sparkContext = sqlContext.sparkContext импорта sparkContext._ импорта sqlContext.implicits._ импорта sqlContext._ VAL rowRDD: РДД [строка] = sqlContext.sparkContext.makeRDD (строка :: Ноль) вал dataFrame = rowRDD.toDF() // XXX не компилируется dataFrame } Это не компилировать. –

5

Вы можете использовать getValuesMap для преобразования объекта строки в карту, а затем преобразовать его JSON:

import scala.util.parsing.json.JSONObject 
import org.apache.spark.sql._ 

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")  
val row = df.first()   // this is an example row object 

def convertRowToJSON(row: Row): String = { 
    val m = row.getValuesMap(row.schema.fieldNames) 
    JSONObject(m).toString() 
} 

convertRowToJSON(row) 
// res46: String = {"A" : 1, "B" : 2, "C" : 3} 
+0

Это отлично работало. Благодаря! –

+2

Исправление: Фактически это работает только для первого уровня карты/структуры, а не для вложенной Карты, вы увидите только значения, а не ключи. –

+1

@SamiBadawi Где вы можете найти решение для вложенной карты? –

1

JSon имеет схему, но строка не имеет схемы, поэтому необходимо применить схему на Их & конвертировать в JSon. Вот как вы можете это сделать.

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types._ 

def convertRowToJson(row: Row): String = { 

    val schema = StructType(
     StructField("name", StringType, true) :: 
     StructField("meta", StringType, false) :: Nil) 

     return sqlContext.applySchema(row, schema).toJSON 
} 
0

Я комбинирую предложение от: Артема, Киранма и Псидома. Сделал много следов и ошибок и пришли с этим решения, которые я тестировал для вложенных структур:

def row2Json(row: Row, sqlContext: SQLContext): String = { 
    import sqlContext.implicits 
    val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil) 
    val dataframe = sqlContext.createDataFrame(rowRDD, row.schema) 
    dataframe.toJSON.first 
} 

Это решение работает, но только во время работы в режиме драйвера.

1

Обратите внимание, что класс scala scala.util.parsing.json.JSONObject устарел и не поддерживает нулевые значения.

@deprecated ("Этот класс будет удален.", "2.11.0")

"JSONFormat.defaultFormat не обрабатывает нулевые значения"

https://issues.scala-lang.org/browse/SI-5092

+0

Спасибо Арнон. Были некоторые разговоры о модернизации поддержки json в Scala. –

0

У меня было то же самое вопрос, у меня были паркетные файлы с канонической схемой (без массивов), и я хочу только получить json-события. Я сделал следующее, и, похоже, все работает нормально (Spark 2.1):

import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.{DataFrame, Dataset, Row} 
import scala.util.parsing.json.JSONFormat.ValueFormatter 
import scala.util.parsing.json.{JSONArray, JSONFormat, JSONObject} 

def getValuesMap[T](row: Row, schema: StructType): Map[String,Any] = { 
    schema.fields.map { 
    field => 
     try{ 
     if (field.dataType.typeName.equals("struct")){ 
      field.name -> getValuesMap(row.getAs[Row](field.name), field.dataType.asInstanceOf[StructType]) 
     }else{ 
      field.name -> row.getAs[T](field.name) 
     } 
     }catch {case e : Exception =>{field.name -> null.asInstanceOf[T]}} 
    }.filter(xy => xy._2 != null).toMap 
} 

def convertRowToJSON(row: Row, schema: StructType): JSONObject = { 
    val m: Map[String, Any] = getValuesMap(row, schema) 
    JSONObject(m) 
} 
//I guess since I am using Any and not nothing the regular ValueFormatter is not working, and I had to add case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
val defaultFormatter : ValueFormatter = (x : Any) => x match { 
    case s : String => "\"" + JSONFormat.quoteString(s) + "\"" 
    case jo : JSONObject => jo.toString(defaultFormatter) 
    case jmap : Map[String,Any] => JSONObject(jmap).toString(defaultFormatter) 
    case ja : JSONArray => ja.toString(defaultFormatter) 
    case other => other.toString 
} 

val someFile = "s3a://bucket/file" 
val df: DataFrame = sqlContext.read.load(someFile) 
val schema: StructType = df.schema 
val jsons: Dataset[JSONObject] = df.map(row => convertRowToJSON(row, schema)) 
Смежные вопросы