1

Я пытаюсь оценить с помощью Rx, чтобы создать последовательность из паба/подэлемента (например, классический шаблон наблюдателя, где следующий элемент публикуется производителем (ами)) , Это в основном то же самое, что и .net-события, за исключением того, что нам нужно его обобщить таким образом, что наличие события не является требованием, поэтому я не могу воспользоваться Observable.FromEvent. Я играл с Observable.Create и Observable.Generate и обнаружил, что мне приходится писать код, чтобы позаботиться о pub/sub (т. Е. Мне нужно написать код производителя/потребителя, чтобы запечатать опубликованный элемент, а затем уничтожить его вызывая IObserver.OnNext() с ним), поэтому кажется, что я не пользуюсь Rx ...Rx как создать последовательность из паба/подкарта

Я смотрю вниз по правильному пути или это подходит для Rx?

Благодаря

ответ

2

Ваш издатель просто выставляет некоторые IObservables как свойства. И вашим потребителям только Subscribe им (или делать любые Rx-fu они хотят, прежде чем подписываться).

Иногда это так же просто, как использование Subjects в вашем издательстве. И иногда это сложнее, потому что ваш издатель фактически наблюдает за другим наблюдаемым процессом.

Вот тупой пример:

public class Publisher 
{ 
    private readonly Subject<Foo> _topic1; 

    /// <summary>Observe Foo values on this topic</summary> 
    public IObservable<Foo> FooTopic 
    { 
     get { return _topic1.AsObservable(); } 
    } 

    private readonly IObservable<long> _topic2; 

    /// <summary>Observe the current time whenever our clock ticks</summary> 
    public IObservable<DateTime> ClockTickTopic 
    { 
     get { return _topic2.Select(t => DateTime.Now); } 
    } 

    public Publisher() 
    { 
     _topic1 = new Subject<Foo>(); 
     // tick once each second 
     _topic2 = Observable.Interval(TimeSpan.FromSeconds(1)); 
    } 

    /// <summary>Let everyone know about the new Foo</summary> 
    public NewFoo(Foo foo) { _topic1.OnNext(foo); } 
} 


// interested code... 
Publisher p = ...; 
p.FooTopic.Subscribe(foo => ...); 

p.ClickTickTopic.Subscribe(currentTime => ...); 

// count how many foos occur during each clock tick 
p.FooTopic.Buffer(p.ClockTickTopic) 
    .Subscribe(foos => Console.WriteLine("{0} foos during this interval", foos.Count)); 
0

Использование RX, безусловно, хорошо подходит для паба/суб. Вот демонстрация, которая иллюстрирует простейший возможный паб/дополнительный шаблон с использованием IObservable и RX.

Добавить реактивные расширения (RX) в проект с помощью NuGet, найти rx-main и установить Reactive Extensions - Main Library.

using System; 
using System.Reactive.Subjects; 

namespace RX_2 
{ 
    public static class Program 
    { 
     static void Main(string[] args) 
     { 
      Subject<int> stream = new Subject<int>(); 

      stream.Subscribe(
       o => 
       { 
        Console.Write(o); 
       }); 

      stream.Subscribe(
       o => 
       { 
        Console.Write(o); 
       }); 

      for (int i = 0; i < 5; i++) 
      { 
       stream.OnNext(i); 
      } 

      Console.ReadKey(); 
     } 
    } 
} 

При выполнении кода выводит это:

0011223344 
+0

Для того, чтобы скомпилировать этот код, вы должны добавить RX к вашему решению с помощью NuGet. – Contango