2016-08-17 3 views
13

При работе с DataFrames Spark для отображения данных в столбцах требуются пользовательские функции (UDF). UDF требуют, чтобы типы аргументов были явно указаны. В моем случае мне нужно манипулировать столбцом, состоящим из массивов объектов, и я не знаю, какой тип использовать. Вот пример:Определение UDF, который принимает массив объектов в Spark DataFrame?

import sqlContext.implicits._ 

// Start with some data. Each row (here, there's only one row) 
// is a topic and a bunch of subjects 
val data = sqlContext.read.json(sc.parallelize(Seq(
    """ 
    |{ 
    | "topic" : "pets", 
    | "subjects" : [ 
    | {"type" : "cat", "score" : 10}, 
    | {"type" : "dog", "score" : 1} 
    | ] 
    |} 
    """))) 

Это относительно просто использовать встроенные в org.apache.spark.sql.functions для выполнения основных операций над данными в колонках

import org.apache.spark.sql.functions.size 
data.select($"topic", size($"subjects")).show 

+-----+--------------+ 
|topic|size(subjects)| 
+-----+--------------+ 
| pets|    2| 
+-----+--------------+ 

, и это как правило, легко писать пользовательские UDF, чтобы выполнить произвольные операции

import org.apache.spark.sql.functions.udf 
val enhance = udf { topic : String => topic.toUpperCase() } 
data.select(enhance($"topic"), size($"subjects")).show 

+----------+--------------+ 
|UDF(topic)|size(subjects)| 
+----------+--------------+ 
|  PETS|    2| 
+----------+--------------+ 

Но что, если я хочу использовать UDF для управления массивом объектов в столбце «предметы»? Какой тип я использую для аргумента в UDF? Например, если я хочу, чтобы переопределить функцию размера, вместо того, чтобы использовать одну представленную искрой:

val my_size = udf { subjects: Array[Something] => subjects.size } 
data.select($"topic", my_size($"subjects")).show 

Ясно Array[Something] не работает ... какой тип я должен использовать !? Должен ли я вообще Array[]? Понюхание говорит мне, что scala.collection.mutable.WrappedArray может иметь какое-то отношение к нему, но все же есть другой тип, который мне нужно предоставить.

ответ

16

Что вы ищете является Seq[o.a.s.sql.Row]:

import org.apache.spark.sql.Row 

val my_size = udf { subjects: Seq[Row] => subjects.size } 

Объяснение:

  • Текущее представление ArrayType, как вы уже знаете, WrappedArray так Array не будет работать, и это лучше оставаться на безопасной стороне.
  • Местный тип для StructType - . К сожалению, это означает, что доступ к отдельным полям не безопасен для типов.

Примечание:

  • Чтобы создать struct функцию переданных udf должен вернуть Product тип (Tuple* или case class), а не .
+0

я получаю это: java.lang.UnsupportedOperationException: Схема для типа org.apache.spark.sql.Row не поддерживается на org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection. scala: 733) at org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) at org.apache.spark.sql.functions $ .udf (functions.scala: 3076) . .. 134 elided –

+0

@GuruprasadGV UDF должен возвращать 'Product' (' TupleN', класс case) для 'structs'. – zero323

Смежные вопросы