2015-08-10 3 views
6

Я начинаю с PySpark, и у меня возникают проблемы с созданием DataFrames с вложенными объектами.Spark - Создание вложенных DataFrame

Это мой пример.

У меня есть пользователи.

$ cat user.json 
{"id":1,"name":"UserA"} 
{"id":2,"name":"UserB"} 

У пользователей есть заказы.

$ cat order.json 
{"id":1,"price":202.30,"userid":1} 
{"id":2,"price":343.99,"userid":1} 
{"id":3,"price":399.99,"userid":2} 

И мне нравится присоединяться к нему, чтобы получить такую ​​структуру, где заказы массива вложены в пользователей.

$ cat join.json 
{"id":1, "name":"UserA", "orders":[{"id":1,"price":202.30,"userid":1},{"id":2,"price":343.99,"userid":1}]} 
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]} 

Как я могу это сделать? Есть ли какое-либо вложенное соединение или что-то подобное?

>>> user = sqlContext.read.json("user.json") 
>>> user.printSchema(); 
root 
|-- id: long (nullable = true) 
|-- name: string (nullable = true) 

>>> order = sqlContext.read.json("order.json") 
>>> order.printSchema(); 
root 
|-- id: long (nullable = true) 
|-- price: double (nullable = true) 
|-- userid: long (nullable = true) 

>>> joined = sqlContext.read.json("join.json") 
>>> joined.printSchema(); 
root 
|-- id: long (nullable = true) 
|-- name: string (nullable = true) 
|-- orders: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- price: double (nullable = true) 
| | |-- userid: long (nullable = true) 

EDIT: Я знаю, что есть возможность сделать это с помощью соединения и foldByKey, но есть ли более простой способ?

EDIT2: Я использую решение по @ zero323

def joinTable(tableLeft, tableRight, columnLeft, columnRight, columnNested, joinType = "left_outer"): 
    tmpTable = sqlCtx.createDataFrame(tableRight.rdd.groupBy(lambda r: r.asDict()[columnRight])) 
    tmpTable = tmpTable.select(tmpTable._1.alias("joinColumn"), tmpTable._2.data.alias(columnNested)) 
    return tableLeft.join(tmpTable, tableLeft[columnLeft] == tmpTable["joinColumn"], joinType).drop("joinColumn") 

добавить вторую вложенную структуру

>>> lines = sqlContext.read.json(path + "lines.json") 
>>> lines.printSchema(); 
root 
|-- id: long (nullable = true) 
|-- orderid: long (nullable = true) 
|-- product: string (nullable = true) 

orders = joinTable(order, lines, "id", "orderid", "lines") 
joined = joinTable(user, orders, "id", "userid", "orders") 
joined.printSchema() 

root 
|-- id: long (nullable = true) 
|-- name: string (nullable = true) 
|-- orders: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- price: double (nullable = true) 
| | |-- userid: long (nullable = true) 
| | |-- lines: array (nullable = true) 
| | | |-- element: struct (containsNull = true) 
| | | | |-- _1: long (nullable = true) 
| | | | |-- _2: long (nullable = true) 
| | | | |-- _3: string (nullable = true) 

После этого столбца имена "строк форматирования строк теряются. Любые идеи?

EDIT 3: Я попытался вручную указать схему.

from pyspark.sql.types import * 
fields = [] 
fields.append(StructField("_1", LongType(), True)) 
inner = ArrayType(lines.schema) 
fields.append(StructField("_2", inner)) 
new_schema = StructType(fields) 
print new_schema 

grouped = lines.rdd.groupBy(lambda r: r.orderid) 
grouped = grouped.map(lambda x: (x[0], list(x[1]))) 
g = sqlCtx.createDataFrame(grouped, new_schema) 

Ошибка:

TypeError: StructType(List(StructField(id,LongType,true),StructField(orderid,LongType,true),StructField(product,StringType,true))) can not accept object in type <class 'pyspark.sql.types.Row'> 

ответ

6

Это будет работать только в Spark 2.0 или более поздняя версия

Сначала нужно пару импорта:

from pyspark.sql.functions import struct, collect_list 

Остальное простое агрегирование и присоединиться:

orders = spark.read.json("/path/to/order.json") 
users = spark.read.json("/path/to/user.json") 

combined = users.join(
    orders 
     .groupBy("userId") 
     .agg(collect_list(struct(*orders.columns)).alias("orders")) 
     .withColumnRenamed("userId", "id"), ["id"]) 

Для примера данных результат является:

combined.show(2, False) 
+---+-----+---------------------------+ 
|id |name |orders      | 
+---+-----+---------------------------+ 
|1 |UserA|[[1,202.3,1], [2,343.99,1]]| 
|2 |UserB|[[3,399.99,2]]    | 
+---+-----+---------------------------+ 

со схемой:

combined.printSchema() 
root 
|-- id: long (nullable = true) 
|-- name: string (nullable = true) 
|-- orders: array (nullable = true) 
| |-- element: struct (containsNull = true) 
| | |-- id: long (nullable = true) 
| | |-- price: double (nullable = true) 
| | |-- userid: long (nullable = true) 

и JSON представление:

for x in combined.toJSON().collect(): 
    print(x)  
{"id":1,"name":"UserA","orders":[{"id":1,"price":202.3,"userid":1},{"id":2,"price":343.99,"userid":1}]} 
{"id":2,"name":"UserB","orders":[{"id":3,"price":399.99,"userid":2}]} 
-1

Во-первых, вам нужно использовать userid в качестве ключа присоединиться ко второму DataFrame:

user.join(order, user.id == order.userid) 

Затем вы можете использовать map шаг для преобразования в результате записи в желаемый формат.

+0

Не совсем. «Карта» недостаточно. Если я присоединяюсь к пользователям и заказам, у меня будет 3 записи. (а я хочу всего 2). Поэтому мне также нужна какая-то агрегация (например, foldByKey) –

Смежные вопросы