2015-01-03 7 views
2

Я новичок в реактивном программировании с использованием rxjava, и, пройдя более простые примеры, я сейчас пытаюсь понять, как работать с непрерывными потоками. Проблема с приведенным ниже примером заключается в том, что программа не заканчивается после того, как я взял 3 элемента. Мое предположение заключается в том, что мне почему-то нужно отказаться от подписки на мои наблюдаемые, но я не совсем понимаю, как закончить цикл while и заставить программу выйти.rxjava и завершающие потоки

Я столкнулся с следующим сообщением RxJava -- Terminating Infinite Streams, но я до сих пор не могу понять, что мне не хватает.

class MyTwitterDataProvider { 
/* 
This example is written in Groovy 

Instance variables and constructor omitted 
*/ 

public Observable<String> getTweets() { 
    BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream())) 

    Observable.create({ observer -> 
     executor.execute(new Runnable() { 
      def void run() { 
       String newLine 
       while ((newLine = reader.readLine()) != null) { 
        System.out.println("printing tweet: $newLine") 
        observer.onNext(newLine) 
       } 

       observer.onCompleted() 
      } 
     }) 
    }) 
} 

def InputStream getTwitterStream() { 
// code omitted 
} 

public static void main (String [] args) { 
    MyTwitterDataProvider provider = new MyTwitterDataProvider() 
    Observable<String> myTweetsObservable = provider.getTweets().take(3) 

    Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")}) 
    // myTweetSubscription.unsubscribe() 
} 
} 

ответ

2

Вы должны добавить проверку в цикле, чтобы увидеть, если наблюдатель еще подписался:

  while ((newLine = reader.readLine()) != null && !observer.isUnsubsribed()) { 
       System.out.println("printing tweet: $newLine") 
       observer.onNext(newLine) 
      } 
Смежные вопросы