2016-02-07 1 views
4

Я хочу переписать часть моего кода, написанного с помощью RDD, для использования DataFrames. Он работал довольно гладко, пока я не нашел это:Как выполнять пользовательские операции с GroupedData в Spark?

events 
    .keyBy(row => (row.getServiceId + row.getClientCreateTimestamp + row.getClientId, row)) 
    .reduceByKey((e1, e2) => if(e1.getClientSendTimestamp <= e2.getClientSendTimestamp) e1 else e2) 
    .values 

просто начать с

events 
    .groupBy(events("service_id"), events("client_create_timestamp"), events("client_id")) 

но что дальше? Что делать, если я хотел бы перебрать все элементы в текущей группе? Возможно ли это? Спасибо заранее.

ответ

3

GroupedData не может быть использован напрямую. Данные физически не группируются, и это всего лишь логическая операция. Вы должны применить некоторый вариант agg метода, например:

events 
.groupBy($"service_id", $"client_create_timestamp", $"client_id") 
.min("client_send_timestamp") 

или

events 
.groupBy($"service_id", $"client_create_timestamp", $"client_id") 
.agg(min($"client_send_timestamp")) 

где client_send_timestamp столбец вы хотите агрегировать.

Если вы хотите сохранить информацию, чем агрегат просто join или использовать функции Window - см Find maximum row per group in Spark DataFrame

Свечи также поддерживает пользовательские агрегатные функции - см How to define and use a User-Defined Aggregate Function in Spark SQL?

Спарк 2.0+

Вы могли бы используйте Dataset.groupByKey, который предоставляет группы в качестве итератора.

+0

получил, спасибо. – homar

Смежные вопросы