2015-03-25 4 views
5

У меня есть класс, который будет отвечать за генерацию событий на частый, но нерегулярный интервал, который должны потреблять и работать другие классы. Я хочу использовать Reactive Extensions для выполнения этой задачи.Как подтолкнуть сущность к наблюдаемому Rx?

Потребительская сторона этого очень проста; У меня есть потребительский класс, реализующий IObserver<Payload>, и все выглядит хорошо. Проблема возникает в классе производителей.

Реализация IObservable<Payload> непосредственно (то есть, поставив свою собственную реализацию для IDisposable Subscribe(IObserver<Payload>) есть, в соответствии с документацией, не рекомендуется. Он предлагает вместо составления с Observable.Create() набором функций. Так как мой класс будет работать в течение длительного времени, я уже пробовал создавать наблюдаемый с var myObservable = Observable.Never(), а потом, когда у меня есть новые Payloads доступны, вызывая myObservable.Publish(payloadData). когда я делаю это, хотя, я, кажется, не попал в OnNext реализации в моем потребителе.

Я думаю, что я могу создать событие в своем классе, а затем создать Observable с помощью функции FromEvent, но это похоже на чрезмерное сотрудничество (т. е. кажется странным, что новая жара Observables «требует» событий для работы). Есть ли простой подход, который я рассматриваю здесь? Каков «стандартный» способ создания собственных источников Observable?

+1

Как правило, если вы обнаруживаете, что используете либо «IObservable », либо «IObserver », то вы, вероятно, что-то не так. – Enigmativity

ответ

6

Создать new Subject<Payload> и называем это OnNext метод отправки события. Вы можете использовать Subscribe в теме с вашим наблюдателем.

Часто обсуждается использование предметов. Подробное обсуждение использования объектов (которые ссылаются на праймер), см. here, но в целом, этот прецедент звучит корректно (т. Е. Он, вероятно, соответствует локальным + горячим критериям).

В качестве альтернативы, перегрузки принимающих делегатов подписки могут устранить необходимость предоставить вам реализацию IObserver<T>.

+0

Понятие предмета и предоставленные ссылки действительно помогли мне понять это более полно. – GWLlosa

1

Observable.Never() не «отправить» уведомление, вы должны использовать Observable.Return(yourValue)

Если вам нужно руководство с конкретными примерами я рекомендую прочитать Intro to Rx

+0

Я могу быть очень смущен здесь ... Если я использую Observable.Return, не будет ли это наблюдаемое только одно значение? Мне нужен тот, который остается ненадолго, так что я могу периодически нажимать на него больше значений, когда я их вычисляю. – GWLlosa

+0

Вы должны действительно прочитать «Intro to Rx», IObservable - это последовательность событий, когда Observer получает значение OnNext (значение T), с которым вы получаете последнее значение. – kondas

+1

В соответствии с документами: «Один последний важный заводский метод или примитивный конструктор называется Return. Его роль состоит в том, чтобы представить последовательность одноэлементной последовательности , так же как одноячеечный массив будет находиться в мире последовательностей IEnumerable. Поведение , наблюдаемое по подписанные наблюдатели - это два сообщения: OnNext со значением и сигнализацией OnCompleted, конец этой последовательности получен: IObservable Источник = Наблюдаемый.Возврат (42); Выполнение этого кода дает следующий результат: OnNext: 42 OnCompleted «Я не хочу завершенный сигнал, я хочу, чтобы последовательность оставалась открытой/живой». – GWLlosa

0

Если я не попадался лучшим способом сделать это, что я На данный момент мы остановились на использовании BlockingCollection.

var _itemsToSend = new BlockingCollection<Payload>(); 
IObservable<MessageQueuePayload> _deliverer = 
_itemsToSend.GetConsumingEnumerable().ToObservable(Scheduler.Default); 
Смежные вопросы