2017-01-13 1 views
0

В моем приложении Angular 2.x у меня есть компонент, который подписывается на наблюдаемый, который может быть выставлен службой. Наблюдаемая служба данных в настоящее время реализована с запросами HTTP-запросов, но это изменится на реактивную реализацию WS. Я хотел бы оставить опрос вне компонентов, так что вопрос: как я могу принять меры, например, HTTP-опрос через setInterval, как только у меня есть подписчики? И как я могу принять меры, например, clearInterval, когда абонентов больше нет?RxJs: принять меры для первого и последнего подписчиков к наблюдаемым

Вид асинхронно обновление:

<div *ngFor="let headline of headlines$ | async"> 
    <md-list-item (click)="onSelect(headline)"> 
     <h4 md-line>{{headline.title}}</h4> 
    </md-list-item> 
    <md-divider></md-divider> 
    </div> 

компонент получает заголовок наблюдаемого от службы:

export class HeadlinesComponent implements OnInit, OnDestroy { 
    constructor(
     private _headlineService: HeadlineService, 
) { 
    } 

    headlines$: Observable<Headline[]>; 

    ngOnInit() { 
    this.headlines$ = this._headlineService.headlines$; 
    } 
} 

И сервис реализован в наблюдаемом хранилище данных:

@Injectable() 
export class HeadlineService { 
    constructor(
    private _http: Http) { 
    _init(); 
    } 

    private _headlines$: BehaviorSubject<Headline[]> = 
    new BehaviorSubject<Headline[]>([]); 

    get headlines$(): Observable<Headline[]> { 
    return this._headlines$.asObservable(); 
    } 

    private _store: { 
    headlines: Headline[] 
    } = { headlines: [] }; 

    private _save(headlines: Headline[]) { 
    this._store.headlines = headlines; 
    this._headlines$.next(Object.assign({}, this._store).headlines); 
    } 
} 

и магазин обновляется по HTTP:

private _interval; 

    private _init() { 
    let self = this; 
    this._interval = setInterval(function(){ self._loadHeadlines();}, 5000); 
    } 

    private _loadHeadlines(): Observable<Headline[]> { 
    let self = this; 
    let observable: Observable<Headline[]> = this._http 
     .get(url, options) 
     .map(response => response.json()._embedded.headlines) 
     .share(); 

    observable.subscribe(
     headlines => self._save(headlines) 
    );  

    return observable; 
    } 

Как реализовано, служба заголовка начинает опрос по строительству, а не задерживается до тех пор, пока не появится абонент. Чтобы получить правильный жизненный цикл, я мог бы открыть функциональные возможности службы _loadHeadlines и начать опрос, когда запускается ngOnInit компонента (и останавливается в ngOnDestroy), но он предоставляет детали реализации службы компоненту (эти данные будут меняться при переходе на веб- реализация сокетов), и мне нужно будет точно подсчитать количество подписчиков.

Итак, есть ли хороший способ RxJs-ish решить эту проблему?

ответ

0

Я не использовал RxJS в Angular, но из вашего примера кода вы можете указать .create наблюдаемый, который запустит интервал для опроса и возвращает функцию прозрачного интервала, которая будет вызываться, когда она будет отписана.

/* 
    Increment value every 1s, emit even numbers. 
*/ 
const evenNumbers = Rx.Observable.create(function(observer) { 
    let value = 0; 
    const interval = setInterval(() => { 
     if(value % 2 === 0){ 
      observer.next(value); 
     } 
     value++; 
    }, 1000); 

    return() => clearInterval(interval); 
}); 
//output: 0...2...4...6...8 
const subscribe = evenNumbers.subscribe(val => console.log(val)); 
//unsubscribe after 10 seconds 
setTimeout(() => { 
    subscribe.unsubscribe(); 
}, 10000); 

В приведенном выше примере от learnrxjs.io

JSBin Example

+0

Спасибо за примечание @subhaze и указатель на обучение-rxjs. Интервал в этом случае начинается с первого абонента и заканчивается последним абонентом. Я могу использовать обработчик завершения для последнего абонента, но любая идея, как я могу инициировать действие для первого абонента? –

+0

Я не уверен, что я полностью следую, но вот пример слияния в наблюдаемом, что только триггеры на первом испускают http://jsbin.com/bewedehoze/1/edit?jsconsole – subhaze

0

Наблюдаемые ленивы и только начинают работать, когда подписаны. Когда вы перестанете слушать, они откажутся от подписки и очистки.

Что вы ищете - это поведение многоадресной рассылки с refcounting для init при первой подписке и только тогда, когда никто не подписывается больше на очистку. Это делается с помощью оператора .share().

Смежные вопросы