2015-08-26 2 views
3

RxJS flatMapLatest сглаживает последнюю (только одну) вложенную Observable. У меня есть прецедент, где я не хочу flatMap (который выравнивает все вложенные Observables из прошлого), и я не хочу flatMapWithConcurrency (потому что он поддерживает старые Observables, а не последние Observables), поэтому я хочу, чтобы flatMapLatestTwo или в некоторой версии flatMapLatest, где вы можете указать максимальное количество одновременных вложенных наблюдаемых, например flatMapLatest(2, selectorFn).Как реализовать RxJS flatMapLatestTwo

Вот мой желаемый результат (_X относится к вложенным наблюдаемых X и eX ссылается на X-й onNext события):

_0e0 
_0e1 
_1e0 
_0e2 
_1e1 
    _2e0 
_1e2 
    _2e1 
    _3e0 
    _2e2 
    _3e1 
    _4e0 
    _3e2 
    _4e1 
    _3e3 
    _4e2 
    _4e3 

Это то, что производит flatMapLatest:

_0e0 
_0e1 
_1e0 
_1e1 
    _2e0 
    _2e1 
    _3e0 
    _3e1 
    _4e0 
    _4e1 
    _4e2 
    _4e3 

Я d предпочитает решение, которое использует существующие операторы вместо реализации этого низкого уровня.

+0

Я попытался 'map' наблюдаемым (который дает мне наблюдаемую-из-наблюдаемых характеристик), то bufferWithCount (2,1), а затем flatMapLatest на том, но я получил некоторые странные повторения и упущения. –

ответ

2

Это выглядит наивным. Я ищу способы улучшить, но здесь это:

Rx.Observable.prototype.flatMapLatestN = function (count, transform) { 

    let queue = []; 

    return this.flatMap(x => { 
    return Rx.Observable.create(observer => { 

     let disposable; 

     if (queue.length < count) { 
     disposable = transform(x).subscribe(observer); 
     queue.push(observer); 
     } 
     else { 
     let earliestObserver = queue[0]; 
     if (earliestObserver) { 
      earliestObserver.onCompleted(); 
     } 

     disposable = transform(x).subscribe(observer); 
     queue.push(observer); 
     } 

     return() => { 
     disposable.dispose(); 
     let i = queue.indexOf(observer); 
     queue.splice(i, 1); 
     }; 
    }); 
    }); 
}; 

Чтобы проверить:

function space(n) { 
    return Array(n+1).join(' '); 
} 

Rx.Observable 
    .interval(1000) 
    .take(6) 
    .flatMapLatestN(2, (x) => { 
    return Rx.Observable 
     .interval(300) 
     .take(10) 
     .map(n => `${space(x*4)}${x}-${n}`); 
    }) 
    .subscribe(console.log.bind(console)); 

Он будет:

0-1 
0-2 
0-3 
    1-0 
0-4 
    1-1 
0-5 
    1-2 
    1-3 
     2-0 
    1-4 
     2-1 
    1-5 
     2-2 
     2-3 
      3-0 
     2-4 
      3-1 
     2-5 
      3-2 
      3-3 
       4-0 
      3-4 
       4-1 
      3-5 
       4-2 
       4-3 
        5-0 
       4-4 
        5-1 
       4-5 
        5-2 
       4-6 
        5-3 
       4-7 
        5-4 
       4-8 
        5-5 
       4-9 
        5-6 
        5-7 
        5-8 
        5-9 
+0

Да, я подумал об этом несколько часов и пришел к выводу, что, возможно, нет никакого способа сделать это без специального оператора. Спасибо за эту реализацию оператора. –

1

Мой ответ в C#. Сожалею.

Вы не указали, были ли ваши наблюдаемые горячими или холодными. Может быть, ваши странные цифры исходили из того факта, что ваше оконное оформление сделало новую подписку на вашу «внутреннюю» наблюдаемую, поскольку она была сдвинута с 1-го места в окне до второго. Моя первая попытка сделала то же самое:

var q = Observable.Interval(TimeSpan.FromSeconds(1)) 
     .Select(i => Observable.Interval(TimeSpan.FromMilliseconds(100)) 
     .Select(x => $"_{i}e{x}")); 

w = q.Zip(q.Skip(1), (prev, curr)=> prev.Merge(curr)).Switch(); 

Я думаю, что вы будете бороться, чтобы сделать что-нибудь, чтобы избежать этого, что разве собаки дерьма уродливого, если не создать оператор, поскольку есть государство участвует в управлении этим. (Очевидно, кто-то докажет мне, что здесь неправильно!)

Здесь был мой подход к оператору, который также поддерживает поддерживаемую параметризацию.

public static class Ex 
{ 
    public static IObservable<T> SelectManyLatest<T>(this IObservable<IObservable<T>> source, int latest) 
    { 
     return Observable.Create<T>(o => 
     { 
      var d = new Queue<IDisposable>(); 

      source.Subscribe(os => 
      { 
       if(d.Count == latest) 
        d.Dequeue().Dispose(); 

       d.Enqueue(os.Subscribe(o.OnNext, o.OnError,() => {})); 

      }, o.OnError, o.OnCompleted); 

      return Disposable.Create(()=>new CompositeDisposable(d).Dispose()); 
     });  
    } 
} 

Опять же, извините за C#

+0

Да, я не вижу, как вы можете это сделать без нового оператора. Тем не менее, ваш оператор не будет работать правильно, если 2-й внутренний наблюдаемый завершится до того, как выйдет 3-й внутренний наблюдаемый и до первого завершения: если это произойдет, я верю, что ваш оператор отпишет от 1-го внутреннего наблюдаемого ... Я думаю, вам нужно удалить подписка из очереди, если она завершена – Brandon

+0

Благодарим вас за проницательный ответ, но я согласен с ответом на JavaScript, который больше подходит для вопроса. –

+0

@Brandon да, это был немного быстрый хак. Я видел проблему, но ее решение связано с определением некоторой политики, которая не присутствовала в вопросе, поэтому я проигнорировал ее. ваша политика кажется разумной, но она зависит от того, что представляет поток, тишина от второго, а значения из третьего могут быть требованием. – dunkymole

2

Вот решение, которое использует встроенный в операторах. Сначала мы разбиваем Наблюдаемый на N наблюдаемых, где каждый наблюдаемый имеет соответствующий N-й последний элемент в последовательности. Затем мы получаем flatMapLatest каждый из них и объединим их.

Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) { 
    var self = this; 
    return Rx.Observable.merge(Rx.Observable.range(0, N).flatMap(function(n) { 
     return self.filter(function(x, i) { 
      return i % N === n; 
     }).flatMapLatest(selector, thisArg); 
    })); 
} 

Или в ES2015:

Rx.Observable.prototype.flatMapLatestN = function(N, selector, thisArg) { 
    const {merge, range} = Rx.Observable; 
    return merge(
     range(0, N) 
      .flatMap(n => 
       this.filter((x, i) => i % N === n).flatMapLatest(selector, thisArg)) 
    ); 
} 

Используя тот же тест, как Daiwei:

Выход для N=1 (такой же, как flatMapLatest):

0-0 
0-1 
0-2 
    1-0 
    1-1 
    1-2 
     2-0 
     2-1 
     2-2 
      3-0 
      3-1 
      3-2 
       4-0 
       4-1 
       4-2 
        5-0 
        5-1 
        5-2 
        5-3 
        5-4 
        5-5 
        5-6 
        5-7 
        5-8 
        5-9 

Выход для N=2:

0-0 
0-1 
0-2 
0-3 
    1-0 
0-4 
    1-1 
0-5 
    1-2 
    1-3 
     2-0 
    1-4 
     2-1 
    1-5 
     2-2 
     2-3 
      3-0 
     2-4 
      3-1 
     2-5 
      3-2 
      3-3 
       4-0 
      3-4 
       4-1 
      3-5 
       4-2 
       4-3 
        5-0 
       4-4 
        5-1 
       4-5 
        5-2 
       4-6 
        5-3 
       4-7 
        5-4 
       4-8 
        5-5 
       4-9 
        5-6 
        5-7 
        5-8 
        5-9 

Выход для N=3:

0-0 
0-1 
0-2 
0-3 
    1-0 
0-4 
    1-1 
0-5 
    1-2 
0-6 
    1-3 
     2-0 
0-7 
    1-4 
     2-1 
0-8 
    1-5 
     2-2 
    1-6 
     2-3 
      3-0 
    1-7 
     2-4 
      3-1 
    1-8 
     2-5 
      3-2 
     2-6 
      3-3 
       4-0 
     2-7 
      3-4 
       4-1 
     2-8 
      3-5 
       4-2 
      3-6 
       4-3 
        5-0 
      3-7 
       4-4 
        5-1 
      3-8 
       4-5 
        5-2 
      3-9 
       4-6 
        5-3 
       4-7 
        5-4 
       4-8 
        5-5 
       4-9 
        5-6 
        5-7 
        5-8 
        5-9 
+1

Это милое решение. – Bimper

+2

Кроме того, не очень научная точность документа, собаки не имеют нескольких носов, в отличие от этой картины. Пожалуйста, исправьте это. Благодарю. – Bimper

+0

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/range.md – Bimper

Смежные вопросы