6

У меня проблема с особыми потребностями производителя в RxJS: производитель медленно производит элементы. Потребитель запрашивает элементы и часто должен ждать производителя. Это может быть достигнуто путем смены производителя и потока запросов:RxJS: Продюсер-потребитель с прерыванием

var produce = getProduceStream(); 
var request = getRequestStream(); 

var consume = Rx.Observable.zipArray(produce, request).pluck(0); 

Иногда запрос прерывается. Производимый элемент должен потребляться только после того, как не несостоявшегося запроса:

produce: -------------p1-------------------------p2---------> 
request: --r1--------------r2---------------r3--------------> 
abort: ------a(r1)------------------a(?)------------------> 
consume: ------------------c(p1, r2)-------------c(p2, r3)--> 

Первый запрос r1 будет потреблять первый произведенный элемент p1, но r1 получает прерван a(r1) прежде чем он может потреблять p1. p1 производится и потребляется c(p1, r2) по второму запросу r2. Второе прерывание a(?) игнорируется, поскольку ранее не было запрошено неотвеченный запрос. Третий запрос r3 должен подождать следующего изготовленного элемента p2 и не прерывается до p2. Таким образом, p2 потребляется c(p2, r3) сразу же после его изготовления.

Как добиться этого в RxJS?

Edit: Я создал example с тестом QUnit на jsbin. Вы можете отредактировать функцию createConsume(produce, request, abort), чтобы попробовать/протестировать ваше решение.

В примере содержится определение функции previously accepted answer.

+0

Хорошая проблема. [Здесь] (http://jsbin.com/havoroniqi/3/edit) - моя попытка, но @ Брэндон лучше. –

+0

КПП. Я заметил некоторые небольшие проблемы с вашими тестами: 1. вы создаете '1',' 2', но ждите: 'p1',' p2'; 2. Вы когда-то меняли «ожидаемые» и «фактические». –

+0

Прошу прощения. Я строю этот тест в спешке и поделился примером jsbin неправильным способом. Таким образом, я случайно изменил его позже во время моих собственных подходов. Я изменил элементы на их исходные значения: произведите: 'p1' и' p2'; request: 'r1',' r2' и 'r3'; abort: 'a1'. – maiermic

ответ

0

Это решение игнорирует абортов, которые не следуют без ответа запрос:

const {merge} = Rx.Observable; 

Rx.Observable.prototype.wrapValue = function(wrapper) { 
    wrapper = (wrapper || {}); 
    return this.map(function (value) { 
     wrapper.value = value; 
     return wrapper; 
    }); 
}; 

function createConsume(produce, request, abort) { 
    return merge(
      produce.wrapValue({type: 'produce'}), 
      request.wrapValue({type: 'request'}), 
      abort.wrapValue({type: 'abort'}) 
     ) 
     .scan(
      [false, []], 
      ([isRequest, products], e) => { 
       // if last time the request was answered 
       if (isRequest && products.length) { 
        // remove consumed product 
        products.shift(); 
        // mark request as answered 
        isRequest = false; 
       } 
       if (e.type === 'produce') { 
        // save product to consume later 
        products.push(e.value); 
       } else { 
        // if evaluated to false, e.type === 'abort' 
        isRequest = (e.type === 'request'); 
       } 
       return [isRequest, products]; 
      } 
     ) 
     .filter(([isRequest, products]) => (isRequest && products.length)) 
     .map(([isRequest, products]) => products[0]); // consume 
} 

Code в новейшем тесте на JSBin.

3

это (основные детали идея минус) проходит тест JSBin:

var consume = request 
    .zip(abort.merge(produce), (r,x) => [r,x]) 
    .filter(([r,x]) => isNotAbort(x)) 
    .map(([r,p]) => p); 

И в JSBin code.

+0

Благодарим вас за решение, хотя оно не ведет себя так, как хотелось бы. Я отредактировал свой вопрос, чтобы уточнить, какое поведение я бы хотел иметь. Я отредактировал диаграмму, добавил дескриптор и ссылку [http://jsbin.com/jotanilosi/2/edit?js,output] в тестовый пример/среду jsbin. В этом примере я использовал ваше решение. Он терпит неудачу, потому что он пропускает первый созданный элемент в пользу второго элемента, который потребляется дважды. – maiermic

+0

Обновлен мой ответ. Решает ли это все ваши дела[email protected] –

+0

Умный. Я знал, что должен быть способ сделать это с существующими операторами. – Brandon

2

Я не могу полностью обернуть мозг вокруг того, как это сделать с существующими операторами. Вот как сделать это с Observable.create():

return Rx.Observable.create(function (observer) { 
    var rsub = new Rx.SingleAssignmentDisposable(); 
    var asub = new Rx.SingleAssignmentDisposable(); 
    var psub = new Rx.SingleAssignmentDisposable(); 
    var sub = new Rx.CompositeDisposable(rsub, asub, psub); 
    var rq = []; 
    var pq = []; 
    var completeCount = 0; 
    var complete = function() { 
    if (++completeCount === 2) { 
     observer.onCompleted(); 
    } 
    }; 
    var consume = function() { 
    if (pq.length && rq.length) { 
     var p = pq.shift(); 
     var r = rq.shift(); 
     observer.onNext('p' + p); 
    } 
    }; 

    rsub.setDisposable(request.subscribe(
    function (r) { 
     rq.push(r); 
     consume(); 
    }, 
    function (e) { observer.onError(e); }, 
    complete)); 

    asub.setDisposable(abort.subscribe(
    function (a) { 
     rq.shift(); 
    }, 
    function (e) { observer.onError(e); } 
)); 

    psub.setDisposable(produce.subscribe(
    function (p) { 
     pq.push(p); 
     consume(); 
    }, 
    function (e) { observer.onError(e); }, 
    complete)); 


    return sub; 
}); 

http://jsbin.com/zurepesijo/1/

+0

Красивый подход. Просто хотел бы подтвердить: вызывает ли 'sub.dispose()' 'dispose все' Rx.SingleAssignmentDisposable '? –

+0

Да CompositeDisposable позволяет группировать несколько расходных материалов в один одноразовый. – Brandon

Смежные вопросы