1

Я использую искру 2.0.1 и хочу наполнить значения nan последним хорошо известным значением в столбце.Spark/Scala: fill nan с последним хорошим наблюдением

Единственная ссылка для искры я могу найти Spark/Scala: forward fill with last observation или Fill in null with previously known good value with pyspark, которые, похоже, используют RDD.

Я бы предпочел остаться в мире данных и данных, а также обработать несколько значений nan. Возможно ли это?

Мое предположение состоит в том, что данные (первоначально загруженные, например, из файла CSV упорядочены по времени, и этот порядок сохраняется в распределенной настройке, например, заполнение знаком close/last хорошо известно. Возможно, заполнение предыдущим значением является достаточно, как для большинства записей нет 2 или более нан записей в строке. Значит ли это на самом деле держать? Дело в том, что

myDf.sort("foo").show 

уничтожить бы любой заказ, например, все null значений будет первым.

Небольшой пример:

import java.sql.{ Date, Timestamp } 
case class FooBar(foo:Date, bar:String) 
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate")) 
     .toDF("foo","bar") 
     .withColumn("foo", 'foo.cast("Date")) 
     .as[FooBar] 

Результаты в

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|  null|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

Я хотел бы, чтобы зафиксировать значение с последним известным значением хорошего. Как я могу это достичь?

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|2016-01-02|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

редактировать

в моем случае, это было бы достаточно, чтобы заполнить значение из приведенного выше ряда, так как есть только очень ограниченные дефектные значения.

edit2

Я пытаюсь добавить столбец

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate")) 
    .toDF("foo", "bar") 
    .withColumn("foo", 'foo.cast("Date")) 
    .as[FooBar] 
    .withColumn("rowId", monotonically_increasing_id()) 

А потом заправить последнее значение.

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show 

Но читает следующее предупреждение: Нет Partition Defined для работы окна! Перемещение всех данных в один раздел может привести к серьезной деградации производительности. Как я могу представить значимые разделы?

+----------+--------------------+-----+----------+ 
|  foo|     bar|rowId| fooLag| 
+----------+--------------------+-----+----------+ 
|2016-01-01|    first| 0|  null| 
|2016-01-02|    second| 1|2016-01-01| 
|  null|  noValidFormat| 2|2016-01-02| 
|2016-01-04|lastAssumingSameDate| 3|  null| 
+----------+--------------------+-----+----------+ 
+0

Это лучше, но только компилируется, если установлено значение 'df.filter ('foo.isNull) .count'. У вас есть мысли о второй части? –

+0

Но это имело бы фиксированное значение/столбец. Я бы предпочел использовать последнее значение хорошего/не-NaN для каждого столбца, чтобы исправить недостающие числа. –

+0

Я не был уверен, что является точным критерием для замены 'NaN' /' null'. поэтому pls проверит эту ссылку [pyspark - fillna()] (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna) – mrsrinivas

ответ

0

Это промежуточный ответ. Однако это не так хорошо, как нет разделов/только один раздел используется. Я до сих пор ищу лучший способ решить проблему

df 
    .withColumn("rowId", monotonically_increasing_id()) 
    .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId)) 
    .withColumn("columnWithNullReplaced", 
     when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull") 

    ) 

Редактировать

Я работаю на построение лучшего решения с использованием mapPartitionsWithIndex https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 еще не завершен.

edit2

добавления

if (i == 0) { 
      lastNotNullRow = toCarryBd.value.get(i + 1).get 
     } else { 
      lastNotNullRow = toCarryBd.value.get(i - 1).get 
     } 

приведет к желаемому результату.

+0

Я тоже - Должен быть более простой/чистый способ. – codeaperature