2014-07-30 3 views
17

Я новичок в искру и искрах sql, и я пытался запросить некоторые данные, используя искру SQL.Создание пользовательской функции в Spark-SQL

Мне нужно получить месяц с даты, заданной в виде строки.

Я думаю, что невозможно запросить месяц непосредственно из sparkqsl, поэтому я подумывал написать определенную пользователем функцию в scala.

Можно ли написать udf в sparkSQL, и, если возможно, кто-нибудь может предложить лучший способ записи udf.

Пожалуйста, помогите

+0

Они говорят на [официальный сайт] (https://spark.apache.org/ docs/latest/sql-programming-guide.html), что Spark SQL по-прежнему является альфа-вещью, которая [по-видимому, не имеет официальной ссылки на синтаксис] (http://apache-spark-user-list.1001560.n3.nabble.com /Supported-SQL-syntax-in-Spark-SQL-td9538.html) на данный момент. Люди, которые действительно знают что-то о Spark SQL *, могут здесь повсюду, но на данный момент у вас может быть больше шансов узнать о нем новые вещи после списка рассылки Apache Spark (http: // apache-spark-user-list. 1001560.n3.nabble.com/). –

ответ

10

Вы можете сделать это, по крайней мере, для фильтрации, если вы готовы использовать запрос языка интегрированы.

Для файла данных dates.txt, содержащий:

one,2014-06-01 
two,2014-07-01 
three,2014-08-01 
four,2014-08-15 
five,2014-09-15 

Вы можете упаковать как много Scala даты магии в вашей UDF, как вы хотите, но я буду держать это просто:

def myDateFilter(date: String) = date contains "-08-" 

Set все это выглядит следующим образом: это много от Programming guide.

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext._ 

// case class for your records 
case class Entry(name: String, when: String) 

// read and parse the data 
val entries = sc.textFile("dates.txt").map(_.split(",")).map(e => Entry(e(0),e(1))) 

Вы можете использовать UDF как часть вашего ИНЕКЕ:

val augustEntries = entries.where('when)(myDateFilter).select('name, 'when) 

и увидеть результаты:

augustEntries.map(r => r(0)).collect().foreach(println) 

Обратите внимание на версию метода where я использовал, в документе указано следующее:

def where[T1](arg1: Symbol)(udf: (T1) ⇒ Boolean): SchemaRDD 

Итак, UDF может принимать только один аргумент, но вы можете составить несколько вызовов .where() для фильтрации по нескольким столбцам.

Edit для Спарк 1.2.0 (и на самом деле 1.1.0 тоже)

Хотя это на самом деле не документированы, Спарк теперь поддерживает регистрации UDF, так что может быть запрошена от SQL.

выше UDF может быть зарегистрирован с помощью:

sqlContext.registerFunction("myDateFilter", myDateFilter) 

и если таблица была зарегистрирована

sqlContext.registerRDDAsTable(entries, "entries") 

это может быть запрошена с помощью

sqlContext.sql("SELECT * FROM entries WHERE myDateFilter(when)") 

Для получения дополнительной информации см this example.

+0

как насчет 'UDAF' (функция агрегации пользовательского определения)? – chenzhongpu

+1

Я тоже об этом задумывался, но не нашел никаких доказательств того, что он поддерживается до сих пор. Он ** поддерживается **, если вы хотите написать запрос на прием, как вы можете видеть в [тестах] (https://github.com/apache/spark/blob/master/sql/hive/src/ test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala) –

+1

Оказалось, что это отслеживается в [SPARK-3947] (https://issues.apache.org/jira/browse/SPARK -3947) - пока не поддерживается. –

1

В PySpark 1.5 и выше, мы можем легко достичь этого с помощью встроенной функции.

Ниже приведен пример:

raw_data = 
[ 

("2016-02-27 23:59:59", "Gold", 97450.56), 

("2016-02-28 23:00:00", "Silver", 7894.23), 

("2016-02-29 22:59:58", "Titanium", 234589.66)] 


Time_Material_revenue_df = 
sqlContext.createDataFrame(raw_data, ["Sold_time", "Material", "Revenue"]) 

from pyspark.sql.functions import * 

Day_Material_reveneu_df = Time_Material_revenue_df.select(to_date("Sold_time").alias("Sold_day"), "Material", "Revenue") 
1

В Спарк 2.0, вы можете сделать это:

// define the UDF 
def convert2Years(date: String) = date.substring(7, 11) 
// register to session 
sparkSession.udf.register("convert2Years", convert2Years(_: String)) 
val moviesDf = getMoviesDf // create dataframe usual way 
moviesDf.createOrReplaceTempView("movies") // 'movies' is used in sql below 
val years = sparkSession.sql("select convert2Years(releaseDate) from movies") 
Смежные вопросы