Я новичок в rxjava и у меня есть следующая проблема:значения питания наблюдаемого
объекты неравномерно опускают в очередь FIFO с помощью внешней системы. Мне нужно Наблюдение, которое выполняется каждую секунду, берет элемент из очереди (если есть) и передает его подписчикам.
Две проблемы:
Элементы очереди производятся в то время как Наблюдаемые жив, это не представляется возможным, чтобы обеспечить все детали заранее. Очередь может запускаться пустым, и в этом случае Observable должен стоять и ничего не испускать. (Было бы неплохо, если Observable начнет немедленно запускаться, когда элемент станет доступен в очереди после паузы, но тогда очередь, вероятно, также должна быть Observable, если мы не хотим опросить более часто, no идея как.)
Должно быть возможно, чтобы внешняя система завершила наблюдение. Я мог бы установить переменную и прочитать ее из Observable, но я хотел бы знать, есть ли более элегантный способ сделать это.
LinkedList<Layer> queue = new LinkedList<Layer>(); // the queue boolean stopObservable = false; // the variable to stop the observable Observable.create(new Observable.OnSubscribe<Layer>() { @Override public void call(Subscriber<? super Layer> subscriber) { try { if (!queue.isEmpty()) { Layer layer = queue.poll(); subscriber.onNext(layer); } else { if (stopObservable) { subscriber.onCompleted(); } } } catch (Exception e) { subscriber.onError(e); } } }).somethingThatCreatesTheInterval().subscribeOnEtc.
Для интервала, я не могу использовать .Sample(), потому что он падает элементы, и это очень важно, чтобы все элементы выбрасываются.
.throttleWithTimeout() выглядит лучше, но он также, кажется, бросает предметы.
rx очень круто, но трудно попасть. Любой вход оценивается.
Таким образом, вы не возражаете, если очередь входных элементов накапливается в течение долгого времени - вы просто хотите, чтобы излучать один раз в секунду (или просто пропустить " слот ", если в входной очереди нет элемента)? Моим первым инстинктом было бы взглянуть на таймер (чтобы обеспечить «импульс») и карту (которая не отображает ничего, а просто отбрасывает каждый длинный, испускаемый таймером, и вместо этого выдает следующий элемент из входной очереди - или вызывает onCompleted if для переменной stop установлено значение true). Но, возможно, есть более элегантная альтернатива ... –
На самом деле, я думаю, вам может понадобиться использовать flatMap (вместо карты) на втором шаге - чтобы иметь возможность обрабатывать случай, когда входная очередь пуста. Таким образом, вы либо испускаете Observable.just(), либо Observable.empty(). –