2015-11-23 2 views
4

Я прочитал файл JSON в Spark. Этот файл имеет следующую структуру:Как создать Spark DataFrame из вложенного массива элемента структуры?

scala> tweetBlob.printSchema 
root 
|-- related: struct (nullable = true) 
| |-- next: struct (nullable = true) 
| | |-- href: string (nullable = true) 
|-- search: struct (nullable = true) 
| |-- current: long (nullable = true) 
| |-- results: long (nullable = true) 
|-- tweets: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- cde: struct (nullable = true) 
... 
... 
| | |-- cdeInternal: struct (nullable = true) 
... 
... 
| | |-- message: struct (nullable = true) 
... 
... 

Что бы я в идеале хочу это DataFrame с колоннами «CDE», «cdeInternal», «сообщение» ... как показано ниже

root 
|-- cde: struct (nullable = true) 
... 
... 
|-- cdeInternal: struct (nullable = true) 
... 
... 
|-- message: struct (nullable = true) 
... 
... 

У меня есть удалось использовать «взрываются» для извлечения элементов из массива «твитов» в колонке под названием «твитов»

scala> val tweets = tweetBlob.select(explode($"tweets").as("tweets")) 
tweets: org.apache.spark.sql.DataFrame = [tweets: struct<cde:struct<author:struct<gender:string,location:struct<city:string,country:string,state:string>,maritalStatus:struct<evidence:string,isMarried:string>,parenthood:struct<evidence:string,isParent:string>>,content:struct<sentiment:struct<evidence:array<struct<polarity:string,sentimentTerm:string>>,polarity:string>>>,cdeInternal:struct<compliance:struct<isActive:boolean,userProtected:boolean>,tracks:array<struct<id:string>>>,message:struct<actor:struct<displayName:string,favoritesCount:bigint,followersCount:bigint,friendsCount:bigint,id:string,image:string,languages:array<string>,link:string,links:array<struct<href:string,rel:string>>,listedCount:bigint,location:struct<displayName:string,objectType:string>,objectType:string,postedTime... 
scala> tweets.printSchema 
root 
|-- tweets: struct (nullable = true) 
| |-- cde: struct (nullable = true) 
... 
... 
| |-- cdeInternal: struct (nullable = true) 
... 
... 
| |-- message: struct (nullable = true) 
... 
... 

Как выбрать все столбцы внутри структуры и создать DataFrame из него? Explode не работает над структурой, если мое понимание верное.

Любая помощь приветствуется.

+0

Вопрос также спросил у [Databricks форуме] (https://forums.databricks.com/questions/2694/how- can-i-create-a-dataframe-from-a-nested-array-s.html), но ответа нет. – zapstar

ответ

11

Один из возможных способов справиться с этим - извлечь требуемую информацию из схемы. Давайте начнем с некоторых фиктивных данных:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.types._ 


case class Bar(x: Int, y: String) 
case class Foo(bar: Bar) 

val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF 

df.printSchema 

// root 
// |-- bar: struct (nullable = true) 
// | |-- x: integer (nullable = false) 
// | |-- y: string (nullable = true) 

и вспомогательной функции:

def children(colname: String, df: DataFrame) = { 
    val parent = df.schema.fields.filter(_.name == colname).head 
    val fields = parent.dataType match { 
    case x: StructType => x.fields 
    case _ => Array.empty[StructField] 
    } 
    fields.map(x => col(s"$colname.${x.name}")) 
} 

Наконец результаты:

df.select(children("bar", df): _*).printSchema 

// root 
// |-- x: integer (nullable = true) 
// |-- y: string (nullable = true) 
+0

Привет @ zero323 это может быть глупый вопрос, но что означает «children (« bar », df): _ *» этот синтаксис означает? –

+0

'children (" bar ", df)' - это просто вызов, который возвращает 'Seq [Column]'. ': _ *' выполняет распаковку varargs. – zero323

+0

объяснение –

6

Вы можете использовать

df 
    .select(explode(col("path_to_collection")).as("collection")) 
    .select(col("collection.*"))`: 

Пример:

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""" 

scala> val inline = sqlContext.read.json(sc.parallelize(json :: Nil)).select(explode(col("schools")).as("collection")).select(col("collection.*")) 

scala> inline.printSchema 
root 
|-- sname: string (nullable = true) 
|-- year: long (nullable = true) 

scala> inline.show 
+--------+----+ 
| sname|year| 
+--------+----+ 
|stanford|2010| 
|berkeley|2012| 
+--------+----+ 

Или, вы можете также использовать функцию SQL inline:

scala> val json = """{"name":"Michael", "schools":[{"sname":"stanford", "year":2010}, {"sname":"berkeley", "year":2012}]}""" 

scala> sqlContext.read.json(sc.parallelize(json :: Nil)).registerTempTable("tmp") 

scala> val inline = sqlContext.sql("SELECT inline(schools) FROM tmp") 

scala> inline.printSchema 
root 
|-- sname: string (nullable = true) 
|-- year: long (nullable = true) 

scala> inline.show 
+--------+----+ 
| sname|year| 
+--------+----+ 
|stanford|2010| 
|berkeley|2012| 
+--------+----+ 
3
scala> import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.DataFrame 

scala> import org.apache.spark.sql.types._ 
import org.apache.spark.sql.types._ 

scala> case class Bar(x: Int, y: String) 
defined class Bar 

scala> case class Foo(bar: Bar) 
defined class Foo 

scala> val df = sc.parallelize(Seq(Foo(Bar(1, "first")), Foo(Bar(2, "second")))).toDF 
df: org.apache.spark.sql.DataFrame = [bar: struct<x: int, y: string>] 


scala> df.printSchema 
root 
|-- bar: struct (nullable = true) 
| |-- x: integer (nullable = false) 
| |-- y: string (nullable = true) 


scala> df.select("bar.*").printSchema 
root 
|-- x: integer (nullable = true) 
|-- y: string (nullable = true) 


scala> 
Смежные вопросы