2016-06-30 3 views
2

Я пытаюсь получить средние оценки всех объектов json в файле. Я загрузил файл и преобразовывался в фрейм данных, но получал ошибку при разборе для avg. Пример запроса:Spark: как разобрать несколько json со списком массивов Struct?

{ 
     "country": "France", 
     "customerId": "France001", 
     "visited": [ 
      { 
       "placeName": "US", 
       "rating": "2.3", 
       "famousRest": "N/A", 
       "placeId": "AVBS34" 

      }, 
       { 
       "placeName": "US", 
       "rating": "3.3", 
       "famousRest": "SeriousPie", 
       "placeId": "VBSs34" 

      }, 
       { 
       "placeName": "Canada", 
       "rating": "4.3", 
       "famousRest": "TimHortons", 
       "placeId": "AVBv4d" 

      }   
    ] 
} 

так для этого JSON, рейтинг США Avg будет (2,3 + 3,3)/2 = 2,8

{ 
     "country": "Egypt", 
     "customerId": "Egypt009", 
     "visited": [ 
      { 
       "placeName": "US", 
       "rating": "1.3", 
       "famousRest": "McDonald", 
       "placeId": "Dedcf3" 

      }, 
       { 
       "placeName": "US", 
       "rating": "3.3", 
       "famousRest": "EagleNest", 
       "placeId": "CDfet3" 

      }, 


} 

{ 
     "country": "Canada", 
     "customerId": "Canada012", 
     "visited": [ 
      { 
       "placeName": "UK", 
       "rating": "3.3", 
       "famousRest": "N/A", 
       "placeId": "XSdce2" 

      }, 


    ] 
} 

для этого Avg для нас = (3,3 +1,3)/2 = 2,3

так над всеми, средний рейтинг будет: (2,8 + 2,3)/2 = 2,55 (только два запроса имеют «США» в их посещаемой списке)

моей схеме:

root 
|-- country: string(nullable=true) 
|-- customerId:string(nullable=true) 
|-- visited: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- placeId: string (nullable = true) 
| | |-- placeName: string (nullable = true) 
| | |-- famousRest: string (nullable = true) 
| | |-- rating: string (nullable = true) 

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
val df = sqlContext.jsonFile("temp.txt") 
df.show() 

так что в основном мне нужно получить среднее значение оценок, где placeName = 'US' in say, например. AVG_RATING = сумма рейтинга в каждом объекте json, где placeName - US/count такой посещенной записи, и FINAL_VALUE = сумма всех AVG_RATING в каждом объекте json с placeName 'US'/count всех json-объектов с placeName = 'US'.

До сих пор я пытался:

df.registerTempTable("people") 
    sqlContext.sql("select avg(expResults.rank) from people LATERAL VIEW explode(visited)people AS expResults where expResults.placeName = 'US' ").collect().foreach(println) 

    val result = df.select("*").where(array_contains (df("visited.placeName"), "US")); - gives the list where visited array contains US. But I am not sure how do parse through list of structs. 

Может кто-нибудь сказать мне, как я могу это сделать?

+2

Вы можете вставить только один образец json для попытки? – WoodChopper

+1

@WoodChopper обновлен с образцом объекта json –

+0

Вы нашли решение для этого? –

ответ

2

Похоже вы хотите что-то вроде этого:

import org.apache.spark.sql.functions.{avg, explode} 

val result = df 
    .withColumn("visit", explode($"visited")) // Explode visits 
    .groupBy($"customerId", $"visit.placeName") // Group by using dot syntax 
    .agg(avg($"visit.rating".cast("double")).alias("tmp")) 
    .groupBy($"placeName").agg(avg($"tmp").alias("value")) 

После этого вы можете фильтровать это для страны, по вашему выбору.

result.where($"placeName" === "US").show 
// +---------+-----+ 
// |placeName|value| 
// +---------+-----+ 
// |  US| 2.55| 
// +---------+-----+ 

Менее элегантный подход заключается в использовании UDF:

import org.apache.spark.sql.Row 
import org.apache.spark.sql.functions.udf 

def userAverage(country: String) = udf((visits: Seq[Row]) => Try { 
    val filtered = visits 
    .filter(_.getAs[String]("placeName") == country) 
    .map(_.getAs[String]("rating").toDouble) 
    filtered.sum/filtered.size 
}.toOption) 

df.select(userAverage("US")($"visited").as("tmp")).na.drop.agg(avg("tmp")) 

Примечания: Это следует за Decription представленного в вопросе путем вычисления средних значений в среднем, который отличается от принятого ответа. Для простых:

+1

Ну, вы можете попробовать UDF, чтобы избежать взрыва, но в целом поддержка Spark для вложенных структур довольно ограничена, поэтому я не ожидал впечатляющей производительности, особенно с JSON. – zero323

+1

Спасибо за объяснение ур.Могу ли я добавить фильтр здесь? как говорят, используйте рейтинг для нас в среднем средстве, только если он выше 3 (visit.rating)? –

+0

val result = df .withColumn ("visit", explode ($ "visited")). Filter ($ "visited.rating"> 3.0) .groupBy ($ "customerId", $ "visit.placeName") .gg (avg ($ "visit.rating" .cast ("double")). alias ("tmp")) .groupBy ($ "placeName"). agg (avg ($ "tmp"). alias (" значение ")) –

0

Выполняет мое решение проблемы.

val DF = sqlContext.jsonFile("sample.json") 


DF.registerTempTable("temp") 


sqlContext.sql("select place_and_rating.placeName as placeName, avg(place_and_rating.rating) as avg_rating from temp lateral view explode(visited) exploded_table as place_and_rating where place_and_rating.placeName='US' group by place_and_rating.placeName").show() 
+0

Я получаю отказ: ожидается «союз», но будет найден идентификатор для решения ур. Я что-то пропустил? –

+0

Мне нужно что-нибудь импортировать? –

+0

Вам не нужно ничего импортировать для этого решения. jsonFile & resisterTempTable - встроенные методы, доступные для sqlContext, а остальное - просто HQL. – Rahul

Смежные вопросы