2017-01-11 4 views
1

Есть ли хороший способ проверить, не завершен ли Наблюдаемый в это точное время?проверить, если не завершено. Наблюдаемый пуст

let cache = new ReplaySubject<number>(1); 
... 
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value. 
cache.isEmpty().subscribe(isEmpty => { 
    if (isEmpty) { 
     console.log("I want to be here!!!"); 
     cache.next(0); 
    } 
}); 
// but that code does not work until cache.complete() 
+0

«пустой» 'Observable' не очень содержательное понятие в этом контексте, я думаю, вы приравнивая проблемы между' 'Observer' и Observable'. Что вы пытаетесь достичь этим? – paulpdaniels

ответ

1

Вы можете использовать takeUntil():

Observable.of(true) 
    .takeUntil(cache) 
    .do(isEmpty => { 
     if (isEmpty) { 
      console.log("I want to be here!!!"); 
      cache.next(0); 
     } 
    }) 
    .subscribe(); 

Однако это будет просто работать один раз.


Другим способом было бы «нулевой» кэш и инициализировать его как пустой, используя BehaviorSubject:

let cache = new BehaviorSubject<number>(null as any); 
... 
cache 
    .do(content => { 
     if (content == null) { 
      console.log("I want to be here!!!"); 
      cache.next(0); 
     } 
    }) 
    .subscribe(); 

И, конечно, вы можете инициализировать кэш со значением по умолчанию какой-то сразу.

+2

Кроме того, 'BehaviorSubject' имеет метод [getValue] (http://reactivex.io/rxjs/file/es6/BehaviorSubject.js.html#lineNumber21), который возвращает текущее значение, если оно есть. –

+0

takeUntil достаточно хорош. –

1

На самом деле, это не так просто, и принятый ответ не очень универсален. Вы хотите проверить, является ли ReplaySubject пустым в этот конкретный момент времени.

Однако, если вы хотите, чтобы сделать это действительно совместимы с ReplaySubjectнеобходимо учитывать также windowTime параметр, который определяет«время жизни» для каждого значения, которое проходит через этот объект. Это означает, что ваш cache пуст или нет изменится во времени.

ReplaySubject имеет способ _trimBufferThenGetEvents, который делает то, что вам нужно. К сожалению, этот метод является конфиденциальным, поэтому вам нужно сделать небольшой «взлом» в JavaScript и напрямую расширить его prototype.

import { ReplaySubject } from 'rxjs'; 

// Tell the compiler there's a isNowEmpty() method 
declare module "rxjs/ReplaySubject" { 
    interface ReplaySubject<T> { 
     isNowEmpty(): boolean; 
    } 
} 

ReplaySubject.prototype['isNowEmpty'] = function() { 
    let events = this._trimBufferThenGetEvents(); 
    return events.length > 0; 
}; 

Затем с помощью этого ReplaySubject проста:

let s = new ReplaySubject<number>(1, 100); 
s.next(3); 
console.log(s.isNowEmpty()); 
s.next(4); 

setTimeout(() => { 
    s.next(5); 
    s.subscribe(val => console.log('cached:', val)); 
    console.log(s.isNowEmpty()); 
}, 200); 

setTimeout(() => { 
    console.log(s.isNowEmpty()); 
}, 400); 

Обратите внимание, что некоторые вызовы к isNowEmpty() возвращения true, в то время как другие возвращаются false. Например, последний возвращает false, потому что в то же время значение было недействительным.

Этот пример напечатает:

true 
cached: 5 
true 
false 

Смотреть демо: https://jsbin.com/sutaka/3/edit?js,console

0

Вы можете использовать .scan() накапливать свой счет, и карту, что логическое значение, будь-то отлична от нуля. (Требуется второй параметр для начального значения, который бы начинал с 0, поэтому он всегда отражает текущий счетчик.)

Я также добавил .filter() вместо оператора if, чтобы сделать это очиститель:

let cache = new ReplaySubject<number>(1); 

cache 
    .map((object: T) => 1) 
    .scan((count: number, incoming: number) => count + incoming, 0) 
    .map((sum) => sum == 0) 
    .filter((isEmpty: boolean) => isEmpty) 
    .subscribe((isEmpty: boolean) => { 
     console.log("I want to be here!!!"); 
     cache.next(0); 
    }); 
Смежные вопросы