2015-07-21 6 views
6

Я пытаюсь загрузить файл CSV в фрейм данных Spark с помощью spark-csv [1] с помощью ноутбука Apache Zeppelin и при загрузке числового поля, которое не имеет значения, синтаксический анализатор не работает для этой строки, и линия пропускается.Как указать отсутствующее значение в dataframe

Я бы ожидал, что линия будет загружена, а значение в кадре данных загрузит строку и будет иметь значение NULL, чтобы агрегации просто игнорировали значение.

%dep 
z.reset() 
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>") 
z.load("com.databricks:spark-csv_2.10:1.1.0") 


%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
    StructField("identifier", StringType, true) :: 
    StructField("name", StringType, true) :: 
    StructField("height", DoubleType, true) :: 
    Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv") 
         .schema(schema) 
         .option("header", "true") 
         .load("file:///home/spark_user/data.csv") 

df.describe("height").show() 

Вот содержимое файла данных: /home/spark_user/data.csv

identifier,name,height 
1,sam,184 
2,cath,180 
3,santa,  <-- note that there is not height recorded for Santa ! 

Здесь выход:

+-------+------+ 
|summary|height| 
+-------+------+ 
| count|  2| <- 2 of 3 lines loaded, ie. sam and cath 
| mean| 182.0| 
| stddev| 2.0| 
| min| 180.0| 
| max| 184.0| 
+-------+------+ 

В логах дирижабле я могу см. следующую ошибку при разборе строки santa:

ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,. 
     java.lang.NumberFormatException: empty String 
     at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842) 
     at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 
     at java.lang.Double.parseDouble(Double.java:538) 
     at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232) 
     at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31) 
     at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42) 
     at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198) 
     at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180) 
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
     at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129) 
     at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
     at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:70) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Итак, вы можете сказать мне, что так хорошо ... и вы были бы правы;)

Теперь я хочу добавить дополнительную колонку, скажем, возраст, и у меня всегда есть данные в этом поле.

identifier,name,height,age 
1,sam,184,30 
2,cath,180,32 
3,santa,,70 

Теперь вежливо попросить для некоторых статистики о возрасте:

%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
    StructField("identifier", StringType, true) :: 
    StructField("name", StringType, true) :: 
    StructField("height", DoubleType, true) :: 
    StructField("age", DoubleType, true) :: 
    Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv") 
         .schema(schema) 
         .option("header", "true") 
         .load("file:///home/spark_user/data2.csv") 

df.describe("age").show() 

Результаты

+-------+----+ 
|summary| age| 
+-------+----+ 
| count| 2| 
| mean|31.0| 
| stddev| 1.0| 
| min|30.0| 
| max|32.0| 
+-------+----+ 

ВСЕ НЕПРАВИЛЬНО! Поскольку высота санты неизвестна, вся линия теряется, и расчет возраста основан только на Сэме и Кэт, а у Санта есть совершенно правильный возраст.

Вопрос: какое значение мне нужно, чтобы подключить высоту Санта, чтобы можно было загрузить CSV. Я попытался установить схему, чтобы быть все StringType но

Следующий вопрос больше о

Я нашел в API, которые можно обрабатывать значения N/A с помощью искры. Поэтому я подумал, что я мог бы загрузить свои данные со всеми столбцами, установленными в StringType, а затем сделать некоторые очистки, а затем установить только схему правильно, как написано ниже:

%spark 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.types._ 
import com.databricks.spark.csv._ 
import org.apache.spark.sql.functions._ 

val schema = StructType(
StructField("identifier", StringType, true) :: 
StructField("name", StringType, true) :: 
StructField("height", StringType, true) :: 
StructField("age", StringType, true) :: 
Nil) 

val sqlContext = new SQLContext(sc) 
val df = sqlContext.read.format("com.databricks.spark.csv").schema(schema).option("header", "true").load("file:///home/spark_user/data.csv") 

// eg. for each column of my dataframe, replace empty string by null 
df.na.replace("*", Map("" -> null)) 

val toDouble = udf[Double, String](_.toDouble) 
df2 = df.withColumn("age", toDouble(df("age"))) 

df2.describe("age").show() 

Но df.na.replace() бросает исключение и остановки:

java.lang.IllegalArgumentException: Unsupported value type java.lang.String(). 
     at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417) 
     at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337) 
     at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
     at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337) 
     at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304) 

Любая помощь, советы & ценится !!

[1] https://github.com/databricks/spark-csv

ответ

5

искрового CSV отсутствует этот параметр. Это has been fixed в мастер-ветке. Я думаю, вы должны использовать его или ждать следующей стабильной версии.

+0

Я протестировал последнюю версию в ветке мастера и действительно решил эту проблему. Благодаря ! –

Смежные вопросы