Я вижу в этом DataBricks post, есть поддержка функций окна в SparkSql, в частности я пытаюсь использовать функцию окна lag().Функция SparkSQL - Lag?
У меня есть строки транзакций по кредитным картам, и я их отсортировал, теперь я хочу перебирать строки и для каждой строки отображают сумму транзакции и разницу в сумме текущей строки и предыдущей количество ряда.
Вслед за DataBricks пост, я придумал этот запрос, но он бросает исключение на меня, и я не могу совсем undestand почему ..
Это в PySpark .. ТХ мой dataframe уже созданный при регистрации в качестве временной таблицы.
test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
и исключение (усеченный) ..
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
Я действительно apprecaite любое представление, эта функция является относительно новым и там не так много, чтобы идти дальше, насколько существующие примеры или другие Похожие сообщения.
Редактировать
Я также попытался сделать это без SQL заявления, как показано ниже, но по-прежнему получаю сообщение об ошибке. Я использовал это с Hive и SQLContext и получал ту же ошибку.
windowSpec = \
Window \
.partitionBy(h_tx_df_ordered['cc_num']) \
.orderBy(h_tx_df_ordered['cc_num'],h_tx_df_ordered['trans_date'],h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
(lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
tx_df_ordered.select(
h_tx_df_ordered['cc_num'],
h_tx_df_ordered['trans_date'],
h_tx_df_ordered['trans_time'],
h_tx_df_ordered['amt'],
lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
File "rdd_raw_data.py", line 116, in <module>
lag_amt.alias("prev_amt")).show()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
Благодарим за отзыв! Я обновил сообщение с дополнительным синтаксисом не-sql (новая ошибка), а также попытался обновить синтаксис SQL в соответствии с вашими предложениями (такая же ошибка). Возможно, вы могли бы рассмотреть мою вторую попытку и синтаксис SQL и, в частности, найти то, что я сделал неправильно? 'test = sqlContext.sql (" SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt, (lag() OVER (PARTITION by tx.cc_num ORDER BY tx.trans_date, tx.trans_time ROWS МЕЖДУ НЕОГРАНИЧЕННЫМ PRECDING AND CURRENT ROW)) от tx ")' – nameBrandon