2015-12-01 3 views
4

Есть ли способ иметь горячий наблюдаемый от EventEmitter (или эквивалент, доступный в Angular 2 alpha 46/RxJS 5 alpha)? то есть , если мы подписываемся после того, как значение будет разрешено, оно инициирует с ранее разрешенным значением. Подобно тому, что мы имеем, когда всегда возвращаем одно и то же обещание.Горячая и общая информация Наблюдаемая с EventEmitter

В идеале, только с использованием объектов Angular 2 (я прочитал где-то светлый RxJS, который будет встроен позже, чтобы удалить зависимость), в противном случае импорт RxJS в порядке. AsyncSubject, похоже, соответствует моей потребности, но он недоступен в RxJS 5 alpha.

Я пробовал следующее, без успеха (никогда не срабатывает). Любая идея о том, как его использовать?

let emitter = new EventEmitter<MyObj>(); 
setTimeout(() => {emitter.next(new MyObj());}); 
this.observable = emitter; 
return this.observable.share(); 

Full plunker here comparing hot and cold

USECASE: достичь некоторые асинхронных объекты только один раз (например, серия HTTP вызовов слита/завернутой в нового EventEmitter), но обеспечивают разрешенный объект асинхронного к любой услуге/компонент, подписавшийся на него, даже если они подписываются после его разрешения (получены ответы HTTP).

EDIT: вопрос не в том, как объединить ответы HTTP, но как получить (горячий?) Наблюдаемый от EventEmitter или любой эквивалент, доступный с альфа-альфа-альфа-46/RxJS 5, который позволяет подписываться после асинхронизации результат получается/разрешен (HTTP - всего лишь пример асинхронного происхождения). myEventEmitter.share() не работает (cf plunker выше), хотя он работает с Observable, возвращаемым HTTP (cf plunker from @ Eric Martinez). И что касается метода Angular 2 alpha 46, .toRx() больше не существует, EventEmitter является наблюдаемым и субъектом.

Это то, что хорошо работает с обещаниями, пока мы всегда возвращаем один и тот же объект обещания. Поскольку у нас есть наблюдатели, представленные с помощью услуг HTTP Angular 2, я хотел бы избежать смешивания обещаний и наблюдателей (и, как говорят, наблюдатели более сильны, чем обещания, поэтому он должен позволять делать то, что легко с обещаниями).

Specs about share() (Я не нашел doc для версии 5 alpha - версия, используемая Angular 2) - работает на Observable, возвращенной службой HTTP с угловым 2, не работая над EventEmitter.

EDIT: разъяснено, почему не использовать Observable, возвращаемое HTTP, и добавил, что не использовать RxJS напрямую было бы еще лучше.

EDIT: изменено описание: проблема связана с несколькими подписками, а не с объединением результатов HTTP.

Спасибо!

+0

У меня есть пример http, использующего share в этом [plnkr] (http://plnkr.co/edit/GFdXPncYPxx0IcqIwQnO?p=info). Почему вы спрашиваете о преобразовании EventEmitter в нечто такое, когда ваш usecase использует модуль Http? –

+0

В моем описании вводящий в заблуждение, usecase немного сложнее: серия HTTP-вызовов, и после завершения я объединять/обертывать результаты в новом Observable. Эквивалент обещаний (синтаксис углового 1): 'return $ q.all (httpPromisesToResolve);' – Antoine

+0

Я не думаю, что для этого вам понадобится 'share()'. Вам нужно ['flatMap'] (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/selectmany.md). Проверьте это [пример] (http://plnkr.co/edit/9sPNlaNLh7Aie9yH1H7i?p = preview) –

ответ

1

ReplaySubject делает то, что я искал. @robwormald предоставил рабочий пример на gitter, который я слегка изменил, чтобы лучше продемонстрировать.

Подвергая HTTP-ответа:

import {Injectable} from 'angular2/angular2'; 
import {Http} from 'angular2/http'; 
import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx' 

@Injectable() 
export class PeopleService { 
    constructor(http:Http) { 
    this.people = new ReplaySubject(1); 

    http.get('api/people.json') 
     .map(res => res.json()) 
     .subscribe(this.people); 
    } 
} 

подписчики несколько раз:

// ... annotations 
export class App { 
    constructor(peopleService:PeopleService) { 

    people.subscribe(v => { 
     console.log(v) 
    }); 

    //some time later 

    setTimeout(() => { 
     people.subscribe(v => { 
     console.log(v) 
     }); 
     people.subscribe(v => { 
     console.log(v) 
     }); 
    },2000) 
    } 
} 

Full plunker

EDIT: BehaviorSubject является альтернативой. В этом случае разница - это начальное значение, например, если мы хотим отображать содержимое из кэша до обновления с помощью HTTP-ответа.

+0

Итак, мы вернемся к началу в конце. Рад, что вы нашли ответ! Takeaways как новый член SO: 1. сформулируйте ваши потребности как можно более разделенные с (воображаемым) решением (вы настаивали на «эмитенте событий» и в конечном итоге не использовали его - это происходит на самом деле довольно часто) и представляют ваш текущий (неудачный) подход, 2. быть настолько точным, насколько это разумно может быть в понятиях и формулировках.Пример: первое предложение вопроса: 'если мы подписываемся после того, как значение будет разрешено, оно запускается с ранее разрешенным значением.' - понятно с обещаниями, но уже не с наблюдаемыми. – user3743222

+0

Но в любом случае всегда будет так, что вам нужна чат-сессия, чтобы правильно выразить вашу проблему. Поэтому просто упомянем пункты раньше для всех, кто будет смотреть на вопрос и ответ. – user3743222

+0

Кажется, я не использовал правильные слова, чтобы выразить поведение, которое я искал (я не знаю, как сказать «разрешить» с наблюдаемым лексиконом), спасибо за ваши советы! Я попытаюсь применить его. – Antoine

2

Функциональность, которую вы, по-видимому, описываете, относится не к холодному наблюдаемому, а к Rx.BehaviourSubject. Посмотрите здесь объяснение по темам Rxjs: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md.

Цитирую оттуда:

BehaviourSubject похож на ReplaySubject, за исключением того, что она сохраняется только последнее значение его опубликования. BehaviourSubject также требует значения по умолчанию при инициализации.Это значение отправляется наблюдателям, когда еще не полученное значение еще не получено. Это означает, что все подписчики мгновенно получат значение в подписке, если объект уже не завершен.

Rx.AsyncSubject будет ближе всего в поведении обещания:

AsyncSubject похож на повтор и поведение субъектов, однако он будет хранить только последнее значение, и только опубликовать его, когда последовательность выполнен. Тип AsyncSubject можно использовать для ситуаций, когда наблюдаемый источник является горячим и может завершиться до того, как любой наблюдатель сможет подписаться на него. В этом случае AsyncSubject может по-прежнему предоставлять последнее значение и публиковать его любым будущим подписчикам.

еще два замечания:

  • в вашей plunker: this._coldObservable = emitter.share();. Использование share возвращает горячий наблюдаемый!
  • EventEmitter фактически расширяет предмет в первую очередь

UPDATE: Обертывание в EventEmitter вокруг Rx.Observable:

function toRx (eventEmitter) { 
    return Rx.Observable.create(function (observer) { 
    eventEmitter.subscribe(function listener (value) {observer.onNext(value)}); 
    // Ideally you also manage error and completion, if that makes sense with Angular2 
    return function() { 
     /* manage end of subscription here */ 
    }; 
    }; 
) 
} 

После того, как вы есть, что Rx.Observable, вы можете применить share(), shareReplay(1), ничего вы хотите.

Моя ставка заключается в том, что команда Angular рано или поздно предложит функцию бригадирования, но если вы не хотите ждать, вы можете сделать это сами.

+0

Спасибо за ваш ответ! Кажется, что «AsyncSubject» - это то, что я ищу, хотя объект, заданный Angular lib, будет еще лучше. Но я не смог найти его в RxJS lib, используемом Angular 2 (версия 5.0.0-alpha.7 - я исправил номер версии в вопросе). Есть идеи об этом? Что-то вроде 'import {AsyncSubject} из '@ reactivex/rxjs/dist/cjs/Rx';' Кажется, что доступны объекты BehaviorSubject и ReplaySubject, но ни один из них не работает здесь, как ожидалось. PS: казалось, я перевернул слова 'hot' и' cold', * hot *, о чем я думал. – Antoine

+0

Я действительно не понимаю, что вам нужно. Предположим, что ваш HTTP-вызов возвращает наблюдаемый, вы можете эмулировать '$ q.all (httpPromisesToResolve)' с 'Rx.Observable.forkJoin ([httpCalls])'. Ср https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md и http://reactivex.io/documentation/operators/zip.html (прокрутите вниз до js часть и документация 'forkJoin'). Затем с помощью команды 'Rx.Observable.forkJoin ([httpCalls]). Share()' вы можете подписаться на все, что вы хотите, на 'share' obs., Будет только одна подписка на' Rx.Observable.forkJoin ([httpCalls ]) ' – user3743222

+0

Теперь, если вы хотите получить последнее значение, даже если подписались после завершения наблюдаемого, используйте' Rx.Observable.forkJoin ([httpCalls]). ShareReplay (1) 'вместо этого. Ср https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/sharereplay.md. Я не знаю, будет ли это работать в версии 5.0.0-alpha.7'. – user3743222

Смежные вопросы