0
Я новичок в SparkStreaming, когда пытался представить потоковую работу искрового Twitter, получают следующее сообщение об ошибке:Получение NPE при попытке сделать искру залитым Twitter
Lost task 0.0 in stage 0.0 (TID 0,sandbox.hortonworks.com):java.lang.NullPointerException
at org.apache.spark.util.Utils$.decodeFileNameInURI(Utils.scala:340)
at org.apache.spark.util.Utils$.fetchFile(Utils.scala:365)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:404)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:396)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:396)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:192)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745
Вот фрагмент кода:
val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
val filters = args.takeRight(args.length - 4)
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val stream = TwitterUtils.createStream(ssc,None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
.map{case (topic, count) => (count, topic)}
.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
ssc.awaitTermination()
Любой ключ, почему я получаю этот NPE? Любая помощь в том, как отладить это дальше?
этот 'val stream = TwitterUtils.createStream (ssc, None, filters)' выглядит нарушенным для меня, должен быть какой-то объект 'twitterAuth' там – avloss
Заменен« None »на этот вызов метода и его все еще сбой при той же ошибке: def creds(): Option [twitter4j.auth.Authorization] = {object auth { val config = new twitter4j.conf.ConfigurationBuilder(). setOAuthConsumerKey ("****"). setOAuthConsumerSecret ("****") .setOAuthAccessToken ("****"). setOAuthAccessTokenSecret ("****"). build} val twitter_auth = new TwitterFactory (auth.config) val a = new twitter4j.auth.OAuthAuthorization (auth.config) val atwitter: Option [twitter4j.auth.Authorization] = Некоторые (twitter_auth.getInstance (a) .getAuthorization()) at Witter} – krisgari