2014-09-28 6 views
0

Я новичок в rxjava и у меня есть следующая проблема:значения питания наблюдаемого

объекты неравномерно опускают в очередь FIFO с помощью внешней системы. Мне нужно Наблюдение, которое выполняется каждую секунду, берет элемент из очереди (если есть) и передает его подписчикам.

Две проблемы:

  • Элементы очереди производятся в то время как Наблюдаемые жив, это не представляется возможным, чтобы обеспечить все детали заранее. Очередь может запускаться пустым, и в этом случае Observable должен стоять и ничего не испускать. (Было бы неплохо, если Observable начнет немедленно запускаться, когда элемент станет доступен в очереди после паузы, но тогда очередь, вероятно, также должна быть Observable, если мы не хотим опросить более часто, no идея как.)

  • Должно быть возможно, чтобы внешняя система завершила наблюдение. Я мог бы установить переменную и прочитать ее из Observable, но я хотел бы знать, есть ли более элегантный способ сделать это.

    LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue 
    boolean stopObservable = false; // the variable to stop the observable 
    
    Observable.create(new Observable.OnSubscribe<Layer>() { 
    
        @Override public void call(Subscriber<? super Layer> subscriber) { 
         try { 
          if (!queue.isEmpty()) { 
           Layer layer = queue.poll(); 
           subscriber.onNext(layer); 
          } else { 
           if (stopObservable) { subscriber.onCompleted(); } 
          } 
         } catch (Exception e) { 
          subscriber.onError(e); 
         } 
        } 
    
    }).somethingThatCreatesTheInterval().subscribeOnEtc. 
    

Для интервала, я не могу использовать .Sample(), потому что он падает элементы, и это очень важно, чтобы все элементы выбрасываются.

.throttleWithTimeout() выглядит лучше, но он также, кажется, бросает предметы.

rx очень круто, но трудно попасть. Любой вход оценивается.

+0

Таким образом, вы не возражаете, если очередь входных элементов накапливается в течение долгого времени - вы просто хотите, чтобы излучать один раз в секунду (или просто пропустить " слот ", если в входной очереди нет элемента)? Моим первым инстинктом было бы взглянуть на таймер (чтобы обеспечить «импульс») и карту (которая не отображает ничего, а просто отбрасывает каждый длинный, испускаемый таймером, и вместо этого выдает следующий элемент из входной очереди - или вызывает onCompleted if для переменной stop установлено значение true). Но, возможно, есть более элегантная альтернатива ... –

+0

На самом деле, я думаю, вам может понадобиться использовать flatMap (вместо карты) на втором шаге - чтобы иметь возможность обрабатывать случай, когда входная очередь пуста. Таким образом, вы либо испускаете Observable.just(), либо Observable.empty(). –

ответ

1

Я сделал что-то подобное, когда мне нужно было опросить внешние веб-сервисы с регулярным интервалом времени.

  1. Для временного интервала Вы можете исправить ошибки, связанные с: timer; на каждом тике с зернистостью 1s наблюдаемой цепи будет опрашивать и, возможно, выбрать один слой, если этот слой является нулевым, тогда ничего не излучается

    Observable.timer(0, 1, TimeUnit.SECOND) 
        .flatMap(tick -> Observable.just(queue.poll()).filter(layer -> layer != null)) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
  2. Теперь, если вы хотите прервать целом цепочку вы можете добавить takeUntil , Так что, когда ваша внешняя система хочет, чтобы остановить его будет представлять что-то в stopObservable, который остановит последующую подписку:

    // somewhere before 
    PublishSubject stopNotifier = PublishSubject.create(); 
    
    // somewhere process the queue 
    Observable.timer(0, 1, TimeUnit.SECOND) 
        .takeUntil(stopNotifier) 
        .flatMap(tick -> Observable.just(queue.poll())) 
        .subscribe(layer -> System.out.format("The layer is : %s", layer)); 
    
    // when not anymore interested (calling onComplete works too) 
    stopNotifier.onNext("cancel everything about the queue"); 
    

Я пишу этот ответ от таблетки, так что вы можете предположить, что я, возможно, некоторые орфографические ошибки слова или наивные ошибки программирования;)

0

Если возможно, вы должны использовать PublishSubject<Layer> вместо LinkedList<Layer>.Затем внешняя система может предоставлять новые элементы, вызывая publishSubject.onNext, а так как PublishSubject является подклассом Observable, ваша система может рассматривать его как наблюдаемую, и, в зависимости от того, какую семантику по времени вы хотите, примените один из этих операторов к это:

  • sample
  • debounce
  • throttleFirst/throttleLast/throttleWithTimeout
  • .zipWith(Observable.timer(1, TimeUnit.SECONDS), (value, tick) -> value) (! может сделать много буферизации)
  • никаких изменений сроков на всех (считают, что это хорошо)
Смежные вопросы