4

Я вижу в этом DataBricks post, есть поддержка функций окна в SparkSql, в частности я пытаюсь использовать функцию окна lag().Функция SparkSQL - Lag?

У меня есть строки транзакций по кредитным картам, и я их отсортировал, теперь я хочу перебирать строки и для каждой строки отображают сумму транзакции и разницу в сумме текущей строки и предыдущей количество ряда.

Вслед за DataBricks пост, я придумал этот запрос, но он бросает исключение на меня, и я не могу совсем undestand почему ..

Это в PySpark .. ТХ мой dataframe уже созданный при регистрации в качестве временной таблицы.

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx") 

и исключение (усеченный) ..

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql. 
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found 

Я действительно apprecaite любое представление, эта функция является относительно новым и там не так много, чтобы идти дальше, насколько существующие примеры или другие Похожие сообщения.

Редактировать

Я также попытался сделать это без SQL заявления, как показано ниже, но по-прежнему получаю сообщение об ошибке. Я использовал это с Hive и SQLContext и получал ту же ошибку.

windowSpec = \ 
Window \ 
    .partitionBy(h_tx_df_ordered['cc_num']) \ 
    .orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time']) 

windowSpec.rowsBetween(-1, 0) 

lag_amt = \ 
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt']) 
    tx_df_ordered.select(
    h_tx_df_ordered['cc_num'], 
    h_tx_df_ordered['trans_date'], 
    h_tx_df_ordered['trans_time'], 
    h_tx_df_ordered['amt'], 
    lag_amt.alias("prev_amt")).show() 

Traceback (most recent call last): 
    File "rdd_raw_data.py", line 116, in <module> 
    lag_amt.alias("prev_amt")).show() 
    File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select 
    jdf = self._jdf.select(self._jcols(*cols)) 
    File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__ 
    answer, self.gateway_client, self.target_id, self.name) 
    File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value 
    format(target_id, ".", name), value) 
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select. 
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext; 
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) 

ответ

4
  1. спецификация кадры должна начинаться с ключевым словом ROWSROW не
  2. спецификации кадров требует либо Нижнего значения

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW 
    

    или UNBOUNDED ключевое слова

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 
    
  3. LAG функция не принимает кадр на все так правильно SQL-запрос с запаздыванием может выглядеть следующим образом

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
        PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time 
    ) as prev_amt from tx 
    

Edit:

Что касается SQL использования DSL :

  1. Как вы можете прочитать в сообщении об ошибке

    Обратите внимание, что, используя оконные функции в настоящее время требует HiveContex

    Обязательно инициализировать sqlContext используя HiveContext не SQLContext

  2. windowSpec.rowsBetween(-1, 0) делает ничего , но еще раз спецификация кадра не поддерживается функцией lag.

+1

Благодарим за отзыв! Я обновил сообщение с дополнительным синтаксисом не-sql (новая ошибка), а также попытался обновить синтаксис SQL в соответствии с вашими предложениями (такая же ошибка). Возможно, вы могли бы рассмотреть мою вторую попытку и синтаксис SQL и, в частности, найти то, что я сделал неправильно? 'test = sqlContext.sql (" SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt, (lag() OVER (PARTITION by tx.cc_num ORDER BY tx.trans_date, tx.trans_time ROWS МЕЖДУ НЕОГРАНИЧЕННЫМ PRECDING AND CURRENT ROW)) от tx ")' – nameBrandon

Смежные вопросы