2014-02-19 3 views
23

Я последовательность RxJS потребляется обычным способом ...Дождитесь асинхронной операция в onNext из RxJS наблюдаемой

Однако в наблюдаемом обработчике «onNext», некоторые из операций будут завершены синхронно, но другие требуют асинхронных обратных вызовов, которые необходимо ожидать до обработки следующего элемента во входной последовательности.

... немного путать как сделать это. Есть идеи? благодаря!

someObservable.subscribe(
    function onNext(item) 
    { 
     if (item == 'do-something-async-and-wait-for-completion') 
     { 
      setTimeout(
       function() 
       { 
        console.log('okay, we can continue'); 
       } 
       , 5000 
      ); 
     } 
     else 
     { 
      // do something synchronously and keep on going immediately 
      console.log('ready to go!!!'); 
     } 
    }, 
    function onError(error) 
    { 
     console.log('error'); 
    }, 
    function onComplete() 
    { 
     console.log('complete'); 
    } 
); 

ответ

22

Каждая операция, которую вы хотите выполнить, может быть смоделирована как наблюдаемая. Даже синхронную операцию можно моделировать таким образом. Затем вы можете использовать map, чтобы преобразовать последовательность в последовательность последовательностей, затем используйте concatAll, чтобы сгладить последовательность.

someObservable 
    .map(function (item) { 
     if (item === "do-something-async") { 
      // create an Observable that will do the async action when it is subscribed 
      // return Rx.Observable.timer(5000); 

      // or maybe an ajax call? Use `defer` so that the call does not 
      // start until concatAll() actually subscribes. 
      return Rx.Observable.defer(function() { return Rx.Observable.ajaxAsObservable(...); }); 
     } 
     else { 
      // do something synchronous but model it as an async operation (using Observable.return) 
      // Use defer so that the sync operation is not carried out until 
      // concatAll() reaches this item. 
      return Rx.Observable.defer(function() { 
       return Rx.Observable.return(someSyncAction(item)); 
      }); 
     } 
    }) 
    .concatAll() // consume each inner observable in sequence 
    .subscribe(function (result) { 
    }, function (error) { 
     console.log("error", error); 
    }, function() { 
     console.log("complete"); 
    }); 

Чтобы ответить на некоторые ваши комментарии ... в какой-то момент вам нужно заставить некоторые ожидания от потока функций. В большинстве языков, когда речь идет о функциях, которые возможно асинхронны, сигнатуры функций являются асинхронными, и фактический асинхронный и синхронный характер функции скрывается как деталь реализации функции. Это верно, если вы используете javaScript-обещания, Rx-наблюдаемые, C# Tasks, C++ Futures и т. Д. Функции возвращают обещание/наблюдаемое/task/future/etc, и если функция фактически синхронна, то возвращаемый объект просто уже завершено.

Сказав, что, так как это JavaScript, вы можете обманщика:

var makeObservable = function (func) { 
    return Rx.Observable.defer(function() { 
     // execute the function and then examine the returned value. 
     // if the returned value is *not* an Rx.Observable, then 
     // wrap it using Observable.return 
     var result = func(); 
     return result instanceof Rx.Observable ? result: Rx.Observable.return(result); 
    }); 
} 

someObservable 
    .map(makeObservable) 
    .concatAll() 
    .subscribe(function (result) { 
    }, function (error) { 
     console.log("error", error); 
    }, function() { 
     console.log("complete"); 
    }); 
+0

, который выглядит хорошо, но, к сожалению, мы не знаем, какие элементы собираются работать синхронно, и который асинхронно. – user3291110

+0

интуитивно, (в мире, отличном от rx), решение было бы приостановить обработку входной последовательности до тех пор, пока не произойдет событие ... , что заставляет меня думать о каком-то гибриде, использующем ваше решение плюс наблюдаемый fromEvent, тикать 'входную очередь? – user3291110

+0

Мне бы очень хотелось, чтобы операция была выполнена в onNext, чтобы как-то сказать Rx продолжить ... таким образом наши разработчики могут просто вызвать функцию, чтобы сказать ... «теперь все в порядке». Я беспокоюсь о техническом обслуживании по дороге, если кто-то добавляет асинхронный режим, в глубине реализации того, что было когда-то синхронно, как позволить им делать это с наименьшим количеством хлопот fyi ... items входящие в очередь, являются функциями(), которые нужно вызвать .. и я не знаю, какие из них синхронизированы с асинхронным. они принимают параметр контекста в качестве аргумента ... так что они могут отметить себя полностью. – user3291110

4

Прежде всего, переместить операции асинхронных из subscribe, это не для асинхронных операций.

Что вы можете использовать: mergeMap (псевдоним flatMap) или concatMap. Я упоминаю их оба, потому что concatMap на самом деле mergeMap с параметром concurrent, установленным в 1. Это полезно, так как иногда вы хотите ограничить количество одновременных запросов, но все же выполните пару одновременных.

source.concatMap(item => { 
    if (item == 'do-something-async-and-wait-for-completion') { 
    return Rx.Observable.timer(5000) 
     .mapTo(item) 
     .do(e => console.log('okay, we can continue')); 
    } else { 
     // do something synchronously and keep on going immediately 
     return Rx.Observable.of(item) 
     .do(e => console.log('ready to go!!!')); 
    } 
}).subscribe(); 

Я также покажу, как вы можете оценить лимит своих звонков. Совет: Только ограничение скорости в той точке, в которой вы действительно нуждаетесь, например, при вызове внешнего API, который разрешает только определенное количество запросов в секунду или минут. В противном случае лучше просто ограничить количество одновременных операций и позволить системе двигаться с максимальной скоростью.

Мы начнем со следующего фрагмента кода:

const concurrent; 
const delay; 
source.mergeMap(item => 
    selector(item, delay) 
, concurrent) 

Далее нам нужно выбрать значения для concurrent, delay и реализации selector. concurrent и delay тесно связаны между собой. Например, если мы хотим запускать 10 пунктов в секунду, мы можем использовать concurrent = 10 и delay = 1000 (миллисекунда), а также concurrent = 5 и delay = 500 или concurrent = 4 и delay = 400. Количество пунктов в секунду всегда будет concurrent/(delay/1000).

Теперь можно реализовать selector. У нас есть пара вариантов.Мы можем установить минимальное время выполнения для selector, мы можем добавить к нему постоянную задержку, мы можем испустить результаты, как только они будут доступны, мы можем испустить результат только после того, как прошла минимальная задержка и т. Д. Это даже возможно для добавления тайм-аута с помощью операторов timeout. Удобство.

Установить минимальное время, отправь результат рано:

function selector(item, delay) { 
    return Rx.Observable.of(item) 
    .delay(1000) // replace this with your actual call. 
    .merge(Rx.Observable.timer(delay).ignoreElements()) 
} 

Установить минимальное время, отправь результат поздно:

function selector(item, delay) { 
    return Rx.Observable.of(item) 
    .delay(1000) // replace this with your actual call. 
    .zip(Rx.Observable.timer(delay), (item, _)) 
} 

Добавить время, отправить результат рано:

function selector(item, delay) { 
    return Rx.Observable.of(item) 
    .delay(1000) // replace this with your actual call. 
    .concat(Rx.Observable.timer(delay).ignoreElements()) 
} 

Добавить время , отправьте результат поздно:

function selector(item, delay) { 
    return Rx.Observable.of(item) 
    .delay(1000) // replace this with your actual call. 
    .delay(delay) 
} 
0

Еще один простой пример выполнения ручных асинхронных операций.

Помните, что это не очень хорошая реактивная практика! Если вы хотите только подождать 1000 мс, используйте оператор Rx.Observable.timer или delay.

someObservable.flatMap(response => { 
    return Rx.Observable.create(observer => { 
    setTimeout(() => { 
     observer.next('the returned value') 
     observer.complete() 
    }, 1000) 
    }) 
}).subscribe() 

Теперь замените SetTimeout вашей функции асинхронной, как Image.onload или fileReader.onload ...

Смежные вопросы