2016-04-10 2 views
0

Я новичок в SparkStreaming, когда пытался представить потоковую работу искрового Twitter, получают следующее сообщение об ошибке:Получение NPE при попытке сделать искру залитым Twitter

Lost task 0.0 in stage 0.0 (TID 0,sandbox.hortonworks.com):java.lang.NullPointerException 
at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:340) 
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:365) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:404) 
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:396) 
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) 
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) 
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:396) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745 

Вот фрагмент кода:

val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4) 
val filters = args.takeRight(args.length - 4) 

System.setProperty("twitter4j.oauth.consumerKey", consumerKey) 
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret) 
System.setProperty("twitter4j.oauth.accessToken", accessToken) 
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret) 
val sparkConf = new SparkConf().setAppName("TwitterPopularTags") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
val stream = TwitterUtils.createStream(ssc,None, filters) 
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) 
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) 
       .map{case (topic, count) => (count, topic)} 
       .transform(_.sortByKey(false)) 
topCounts60.foreachRDD(rdd => { 
    val topList = rdd.take(10) 
    println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
    topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} 
}) 
ssc.start() 
ssc.awaitTermination() 

Любой ключ, почему я получаю этот NPE? Любая помощь в том, как отладить это дальше?

+0

этот 'val stream = TwitterUtils.createStream (ssc, None, filters)' выглядит нарушенным для меня, должен быть какой-то объект 'twitterAuth' там – avloss

+0

Заменен« None »на этот вызов метода и его все еще сбой при той же ошибке: def creds(): Option [twitter4j.auth.Authorization] = {object auth { val config = new twitter4j.conf.ConfigurationBuilder(). setOAuthConsumerKey ("****"). setOAuthConsumerSecret ("****") .setOAuthAccessToken ("****"). setOAuthAccessTokenSecret ("****"). build} val twitter_auth = new TwitterFactory (auth.config) val a = new twitter4j.auth.OAuthAuthorization (auth.config) val atwitter: Option [twitter4j.auth.Authorization] = Некоторые (twitter_auth.getInstance (a) .getAuthorization()) at Witter} – krisgari

ответ

0

После некоторой отладки, в моем сценарии с инициацией испуска, файл jar задания добавлен в список -jar и получает эту ошибку. Но это, похоже, ошибка в пакете искрового ядра.