2016-01-10 7 views
10

Я новичок в Rx, и мне трудно найти документацию по составлению обещаний, чтобы данные с первого обещания были переданы во второй и т. Д. Вот три очень простых обещания, вычисления по данным не важны, просто что-то нужно сделать с использованием данных предыдущего обещания.RxJS Promise Composition (прохождение данных)

const p1 =() => Promise.resolve(1); 
const p2 = x => { const val = x + 1; return Promise.resolve(val); }; 
const p3 = x => { 
     const isEven = x => x % 2 === 0; 
     return Promise.resolve(isEven(x)); 
}; 

Традиционный способ достижения композиции я говорю:

pl().then(p2).then(p3).then(console.log); 

Моей любимой реализации является Ramda в composeP и pipeP:

R.pipeP(p1, p2, p3, console.log)() 

Похоже Rx может быть в состоянии достаточно легко справиться с такой ситуацией. Тем не менее, ближе всего я нашел до сих пор от RxJS до ASync (библиотека) сравнение здесь https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md:

var Rx = require('rx'), 
    fs = require('fs'), 
    path = require('path'); 
var file = path.join(__dirname, 'file.txt'), 
    dest = path.join(__dirname, 'file1.txt'), 
    exists = Rx.Observable.fromCallback(fs.exists), 
    rename = Rx.Observable.fromNodeCallback(fs.rename), 
    stat = Rx.Observable.fromNodeCallback(fs.stat); 
exists(file) 
    .concatMap(function (flag) { 
    return flag ? 
     rename(file, dest) : 
     Rx.Observable.throw(new Error('File does not exist.')); 
    }) 
    .concatMap(function() { 
     return stat(dest); 
    }) 
    .forEach(
     function (fsStat) { 
      console.log(JSON.stringify(fsStat)); 
     }, 
     function (err) { 
      console.log(err); 
     } 
    ); 

concatMap кажется перспективным, но приведенный выше код выглядит ужасающе. У меня также были проблемы с моим примером, потому что Rx.Observable.fromPromise (p1) не будет работать, поскольку он ожидает само обещание, а не функцию, а Rx.Observable.defer (p1), похоже, не передает такие параметры, как пример.

Спасибо!

Подобный вопрос, но без передачи данных: Chaining promises with RxJS

+0

Ваши обещания должны быть завернуты в функцию? – user3743222

+0

Только в том случае, если вы определили обещание, встроенное вне цепочки Promise, или наблюдаемое с чем-то вроде const p1 = new Promise ((разрешить, отклонить) => {}), оно сразу начнет оценивать и не сможет получать данные из ранее выполненное обещание. Или я ошибаюсь в отношении непосредственной оценки? –

ответ

14

Я не читал все это, но если вы хотите, чтобы достичь такой же, как pl().then(p2).then(p3).then(console.log); с p быть функция, возвращающая обещания, вы могли бы сделать что-то подобное (например, here)

Rx.Observable.fromPromise(p1()) 
      .flatMap(function(p1_result){return p2(p1_result);}) 
      .flatMap(function(p2_result){return p3(p2_result);}) 

Или более симметричным:

var chainedPromises$ = 
    Rx.Observable.just() 
      .flatMap(p1) 
      .flatMap(p2) 
      .flatMap(p3); 

Теперь, если вы хотите, чтобы выполнить обратный вызов последовательно обернутый через fromCallback или fromNodeCallback, вы могли бы сделать что-то вроде:

function rename (flag){ 
    return flag 
      ? rename(file,dest).flatMap(return Rx.Observable.just(dest)) 
      : Rx.Observable.throw(new Error('File does not exist.')); 
} 

Rx.Observable.just(file) 
      .flatMap(exists) 
      .flatMap(rename) 
      .flatMap(stat) 

Последний код не тестировался, так что держите меня в курсе, если это работает. Последний комментарий, это должно сработать, если в каждой точке вы получите только одно значение (например, обещание). Если у вас будет несколько файлов, а не один, с flatMap, вы можете получить вопросы о заказе (если дело касается вас), так что в этом случае вы можете использовать concatMap в качестве замены.

+0

Я несколько надеялся на чуть более высокую абстракцию, которая была бы чем-то вроде flatMapAll (p1, p2, p3). Особенно полезно, если последовательность обещаний генерируется посредством карты, например. const ps = map ((x) => expectedFsReadFileCurriedSoThatItDoesSomethingWithPreviousFileData (x), ['1.txt', '2.txt', '3.txt']); . Rx.Observable.just() flatMapAll (... пс); (только псевдокод). Но это определенно управляемое решение, и, вероятно, есть способ сделать это с отображением overPromise или что-то в этом роде. благодаря! –

+0

также не тестировал второй образец кода, но первые работают как шарм –

+0

, вы можете делать «flatMapAll» самостоятельно. 'flatMapAll :: Rx.Observable -> [a -> a] -> Rx.Observable'. 'flatMapAll = (source, fn_array) -> fn_array.reduce ((acc, fn) -> acc.flatMap (fn), source)'. В js, 'Rx.Observable.prototype.flatMapAll = function (fn_array) {source = this; return ...} ' – user3743222