2016-03-14 2 views
1

Я переношу следующую функцию в функцию sql udf spark.Перенос специфического SQL-запроса на Spark's UDF

DROP FUNCTION IF EXISTS anyarray_enumerate(anyarray); 
CREATE FUNCTION anyarray_enumerate(anyarray) 
RETURNS TABLE (index bigint, value anyelement) AS 
$$ 
SELECT 
    row_number() OVER(), 
    value 
FROM (
    SELECT unnest($1) AS value 
) AS unnested 
$$ 
LANGUAGE sql IMMUTABLE; 

Я не понимаю, что искровой вывод sql подобен полученному в SQL. Любая помощь или идея?

demo=# select anyarray_enumerate(array[599,322,119,537]); 
anyarray_enumerate 
-------------------- 
(1,599) 
(2,322) 
(3,119) 
(4,537) 
(4 rows) 

Мой текущий код:

def anyarray_enumerate[T](anyarray: WrappedArray[T]) = anyarray.zipWithIndex 
// Registers a function as a UDF so it can be used in SQL statements. 
sqlContext.udf.register("anyarray_enumerate", anyarray_enumerate(_:WrappedArray[Int])) 

Спасибо

ответ

1

Ваш UDF возвращает весь массив кортежей в одной строке:

spark.sql("select anyarray_enumerate(array(599, 322, 119, 537)) as foo").show() 
+--------------------+ 
|     foo| 
+--------------------+ 
|[[599,0], [322,1]...| 
+--------------------+ 

, но вы можете использовать explode() функция разбиения на несколько строк:

spark.sql("select explode(anyarray_enumerate(array(599, 322, 119, 537))) as foo").show() 
+-------+ 
| foo| 
+-------+ 
|[599,0]| 
|[322,1]| 
|[119,2]| 
|[537,3]| 
+-------+ 

Кроме того, метод zipWithIndex возвращает значение сначала и второе значение, в отличие от вашей команды SQL, но это легко фиксируется в UDF.

Смежные вопросы