2015-07-29 2 views
0

Предположим, у меня есть DataFrame со следующими четырьмя колоннами:Обновление Dataframe со следующим непустым значением в столбце или значением prev в столбце для ключа.

Employee  Action  Updated on   Salaried on 
1    emailed  2015-07-01  2015-07-12 
1    worked  2015-07-03  null 
1    played  2015-07-06  2015-07-28 
1    finished  2015-07-07  null 
2    food   2015-07-09  null 
2    cool   2015-07-11  2015-07-10 

Ответ должен быть:

Employee  Action  Updated on   Salaried on 
1    emailed  2015-07-01  2015-07-12 
1    worked  2015-07-03  2015-07-28 
1    played  2015-07-06  2015-07-28 
1    finished  2015-07-07  2015-07-28 
2    food   2015-07-09  2015-07-10 
2    cool   2015-07-11  2015-07-10 

Что происходит?

Для каждого сотрудника на основе Обновленного по времени, если какая-либо запись в «Заплачено» равна нулю, для ближайшего будущего для этого же сотрудника потребуется значение «Заработная плата», иначе оно будет стоить того же столбца из ближайшего прошлого.

Например 5-я строка принимает значение с 6-й строки. 4-я строка принимает значение из 3-го ряда Вторая строка принимает значение из третьей строки. Примечание: Будущее получит преимущество

Мои попытки: я попытался использовать карту & уменьшить, но есть ли у нас хорошая техника, чтобы решить ее с силой искры лучше?

+0

Сколько записей на сотрудника вы ожидаете? Как долго в среднем может быть разрыв? Что произойдет, если первая запись null? Вы хотите искать неограниченные предыдущие записи (скажем, на год старше)? – zero323

+0

нет ограничений на количество записей на одного сотрудника. Средним разрывом может быть что угодно. Как я уже сказал, он подберет следующий непустой доступный, если не будет иметь нулевого значения, он будет искать только предыдущее ненулевое значение. Таким образом, каждое нулевое значение будет пытаться обновить только следующее ненулевое значение для конкретного сотрудника, иначе просто предыдущее ненулевое значение, поэтому, если в «salaRIED on» для конкретного сотрудника нет ненулевых значений, каждое значение будет null, иначе значение не будет равно null. Все остальные столбцы будут такими, как есть. – user1735076

ответ

2

Если вы принимаете неограниченное количество записей, возможный размер пробела и вас интересуют значения без ограничения временного окна, как вы описали в a comment, тогда все, что вы можете сделать, - это надеяться, что оптимизатор Catalyst сможет что-то сделать умная. Первая позволяет воспроизводить данные Пример:

import org.apache.spark.sql.functions.{coalesce, not} 
case class Record(employee: Int, action: String, updated_on: java.sql.Date, salaried_on: java.sql.Date) 

val rdd = sc.parallelize(List(
    Record(1, "emailed" , java.sql.Date.valueOf("2015-07-01"), java.sql.Date.valueOf("2015-07-12")), 
    Record(1, "worked" , java.sql.Date.valueOf("2015-07-03"), null), 
    Record(1, "played" , java.sql.Date.valueOf("2015-07-06"), java.sql.Date.valueOf("2015-07-28")), 
    Record(1, "finished", java.sql.Date.valueOf("2015-07-07"), null), 
    Record(2, "food" , java.sql.Date.valueOf("2015-07-09"), null), 
    Record(2, "cool" , java.sql.Date.valueOf("2015-07-11"), java.sql.Date.valueOf("2015-07-10")))) 

val df = sqlContext.createDataFrame(rdd) 

Первое, что мы можем сделать, это разделить данные в нулям и не аннулирует:

val dfNotNull = df.where(not($"salaried_on".isNull)) 
val dfNull = df.where($"salaried_on".isNull) 
val dfNotNullRenamed = dfNotNull. 
    withColumnRenamed("employee", "emp"). 
    withColumnRenamed("updated_on", "upd"). 
    withColumnRenamed("salaried_on", "sal"). 
    select("emp", "upd", "sal") 

Теперь мы можем использовать левое внешнее объединение по обоим и заполнить пробелы:

val joinedWithFuture = dfNull.join(
    dfNotNullRenamed, df("employee") <=> dfNotNullRenamed("emp") && 
    dfNotNullRenamed("sal") >= df("updated_on"), 
    "left_outer" 
).withColumn("salaried_on", coalesce($"salaried_on", $"sal")). 
    drop("emp").drop("sal") 

Наконец, мы можем фильтровать с помощью row_number и слияния с не нулями:

joinedWithFuture.registerTempTable("joined_with_future") 

val query = """SELECT * FROM (SELECT *, row_number() OVER (
    PARTITION BY employee, action, updated_on 
    ORDER BY ABS(CAST(timestamp(upd) as INT) - CAST(timestamp(updated_on) as INT)) 
) rn FROM joined_with_future) tmp WHERE rn = 1""" 

val dfNullImputed = sqlContext. 
    sql(query). 
    drop("rn"). 
    drop("upd"). 
    unionAll(dfNotNull). 
    orderBy("employee", "updated_on") 

Если есть еще пробелы, повторите всю процедуру с dfNotNullRenamed("sal") >= df("updated_on"), замененным на dfNotNullRenamed("sal") < df("updated_on").

+0

Что такое действие в: PARTITION BY employee, action, updated_on ?? – user1735076

+0

Что значит? Эффект искры? Нет, если вы не соберете/не увидите/не увидите или что-то подобное. – zero323

Смежные вопросы