2017-01-25 2 views
1

Я хотел бы реализовать цикл async while в Java, используя RxJava.RxJava async while loop

Более конкретно, вот мой не Асинхр код Java:

for (String dataCenter : dataCenters) { 
     final Set<Server> serversInDataCenter = getServersInDataCenterSync(dataCenter); 
     if (!CollectionUtils.isEmpty(serversInDataCenter)) { 
      final Server available = findOneWithSlots(serversInDataCenter); 
      if (available != null) { 
       return available; 
      } 
     } 
     // if no available server found for current dataCenter, try next 
    } 
    return null; 

Что выше код делает то, что находит доступный сервер из центра обработки данных.

С 90% случаев в первом датацентре будет установлен сервер, который будет отмечен, я не хочу заранее загружать все серверы для всех центров обработки данных.

Теперь, представьте, что метод Set<Server> getServersInDataCenterSync(String dataCenter) изменен на асинхронный, и вместо этого требуется обратный вызов: void getServersInDataCenter(String dataCenter, AsyncResultHandler<Set<Server>> handler). Это также делает другую вещь

ответ

1
Observable.fromIterable(dataCenters) // emits data center names 
    .flatMap(name -> getServersInDataCenter(name), // returns Observable<Server> 
     maxConcurrency) // see note below 
    .filter(Server::hasSlotsAvailable) // pass through only available ones 
    .take(1) // take first one and unsubscribe 

В моем примере getServersInDataCenter() возвращает Observable<Server>. Для того, чтобы преобразовать метод обратного вызова стиля в наблюдаемый поток вы можете использовать что-то вроде следующий:

Observable<Server> getServersInDataCenter(String name) { 
    return Observable.create(emitter -> 
     getServersInDataCenterAsync(name, event -> { 
      if (event.isError()) 
       emitter.onError(event.getError()); 
      else { 
       emitter.onNext(event.getResultSet()); // emit Set<Server> 
       emitter.onComplete(); 
      } 
     }) 
     .flatMapIterable(set -> set); // flatten Set into individual items 
} 

С maxConcurrency параметра вы можете ограничить количество одновременных запросов в асинхронном режиме. Если вы не хотите делать второй запрос, пока не проверите все серверы из первого центра данных, установите его в 1. Если вы хотите ускорить поиск доступного сервера, если его осталось немного, увеличьте его. Вы также можете использовать параметр delayErrors (см. Документы here).