2016-11-16 4 views
2

Короткого рассказа: У меня есть ситуации, когда у меня есть 2 Наблюдаемых которые имеют единственную цель:RxJava объединение наблюдаемого без повторения исполнения ордеров

  • они получают некоторые данные
  • они возвращают измененные данные
  • бросаться если данные не могут быть обработаны

Каждый из них отвечает за обработку различных типов данных. Кроме того, я хочу что-то сделать, когда обе данные были обработаны.

Моя текущая лучшая реализация выглядит следующим образом, это мои Наблюдаемые:

Single<BlueData> blueObservable = Single.create(singleSubscriber -> { 
     if (BlueDataProcessor.isDataValid(myBlueData)) { 
      singleSubscriber.onSuccess(BlueDataProcessor.process(myBlueData)); 
     } 
     else { 
      singleSubscriber.onError(new BlueDataIsInvalidThrow()); 
     } 
    }); 

    Single<RedData> redObservable = Single.create(singleSubscriber -> { 
     if (RedDataProcessor.isDataValid(myRedData)) { 
      singleSubscriber.onSuccess(RedDataProcessor.process(myRedData)); 
     } 
     else { 
      singleSubscriber.onError(new RedDataIsInvalidThrowable()); 
     } 
    }); 

    Single<PurpleData> composedSingle = Single.zip(blueObservable, redObservable, 
      (blueData, redData) -> PurpleGenerator.combine(blueData, redData)); 

У меня также есть следующие подписки:

blueObservable.subscribe(
      result -> { 
       saveBlueProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

    redObservable.subscribe(
      result -> { 
       saveRedProcessStats(result); 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 


    composedSingle.subscribe(
      combinedResult -> { 
       savePurpleProcessStats(combinedResult) 
      }, 
      throwable -> { 
       logError(throwable); 
      }); 

МОЯ ПРОБЛЕМА: Синий & красный данные обрабатываются в два раза , потому что обе подписки снова запускаются с подпиской на объединенный наблюдаемый, созданный с помощью Observable.zip().

Как это сделать, если вы не выполняете обе операции дважды?

ответ

2

Это невозможно с Single в 1.x, потому что нет понятия ConnectableSingle и, следовательно, Single.publish. Вы можете достичь эффекта с помощью 2.x и библиотеки RxJava2Extensions:

SingleSubject<RedType> red = SingleSubject.create(); 
SingleSubject<BlueType> blue = SingleSubject.create(); 

// subscribe interested parties 
red.subscribe(...); 
blue.subscribe(...); 

Single.zip(red, blue, (r, b) -> ...).subscribe(...); 

// connect() 
blueObservable.subscribe(blue); 
redObservable.subscribe(red); 
+0

Я реализовать это решение, как только я могу обновить этот проект rxjava2. Однако я достаточно доволен этим решением, чтобы отметить его как правильный ответ. Благодаря! – dbar

Смежные вопросы