2016-05-05 3 views
2

Я столкнулся с проблемой, когда пытаюсь получить некоторые продукты из таблицы улья и обрабатывать/применять рулоны в искры.Spark RDD throwing NullPointerException

//function which return products from Hive table 
def getProductsList(hiveContext: org.apache.spark.sql.hive.HiveContext): scala.collection.mutable.MutableList[Product] = { 
     val products = scala.collection.mutable.MutableList[Product]()  
       val results = hiveContext.sql("select item_id,value from details where type_id=12"); 
     val collection = results.collect(); 
     var i = 0; 
     results.collect.foreach(t => { 
      val product = new Product(collection(i)(0).asInstanceOf[Long], collection(i)(1).asInstanceOf[String]); 
      i = i+ 1; 
      products += product 
     })  
     products 
     } 

Вызов функции getProductsList и применение рулонов на продукты.

val randomProducts = this.getProductsList(hiveContext) 
     val rdd = ssc.sparkContext.parallelize(randomProducts)   
     val evaluatedProducts = rdd.mapPartitions(incomingProducts => {  
    print("Hello"); 
    rulesExecutor.evalRules(incomingProducts) }) 
     val productdf = hiveContext.applySchema(evaluatedProducts, classOf[Product]) 
    }) 

Как showin в выше РДУ mapPartitions итерации не происходит, и он бросает следующее сообщение об ошибке. Но я уверен, что rdd не пуст.

Exception in thread "main" java.lang.NullPointerException 
     at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465) 
     at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103) 
     at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) 
     at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102) 
     at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47) 
     at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:995) 
     at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:488) 
     at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:1028) 
     at com.cloudera.sprue.ValidateEan$.main(ValidateEan.scala:70) 
     at com.cloudera.sprue.ValidateEan.main(ValidateEan.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
16/05/05 07:44:48 INFO SparkContext: Invoking stop() from shutdown hook 

Пожалуйста, помогите мне решить эту проблему.

+0

Какую версию Spark вы используете? 'applySchema' в настоящее время устарел, вы должны использовать' .toDF() 'или' sqlContext.createDataFrame' –

+0

Спасибо Daniel. Я попытался использовать createDataFrame, но ту же ошибку, с которой я столкнулся. Ниже приведен код, как я использовал. val productdf = hiveContext.createDataFrame (rdd, classOf [Product]); –

+0

Возможно, вы можете попробовать 'hiveContext.createDataFrame (rdd)' или сопоставить продукт с кортежем: 'rdd.map ({p: Product => (p.getVal1, p.getVal2)}). ToDF (" col1 ", "col2") ' –

ответ

1

Поскольку нам нужен конечный результат как DataFrame, Давайте использовать SchemaRDD, который вернулся с hiveContext.sql().

//defining schema 
case class Product(id: Long, value: String) 

//loading data from Hive table 
val results: DataSet[Row] = hiveContext.sql("select item_id,value from details where type_id=12") 

//convert ROW type to Product type then pass it to rulesExecutor.evalRules() 
val evaluatedProducts = results.map(productRow => rulesExecutor.evalRules(Product(productRow.getLong(0), productRow.getString(1)))).toDF() 

Я бы предположить, rulesExecutor.evalRules() будет aceept Product типа. Если нет, мы можем пойти в голову с (без явного преобразования в map()).

Смежные вопросы