2016-05-14 1 views
0

Я использую ноутбук Jupyter с PySpark. Внутри этого я имею блок данных, который имеет схему с именами столбцов и типами (integer, ...) для этих столбцов. Теперь я использую такие методы, как flatMap, но это возвращает список кортежей, которые больше не имеют фиксированного типа. Есть ли способ достичь этого?Плоская карта PySpark должна возвращать кортежи с типизированными значениями

df.printSchema() 
root 
|-- name: string (nullable = true) 
|-- ... 
|-- ... 
|-- ratings: integer (nullable = true) 

Затем я использую flatMap сделать некоторые расчеты со значениями рейтинга (затемненные здесь):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)) 
y_rate.toDF().printSchema() 

И теперь я получаю ошибку:

TypeError: Can not infer schema for type:

Есть ли способ использовать карту/flatMap/уменьшить, сохранив схему? или по крайней мере возвращающие кортежи, которые имеют значения определенного типа?

ответ

1

Прежде всего, вы используете неправильную функцию. flatMap будет map и flatten так предполагая, ваши данные выглядят следующим образом:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"]) 

выход flatMap будет эквивалентен:

sc.parallelize(['foo', 0, 'bar', 5]) 

Отсюда ошибки вы видите. Если вы действительно хотите, чтобы он не работает, вы должны использовать map:

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF() 
## DataFrame[_1: string, _2: bigint] 

Далее, отображение над DataFrame больше не поддерживается в версии 2.0. Вы должны сначала извлечь rdd (см. df.rdd.map выше).

И, наконец, передача данных между Python и JVM крайне неэффективна. Это не только требует передачи данных между Python и JVM с соответствующей сериализации/десериализации и вывода схемы (если схема явно не предусмотрена), что также ломает лень. Лучше использовать выражения SQL для таких вещей:

from pyspark.sql.functions import when 

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) 

Если по какой-то причине вам нужно простой код Python UDF может быть лучшим выбором.

+0

Очень полезно. спасибо за ваш пример кода. Я просто не получил участие в FlatMap vs Map. – Matthias

+1

'flatMap' - это функция' RDD [T] => (T => Iterable [U]) => RDD [U] '. Другими словами, он ожидает, что функция вернет 'Itereble' (кортеж Python) и объединит эти (flattens) результат. – zero323

+0

Есть ли способ дать столбцу when/else имя в этом выражении? см. 'df.select (df.id, когда (df.ratings> 5, 5) .otherwise (df.ratings))' @ zero323 – Matthias

Смежные вопросы