2016-01-27 3 views
0

У меня есть поток объектов, которые я должен обрабатывать параллельно (это долго работающая задача), однако в то же время я должен поддерживать последовательность результатов. Я пытаюсь использовать RxJava для этого и не могу найти подходящего решения.RxJava последовательный результат параллельной задачи async

Основная идея, переводится каждый объект в функцию будущего, а затем вызывается последовательно в каждом будущем. В случае завершения в будущем Exception мне нужно повторить попытку, пока он не будет успешно завершен.

public class RxTest { 

    private static CompletableFuture<String> futureFunction(int i) { 
     return CompletableFuture.supplyAsync(() -> { 
      try { 
       Thread.sleep(300); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      if (ThreadLocalRandom.current().nextInt(1, 100) < 10) { 
       throw new RuntimeException("failed.. sorry"); 
      } 

      return Thread.currentThread().getName() + " " + String.valueOf(i); 
     }); 
    } 

    public static void main(String[] args) throws InterruptedException { 

     Observable.range(1, 100) 
       .flatMap(i -> 
         Observable.just(i).map(RxTest::futureFunction).flatMap(Observable::from).retry() 
       ) 

       .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) 

       .subscribe(Subscribers.create((x) -> { 
        System.out.println(Thread.currentThread().getName() + " " + x); 
       })); 

    } 
} 

В этом примере я получить Последовательная OUTPUT

pool-1-thread-1 ForkJoinPool.commonPool-worker-9 1 
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 2 
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 3 
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 4 
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 5 
pool-1-thread-1 ForkJoinPool.commonPool-worker-9 6 

Но каждое будущее создается и вызывается только после того, как предыдущий завершенное, не параллельно, как я ожидал.

Я играл с observeOn() и subscribeOn(), ничего не работает.

ответ

1

Вы можете использовать concatMapEager или concatEager, которые будут работать на источники, но сохраняет порядок элемента при испускании:

Observable.range(1, 100) 
.concatMapEager(v -> 
    Observable.fromCallable(() -> { ... }) 
    .subscribeOn(Schedulers.computation()) 
    .retry() 
) 
.subscribe(...) 
+0

Работает как шарм :) Спасибо! –

+0

@akarnokd, насколько экспериментальный API является 'concatMapEager'? –

+0

Я не знаю о каком-либо плане основной реструктуризации API для ветки 1.x в течение года. – akarnokd

Смежные вопросы