Прежде всего, переместить операции асинхронных из subscribe
, это не для асинхронных операций.
Что вы можете использовать: mergeMap
(псевдоним flatMap
) или concatMap
. Я упоминаю их оба, потому что concatMap
на самом деле mergeMap
с параметром concurrent
, установленным в 1. Это полезно, так как иногда вы хотите ограничить количество одновременных запросов, но все же выполните пару одновременных.
source.concatMap(item => {
if (item == 'do-something-async-and-wait-for-completion') {
return Rx.Observable.timer(5000)
.mapTo(item)
.do(e => console.log('okay, we can continue'));
} else {
// do something synchronously and keep on going immediately
return Rx.Observable.of(item)
.do(e => console.log('ready to go!!!'));
}
}).subscribe();
Я также покажу, как вы можете оценить лимит своих звонков. Совет: Только ограничение скорости в той точке, в которой вы действительно нуждаетесь, например, при вызове внешнего API, который разрешает только определенное количество запросов в секунду или минут. В противном случае лучше просто ограничить количество одновременных операций и позволить системе двигаться с максимальной скоростью.
Мы начнем со следующего фрагмента кода:
const concurrent;
const delay;
source.mergeMap(item =>
selector(item, delay)
, concurrent)
Далее нам нужно выбрать значения для concurrent
, delay
и реализации selector
. concurrent
и delay
тесно связаны между собой. Например, если мы хотим запускать 10 пунктов в секунду, мы можем использовать concurrent = 10
и delay = 1000
(миллисекунда), а также concurrent = 5
и delay = 500
или concurrent = 4
и delay = 400
. Количество пунктов в секунду всегда будет concurrent/(delay/1000)
.
Теперь можно реализовать selector
. У нас есть пара вариантов.Мы можем установить минимальное время выполнения для selector
, мы можем добавить к нему постоянную задержку, мы можем испустить результаты, как только они будут доступны, мы можем испустить результат только после того, как прошла минимальная задержка и т. Д. Это даже возможно для добавления тайм-аута с помощью операторов timeout
. Удобство.
Установить минимальное время, отправь результат рано:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.merge(Rx.Observable.timer(delay).ignoreElements())
}
Установить минимальное время, отправь результат поздно:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.zip(Rx.Observable.timer(delay), (item, _))
}
Добавить время, отправить результат рано:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.concat(Rx.Observable.timer(delay).ignoreElements())
}
Добавить время , отправьте результат поздно:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.delay(delay)
}
, который выглядит хорошо, но, к сожалению, мы не знаем, какие элементы собираются работать синхронно, и который асинхронно. – user3291110
интуитивно, (в мире, отличном от rx), решение было бы приостановить обработку входной последовательности до тех пор, пока не произойдет событие ... , что заставляет меня думать о каком-то гибриде, использующем ваше решение плюс наблюдаемый fromEvent, тикать 'входную очередь? – user3291110
Мне бы очень хотелось, чтобы операция была выполнена в onNext, чтобы как-то сказать Rx продолжить ... таким образом наши разработчики могут просто вызвать функцию, чтобы сказать ... «теперь все в порядке». Я беспокоюсь о техническом обслуживании по дороге, если кто-то добавляет асинхронный режим, в глубине реализации того, что было когда-то синхронно, как позволить им делать это с наименьшим количеством хлопот fyi ... items входящие в очередь, являются функциями(), которые нужно вызвать .. и я не знаю, какие из них синхронизированы с асинхронным. они принимают параметр контекста в качестве аргумента ... так что они могут отметить себя полностью. – user3291110