Я новичок в реактивном программировании с использованием rxjava, и, пройдя более простые примеры, я сейчас пытаюсь понять, как работать с непрерывными потоками. Проблема с приведенным ниже примером заключается в том, что программа не заканчивается после того, как я взял 3 элемента. Мое предположение заключается в том, что мне почему-то нужно отказаться от подписки на мои наблюдаемые, но я не совсем понимаю, как закончить цикл while и заставить программу выйти.rxjava и завершающие потоки
Я столкнулся с следующим сообщением RxJava -- Terminating Infinite Streams, но я до сих пор не могу понять, что мне не хватает.
class MyTwitterDataProvider {
/*
This example is written in Groovy
Instance variables and constructor omitted
*/
public Observable<String> getTweets() {
BufferedReader reader = new BufferedReader(new InputStreamReader(getTwitterStream()))
Observable.create({ observer ->
executor.execute(new Runnable() {
def void run() {
String newLine
while ((newLine = reader.readLine()) != null) {
System.out.println("printing tweet: $newLine")
observer.onNext(newLine)
}
observer.onCompleted()
}
})
})
}
def InputStream getTwitterStream() {
// code omitted
}
public static void main (String [] args) {
MyTwitterDataProvider provider = new MyTwitterDataProvider()
Observable<String> myTweetsObservable = provider.getTweets().take(3)
Subscription myTweetSubscription = myTweetsObservable.subscribe({tweet-> println("client prints: $tweet")})
// myTweetSubscription.unsubscribe()
}
}