2016-11-01 3 views
2

Я работаю над искрой 2.0.0, где мое требование - использовать функцию com.facebook.hive.udf.UDFNumberRows в моем контексте sql для использования в одном из запросов. В моем кластере с запросом Hive я использую это как временную функцию, просто определяя: CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows', что довольно просто.Регистрация Hive Custom UDF с Spark (Spark SQL) 2.0.0

Я попытался регистрации это с sparkSession, как показано ниже, но получил сообщение об ошибке:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""") 

Ошибка:

CREATE TEMPORARY FUNCTION rowsequence AS 'com.facebook.hive.udf.UDFNumberRows' 
16/11/01 20:46:17 ERROR ApplicationMaster: User class threw exception: java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead. 
java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead. 
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeFunctionBuilder(SessionCatalog.scala:751) 
    at org.apache.spark.sql.execution.command.CreateFunctionCommand.run(functions.scala:61) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:60) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) 
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:186) 
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167) 
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65) 
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.delayedEndpoint$com$mediamath$spark$attribution$sparkjob$SparkVideoCidJoin$1(SparkVideoCidJoin.scala:75) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$delayedInit$body.apply(SparkVideoCidJoin.scala:22) 
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.App$$anonfun$main$1.apply(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:381) 
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 
    at scala.App$class.main(App.scala:76) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin$.main(SparkVideoCidJoin.scala:22) 
    at com.mediamath.spark.attribution.sparkjob.SparkVideoCidJoin.main(SparkVideoCidJoin.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627) 

Кто-нибудь есть идеи, как зарегистрировать его как искра просит, то есть с регистром апи in sparkSession и SQLContext:

sqlContext.udf.register(...) 

ответ

1

Вы можете зарегистрировать UDF напрямую u петь SparkSession как в sparkSession.udf.register("myUDF", (arg1: Int, arg2: String) => arg2 + arg1). Посмотрите на подробной документации here

3

В Спарк 2.0

sparkSession.udf.register(...) 

позволяет зарегистрировать Java или Scala, UDF (функции типа Long => Long), но не Hive GenericUDFs, что справиться с LongWritable вместо Лонга, и это может иметь переменное количество аргументов.

Чтобы зарегистрировать улей UDF, ваша первая подход был правильным:

sparkSession.sql("""CREATE TEMPORARY FUNCTION myFunc AS 'com.facebook.hive.udf.UDFNumberRows'""") 

Однако вы должны включить улей поддержку первого:

SparkSession.builder().enableHiveSupport() 

и убедитесь, что «искровой улей» зависимости присутствуют в вашем пути к классу.

Объяснение:

Ваше сообщение об ошибке

java.lang.UnsupportedOperationException: Use sqlContext.udf.register(...) instead 

приходит из класса SessionCatalog.

По звонку SparkSession.builder().enableHiveSupport(), искра заменит SessionCatalog на HiveSessionCatalog, в котором реализован метод makeFunctionBuilder.

Наконец:

ОДС вы хотите использовать, «com.facebook.hive.udf.UDFNumberRows», была написана в то время, когда Оконная функции не были доступны в улье. Я предлагаю вам использовать их вместо этого. Вы можете проверить Hive Reference, Spark-SQL intro, или this if you want to stick to the scala syntax.

+0

Извините, но это не ответ на этот вопрос - это должно быть просто комментарий –

+0

@ T.Gawęda спасибо, что указал, что мой ответ был недостаточно ясен. Я потратил время, чтобы переписать его более четко. – FurryMachine

0

Проблема, с которой вы сталкиваетесь, заключается в том, что Spark не загружает библиотеку jar в своем классеPath.

В нашей команде мы загружаем внешние библиотеки с параметром --jars.

/usr/bin/spark-submit --jars external_library.jar our_program.py --our_params 

Вы можете проверить, если вы загружаете внешние библиотеки в Спарк истории - Environment Tab. (spark.yarn.secondary.jars)

Тогда вы сможете зарегистрировать свой UDF, как вы сказали. После включения HiveSupport, как говорит FurryMachine.

sparkSession.sql(""" 
    CREATE TEMPORARY FUNCTION myFunc AS 
    'com.facebook.hive.udf.UDFNumberRows' 
""") 

Вы можете нашли больше информации в искровым саммит --help

hadoop:~/projects/neocortex/src$ spark-submit --help 
Usage: spark-submit [options] <app jar | python file> [app arguments] 
Usage: spark-submit --kill [submission ID] --master [spark://...] 
Usage: spark-submit --status [submission ID] --master [spark://...] 
Usage: spark-submit run-example [options] example-class [example args] 

Options: 
    --master MASTER_URL   spark://host:port, mesos://host:port, yarn, or local. 
    --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or 
           on one of the worker machines inside the cluster ("cluster") 
           (Default: client). 
    --class CLASS_NAME   Your application's main class (for Java/Scala apps). 
    --name NAME     A name of your application. 
    --jars JARS     Comma-separated list of local jars to include on the driver 
           and executor classpaths. 
Смежные вопросы