Предполагая, что у меня есть hot, наблюдаемый за списками некоторых предметов;RxJava. Преобразование горячего наблюдаемого списка в поток отдельных элементов, обработка его и преобразование в список назад
Observable<List<Item>> observable = ...;
мне нужно преобразовать его в единый поток предметов и выполнять некоторые операции по каждому пункту, как фильтрация, после того, что я должен преобразовать его обратно в список и процесс его в onNext
методы абонента:
observable.flatMap(Observable::from)
.filter(Item::isFiltered)
.toList()
.subscribe(this::onNext, this::onError)
public void onNext(List<Item> items) {...}
С первого взгляда это выглядит нормально, но это не так, потому что наш наблюдаемый горячий, поэтому toList() никогда не будет выполнен (потому что он ждет наблюдаемого по завершении источника) и целые потоки.
Как решить эту проблему? Также обратите внимание, что рядом с filter
может быть любое количество дополнительных операций над отдельным элементом.