2016-03-17 4 views
5

Я хочу сделать некоторую предварительную обработку моих данных, и я хочу сбросить строки, которые являются разреженными (для некоторого порогового значения).Как удалить строки со слишком большим количеством значений NULL?

Например, у меня есть таблица данных данных с 10 функциями, и у меня есть строка с 8 нулевым значением, а затем я хочу ее сбросить.

Я нашел некоторые связанные темы, но я не могу найти полезную информацию для своей цели.

stackoverflow.com/questions/3473778/count-number-of-nulls-in-a-row

Примеры, как в ссылке выше, не будет работать для меня, потому что я хочу сделать, это предварительная обработка автоматически. Я не могу писать имена столбцов и делать что-то соответствующим образом.

Так или иначе, чтобы выполнить операцию удаления без использования имен столбцов в Apache Spark с scala?

ответ

3

Дата испытания:

case class Document(a: String, b: String, c: String) 
val df = sc.parallelize(Seq(new Document(null, null, null), new Document("a", null, null), new Document("a", "b", null), new Document("a", "b", "c"), new Document(null, null, "c"))).df 

С UDF

ремиксы ответ на David и моей версии RDD ниже, вы можете сделать это, используя пользовательскую функцию, которая принимает строку:

def nullFilter = udf((x:Row) => {Range(0, x.length).count(x.isNullAt(_)) < 2}) 
df.filter(nullFilter(struct(df.columns.map(df(_)) : _*))).show 

С RDD

Вы можете превратить его в цикл rdd из столбцов в строке и подсчитать, сколько из них равно null.

sqlContext.createDataFrame(df.rdd.filter(x=> Range(0, x.length).count(x.isNullAt(_)) < 2), df.schema).show 
+1

Я могу сделать это без преобразования в RDD. Оставайтесь на линии. –

2

Это пылесос с ОДС:

import org.apache.spark.sql.functions.udf 
def countNulls = udf((v: Any) => if (v == null) 1; else 0;)) 
df.registerTempTable("foo") 

sqlContext.sql(
    "select " + df.columns.mkString(", ") + ", " + df.columns.map(c => { 
    "countNulls(" + c + ")" 
    }).mkString(" + ") + "as nullCount from foo" 
).filter($"nullCount" > 8).show 

Если создание строки запроса заставляет вас нервничать, то вы можете попробовать это:

var countCol: org.apache.spark.sql.Column = null 
df.columns.foreach(c => { 
    if (countCol == null) countCol = countNulls(col(c)) 
    else countCol = countCol + countNulls(col(c)) 
}); 

df.select(Seq(countCol as "nullCount") ++ df.columns.map(c => col(c)):_*) 
    .filter($"nullCount" > 8) 
+0

Я все для решений, которые не связаны с переключением на RDD, я не слишком увлекаюсь строковым построением запросов. Можно ли это сделать без этого? –

+0

Полностью согласен с вами. Попробуйте это сейчас. В худшем случае, я думаю, что могу сделать UDF, который принимает массив столбцов и выплескивает счет одним махом. Дай мне секунду. –

+0

Я думаю, что у меня может быть решение, смешивающее оба ответа. Оставайтесь на линии. –

1

Вот альтернатива в Спарк 2.0:

val df = Seq((null,"A"),(null,"B"),("1","C")) 
     .toDF("foo","bar") 
     .withColumn("foo", 'foo.cast("Int")) 

df.show() 

+----+---+ 
| foo|bar| 
+----+---+ 
|null| A| 
|null| B| 
| 1| C| 
+----+---+ 

df.where('foo.isNull).groupBy('foo).count().show() 

+----+-----+ 
| foo|count| 
+----+-----+ 
|null| 2| 
+----+-----+ 
+0

Возможно, у меня есть вопрос не так, но это, похоже, не отвечает на вопрос: 1) Смотрит только на одно поле. 2) Требуется указать поле. –

1

Я удивлен, что ответов нет d, что Спарк SQL поставляется с несколькими стандартными функциями, которые отвечают требованиям:

Например, у меня есть таблица dataframe с 10 функциями, и у меня есть строка со значением 8 нуль, то я хочу, чтобы бросить его.

Вы можете использовать один из вариантов DataFrameNaFunctions.drop метода с minNonNulls установлен надлежащим образом, скажем 2.

падение (minNonNulls: Int, COLS: Seq [String]): DataFrame Возвращает новый DataFrame, который помещает строки, содержащие меньше, чем minNonNulls значения, отличные от нуля и не NaN, в указанных столбцах.

И встретить изменчивость имен столбцов, как в требовании:

Я не могу писать имена столбцов и сделать что-то соответствующим образом.

Вы можете просто использовать Dataset.columns:

столбцов: Array [String] Возвращает все имена столбцов в виде массива.


Пусть говорят, что у вас есть следующий набор данных с 5 функциями (столбцы) и несколько рядов почти все null с.

val ns: String = null 
val features = Seq(("0","1","2",ns,ns), (ns, ns, ns, ns, ns), (ns, "1", ns, "2", ns)).toDF 
scala> features.show 
+----+----+----+----+----+ 
| _1| _2| _3| _4| _5| 
+----+----+----+----+----+ 
| 0| 1| 2|null|null| 
|null|null|null|null|null| 
|null| 1|null| 2|null| 
+----+----+----+----+----+ 

// drop rows with more than (5 columns - 2) = 3 nulls 
scala> features.na.drop(2, features.columns).show 
+----+---+----+----+----+ 
| _1| _2| _3| _4| _5| 
+----+---+----+----+----+ 
| 0| 1| 2|null|null| 
|null| 1|null| 2|null| 
+----+---+----+----+----+ 
Смежные вопросы