При работе с DataFrames Spark для отображения данных в столбцах требуются пользовательские функции (UDF). UDF требуют, чтобы типы аргументов были явно указаны. В моем случае мне нужно манипулировать столбцом, состоящим из массивов объектов, и я не знаю, какой тип использовать. Вот пример:Определение UDF, который принимает массив объектов в Spark DataFrame?
import sqlContext.implicits._
// Start with some data. Each row (here, there's only one row)
// is a topic and a bunch of subjects
val data = sqlContext.read.json(sc.parallelize(Seq(
"""
|{
| "topic" : "pets",
| "subjects" : [
| {"type" : "cat", "score" : 10},
| {"type" : "dog", "score" : 1}
| ]
|}
""")))
Это относительно просто использовать встроенные в org.apache.spark.sql.functions
для выполнения основных операций над данными в колонках
import org.apache.spark.sql.functions.size
data.select($"topic", size($"subjects")).show
+-----+--------------+
|topic|size(subjects)|
+-----+--------------+
| pets| 2|
+-----+--------------+
, и это как правило, легко писать пользовательские UDF, чтобы выполнить произвольные операции
import org.apache.spark.sql.functions.udf
val enhance = udf { topic : String => topic.toUpperCase() }
data.select(enhance($"topic"), size($"subjects")).show
+----------+--------------+
|UDF(topic)|size(subjects)|
+----------+--------------+
| PETS| 2|
+----------+--------------+
Но что, если я хочу использовать UDF для управления массивом объектов в столбце «предметы»? Какой тип я использую для аргумента в UDF? Например, если я хочу, чтобы переопределить функцию размера, вместо того, чтобы использовать одну представленную искрой:
val my_size = udf { subjects: Array[Something] => subjects.size }
data.select($"topic", my_size($"subjects")).show
Ясно Array[Something]
не работает ... какой тип я должен использовать !? Должен ли я вообще Array[]
? Понюхание говорит мне, что scala.collection.mutable.WrappedArray
может иметь какое-то отношение к нему, но все же есть другой тип, который мне нужно предоставить.
я получаю это: java.lang.UnsupportedOperationException: Схема для типа org.apache.spark.sql.Row не поддерживается на org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection. scala: 733) at org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor (ScalaReflection.scala: 671) at org.apache.spark.sql.functions $ .udf (functions.scala: 3076) . .. 134 elided –
@GuruprasadGV UDF должен возвращать 'Product' (' TupleN', класс case) для 'structs'. – zero323