2015-07-29 3 views
1

Я пишу твиттер-коннектор, используя искровой поток.
я столкнулся follwing исключениеОшибка в twitter streaming

ОШИБКА ReceiverTracker: дерегистрировано приемник для потока 0: Перезапуск приемник с задержкой 2000 мс: Ошибка при запуске Twitter поток - java.lang.NullPointerException в org.apache.spark.streaming .twitter.TwitterReceiver.onStart (TwitterInputDStream.scala: 89) at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver (ReceiverSupervisor.scala: 121) at org.apache.spark.streaming.receiver.ReceiverSupervisor $$ anonfun $ restartReceiver $ 1.apply $ mcV $ sp (ReceiverSupervisor.scala: 159) at org.apache.spark.streaming.receiver.Receive rSupervisor $$ anonfun $ restartReceiver $ 1.apply (ReceiverSupervisor.scala: 152) at org.apache.spark.streaming.receiver.ReceiverSupervisor $$ anonfun $ restartReceiver $ 1.apply (ReceiverSupervisor.scala: 152) at scala.concurrent. impl.Future $ PromiseCompletingRunnable.liftedTree1 $ 1 (Future.scala: 24) at scala.concurrent.impl.Future $ PromiseCompletingRunnable.run (Future.scala: 24) at scala.concurrent.impl.ExecutionContextImpl $$ anon $ 3.exec (ExecutionContextImpl.scala: 107) на scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260) в scala.concurrent.forkjoin.ForkJoinPool $ WorkQueue.runTask (ForkJoinPool.java:1339) в scala.concurrent .forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread .run (ForkJoinWorkerThread.java:107)

Ниже приведен фрагмент кода.

val config = new twitter4j.conf.ConfigurationBuilder() 
    .setOAuthConsumerKey("*********************") 
       .setOAuthConsumerSecret("**********************************************") 
    .setOAuthAccessToken("****************************************************") 
    .setOAuthAccessTokenSecret("**********************************************************") 
    .build 

val twitter_auth = new TwitterFactory(config) 
val a = new twitter4j.auth.OAuthAuthorization(config) 
val atwitter : Option[twitter4j.auth.Authorization] = Some(twitter_auth.getInstance(a).getAuthorization()) 

val sparkConf = new SparkConf().setAppName("TwitterPopularTags").setMaster("local[*]") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
// ssc.checkpoint("D:/test") 
val stream = TwitterUtils.createStream(ssc, atwitter, null, StorageLevel.MEMORY_AND_DISK_2) 

val hashTags = stream.map(status => status.getUser().getName()) 
hashTags.foreachRDD(rdd => { 
    rdd.foreach(println) 
}) 

ssc.start() 
ssc.awaitTermination() 

Может ли кто-нибудь помочь мне решить эту проблему?
Спасибо :)

ответ

1

Идя к линии, где выбрасывается исключение, мы можем видеть:

if (filters.size > 0) {

Для этой линии кидать NPE, фильтры должен быть нулевым, что это именно то, что происходит на инстанциации TwitterStream:

вал поток = TwitterUtils.createStream (ГКС, atwitter, нуль, StorageLevel.MEMORY_AND_DISK_2)

Будучи filter последовательно, инициализировать его с Seq() вместо null.

+0

спасибо. он решил проблему :) – Sadaf

+0

@Sadaf рассмотреть возможность принятия ответа. – maasg