2017-01-28 2 views
1

Я хочу, чтобы заполнить внутренний поток, который, естественно, не полный, с помощью оператора takeUntil, как это:Использование takeUntil для завершения потока RxJS

outerObservable 
    .mergeMap(() => { 
     innerObservable 
      .takeUntil(stopObservable$) 
    }); 

Это работает, внутренний поток завершает, как и ожидалось, но Я хотел бы, чтобы внешний поток возвращал одно последнее значение после сигнала останова. Даже после многих поисковых запросов я до сих пор не знаю, как это сделать.

EDIT:

Я написал оператор, который, кажется, сделать трюк, но оставив открытым вопрос, потому что я знаю, что есть лучший путь или что-то я полностью недоразумение.

function takeUntilThen(notifier, oneLastValue) { 
 
    return Rx.Observable.create(subscriber => { 
 
     var source = this; 
 
     notifier.subscribe(() => { 
 
      subscriber.next(oneLastValue); 
 
      subscriber.complete() 
 
     }); 
 
     return source.subscribe(value => { 
 
      subscriber.next(value); 
 
     }, 
 
     err => subscriber.error(err), 
 
     () => subscriber.complete()); 
 
    }); 
 
}

+1

осторожен, ваш оператор пропускает его подписка подписчика! – jayphelps

+0

спасибо, я посмотрю, что, когда я выясню, что это значит :-) – mysomic

ответ

3

Похоже, вы хотите, чтобы мчаться между innerObservable и stopObservable и выигрывает Какой должна быть в состоянии вывести что-то.

Для этого используется оператор aptly named .race(), а не .takeUntil(). Смотрите пример в рецепте DOCS Redux-наблюдаемый на Отмена:

https://redux-observable.js.org/docs/recipes/Cancellation.html#cancel-and-do-something-else-eg-emit-a-different-action

const somethingEpic = action$ => 
    action$.ofType(SOMETHING) 
    .mergeMap(action => 
     innerObservable 
     .race(
      stopObservable 
      .take(1) 
     ) 
    ); 

Поскольку ваш пример псевдо-код, вот один, что более конкретно:

import { ajax } from 'rxjs/observable/dom/ajax'; 

const fetchUserEpic = action$ => 
    action$.ofType(FETCH_USER) 
    .mergeMap(action => 
     ajax.getJSON(`/api/users/${action.payload}`) 
     .map(response => fetchUserFulfilled(response)) 
     .race(
      action$.ofType(FETCH_USER_CANCELLED) 
      .map(() => incrementCounter()) 
      .take(1) 
     ) 
    ); 
+0

Спасибо за ответ. Будет ли это работать, когда внутреннее наблюдение никогда не завершится естественным путем? Я посмотрел и поиграл с этим решением, которое было представлено в документах, которые можно было увидеть с помощью сокращения, но это не помогло мне. – mysomic

+0

@mysomic вы можете объяснить, что вы подразумеваете под «никогда не заканчивается естественно»? – jayphelps

+0

В этом примере поток ajax только когда-либо возвращает одно значение, а затем завершается. Мой поток (который я имею в том же месте) возвращает много значений, поскольку события происходят, и они никогда не завершаются (естественно). Поэтому, по крайней мере, на мой взгляд, не может быть гонки ... по крайней мере, я полагаю, что этот пример не работает для моего конкретного случая :-) Я отлично работал, используя поток ajax в пример – mysomic

Смежные вопросы