Я использую искру 2.0.1 и хочу наполнить значения nan последним хорошо известным значением в столбце.Spark/Scala: fill nan с последним хорошим наблюдением
Единственная ссылка для искры я могу найти Spark/Scala: forward fill with last observation или Fill in null with previously known good value with pyspark, которые, похоже, используют RDD.
Я бы предпочел остаться в мире данных и данных, а также обработать несколько значений nan. Возможно ли это?
Мое предположение состоит в том, что данные (первоначально загруженные, например, из файла CSV упорядочены по времени, и этот порядок сохраняется в распределенной настройке, например, заполнение знаком close/last хорошо известно. Возможно, заполнение предыдущим значением является достаточно, как для большинства записей нет 2 или более нан записей в строке. Значит ли это на самом деле держать? Дело в том, что
myDf.sort("foo").show
уничтожить бы любой заказ, например, все null
значений будет первым.
Небольшой пример:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
Результаты в
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
Я хотел бы, чтобы зафиксировать значение с последним известным значением хорошего. Как я могу это достичь?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
редактировать
в моем случае, это было бы достаточно, чтобы заполнить значение из приведенного выше ряда, так как есть только очень ограниченные дефектные значения.
edit2
Я пытаюсь добавить столбец
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
А потом заправить последнее значение.
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
Но читает следующее предупреждение: Нет Partition Defined для работы окна! Перемещение всех данных в один раздел может привести к серьезной деградации производительности. Как я могу представить значимые разделы?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
Это лучше, но только компилируется, если установлено значение 'df.filter ('foo.isNull) .count'. У вас есть мысли о второй части? –
Но это имело бы фиксированное значение/столбец. Я бы предпочел использовать последнее значение хорошего/не-NaN для каждого столбца, чтобы исправить недостающие числа. –
Я не был уверен, что является точным критерием для замены 'NaN' /' null'. поэтому pls проверит эту ссылку [pyspark - fillna()] (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna) – mrsrinivas