У меня есть поток объектов, которые я должен обрабатывать параллельно (это долго работающая задача), однако в то же время я должен поддерживать последовательность результатов. Я пытаюсь использовать RxJava для этого и не могу найти подходящего решения.RxJava последовательный результат параллельной задачи async
Основная идея, переводится каждый объект в функцию будущего, а затем вызывается последовательно в каждом будущем. В случае завершения в будущем Exception мне нужно повторить попытку, пока он не будет успешно завершен.
public class RxTest {
private static CompletableFuture<String> futureFunction(int i) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (ThreadLocalRandom.current().nextInt(1, 100) < 10) {
throw new RuntimeException("failed.. sorry");
}
return Thread.currentThread().getName() + " " + String.valueOf(i);
});
}
public static void main(String[] args) throws InterruptedException {
Observable.range(1, 100)
.flatMap(i ->
Observable.just(i).map(RxTest::futureFunction).flatMap(Observable::from).retry()
)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(Subscribers.create((x) -> {
System.out.println(Thread.currentThread().getName() + " " + x);
}));
}
}
В этом примере я получить Последовательная OUTPUT
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 1
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 2
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 3
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 4
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 5
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 6
Но каждое будущее создается и вызывается только после того, как предыдущий завершенное, не параллельно, как я ожидал.
Я играл с observeOn()
и subscribeOn()
, ничего не работает.
Работает как шарм :) Спасибо! –
@akarnokd, насколько экспериментальный API является 'concatMapEager'? –
Я не знаю о каком-либо плане основной реструктуризации API для ветки 1.x в течение года. – akarnokd