2017-01-13 2 views
2

Предполагая, что у меня есть hot, наблюдаемый за списками некоторых предметов;RxJava. Преобразование горячего наблюдаемого списка в поток отдельных элементов, обработка его и преобразование в список назад

Observable<List<Item>> observable = ...; 

мне нужно преобразовать его в единый поток предметов и выполнять некоторые операции по каждому пункту, как фильтрация, после того, что я должен преобразовать его обратно в список и процесс его в onNext методы абонента:

observable.flatMap(Observable::from) 
    .filter(Item::isFiltered) 
    .toList() 
    .subscribe(this::onNext, this::onError) 

public void onNext(List<Item> items) {...} 

С первого взгляда это выглядит нормально, но это не так, потому что наш наблюдаемый горячий, поэтому toList() никогда не будет выполнен (потому что он ждет наблюдаемого по завершении источника) и целые потоки.

Как решить эту проблему? Также обратите внимание, что рядом с filter может быть любое количество дополнительных операций над отдельным элементом.

ответ

3

Вы можете сделать все ваши операции на отдельных элементах и ​​окончательного toList оператора на Наблюдаемом вы создаете в flatMap .. Таким образом, вы получите OnComplete вызова и toList будет собирать и преобразовывать элементы.

observable.flatMap(list -> { 
    return Observable.from(list) 
         .filter(Item::isFiltered) 
         .toList() 
    }) 
    .subscribe(this::onNext, this::onError) 
Смежные вопросы