2012-06-25 2 views
9

Я недавно читал об IObservable. До сих пор я смотрел различные вопросы SO и смотрел видео о том, что они могут делать. Весь механизм «толчка», о котором я думаю, блистателен, но я все еще пытаюсь понять, что именно все делает. Из моих чтений я думаю, что IObservable - это то, что можно «наблюдать», а IObservers - «наблюдатели».Использование IObservable вместо событий

Итак, теперь я стараюсь реализовать это в своем приложении. Есть несколько вещей, которые я бы хотел отключить, прежде чем начать. Я видел, что IObservable является противоположностью IEnumerable, однако я не могу видеть какие-либо места в моем конкретном экземпляре, которые я могу включить в свое приложение.

В настоящее время я активно использую события, так что я вижу, что «сантехника» начинает становиться неуправляемой. Я думаю, что IObservable может помочь мне здесь.

Рассмотрим следующую конструкцию, которая моя обертка вокруг моего ввода/вывода в моем приложении (FYI, я обычно приходится иметь дело со строками):

У меня есть базовый интерфейс называется IDataIO:

public interface IDataIO 
{ 
    event OnDataReceived; 
    event OnTimeout: 
    event OnTransmit; 
} 

Теперь я в настоящее время есть три класса, которые реализуют этот интерфейс, каждый из этих классов в некотором роде использовать метод асинхронных вызовов, вводя некоторый тип многопоточной обработки:

public class SerialIO : IDataIO; 
public class UdpIO : IDataIO; 
public class TcpIO : IDataIO; 

Существует один экземпляр каждого из этих классов, завернутых в мой последний класс, называемый IO (который также реализует IDataIO - придерживаясь моей шаблон стратегии):

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 
} 

Я использовал шаблон стратегии инкапсулировать эти три класса, так что при изменении между различными экземплярами IDataIO во время выполнения он становится «невидимым» для конечного пользователя. Как вы могли себе представить, это привело к тому, что на заднем плане появилось довольно много событий.

Итак, как я могу использовать уведомление «push» здесь в моем случае? Вместо того, чтобы подписываться на события (DataReceived и т. Д.), Я хотел бы просто подталкивать данные к любому, кого это интересует. Я немного не знаю, с чего начать. Я все еще пытаюсь играть с идеями/родовыми классами Subject и различными воплощениями этого (ReplaySubject/AsynSubject/BehaviourSubject). Может кто-нибудь, пожалуйста, просветит меня на этом (возможно, со ссылкой на мой дизайн)? Или это просто не идеально подходит для IObservable?

PS. Вы можете исправить любого из моих «недопониманий» :)

ответ

8

Наблюдаемых являются большими для представления потоков данных, так что ваше DataReceived события будет моделью красиво наблюдаемой картина, что-то вроде IObservable<byte> или IObservable<byte[]>. Вы также получаете дополнительную выгоду от OnError и OnComplete, которые удобны.

С точки зрения его реализации, трудно сказать для вашего точного сценария, но мы часто используем Subject<T> в качестве основного источника и вызываем OnNext для ввода данных.Может быть что-то вроде

// Using a subject is probably the easiest way to push data to an Observable 
// It wraps up both IObservable and IObserver so you almost never use IObserver directly 
private readonly Subject<byte> subject = new Subject<byte>(); 

private void OnPort_DataReceived(object sender, EventArgs e) 
{ 
    // This pushes the data to the IObserver, which is probably just a wrapper 
    // around your subscribe delegate is you're using the Rx extensions 
    this.subject.OnNext(port.Data); // pseudo code 
} 

Вы можете выставить объект через свойство:

public IObservable<byte> DataObservable 
{ 
    get { return this.subject; } // Or this.subject.AsObservable(); 
} 

Вы можете заменить DataReceived событие на IDataIO с IObservable<T> и имеют каждый класс стратегии обрабатывать свои данные в зависимости от того, каким образом они нужно и оттолкнуть до Subject<T>.

С другой стороны, тот, кто подписывается на Наблюдаемое затем может либо обработать его как событие (только с помощью Action<byte[]>), или вы можете выполнить некоторые действительно полезную работу на потоке с Select, Where, Buffer и т.д. .

private IDataIO dataIo = new ... 

private void SubscribeToData() 
{ 
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); 
} 

private void On16Bytes(IList<byte> bytes) 
{ 
    // do stuff 
} 

ReplaySubject/ConnectableObservable s велики, когда вы знаете, что ваш абонент будет опоздать на вечеринку, но все-таки нужно, чтобы догнать на все события. Источник кэширует все, что он нажал, и повторяет все для каждого абонента. Только вы можете сказать, действительно ли это поведение, которое вам действительно нужно (но будьте осторожны, поскольку оно будет кэшировать все, что явно увеличивает использование вашей памяти).

Когда я узнавал о Rx я нашел http://leecampbell.blogspot.co.uk/ серии блога на Rx, чтобы быть очень информативным, чтобы понять теорию (посты немного от ныне и API, изменились так что следите за этим)

+0

Hi RichK, могли бы вы подробнее остановиться на Subject собственности? Как это объявляется? И пользователь этого класса, как именно они «подписались» на IObservable 'DataReceived'. – Simon

+0

@Simon Я сделал несколько изменений, дайте мне знать, если вы все еще не уверены :) – RichK

+0

Спасибо, это очищает несколько вещей. Только 1 вещь, я предполагаю, что 'dataIo.DataObservable' является' public IObservable DataObservable'? – Simon

4

Это безусловно, идеальный случай для наблюдаемых. Класс IO, вероятно, увидит наибольшее улучшение. Для начала позвольте изменить интерфейс для использования наблюдаемых и посмотреть, насколько простым становится класс объединения.

public interface IDataIO 
{ 
    //you will have to fill in the types here. Either the event args 
    //the events provide now or byte[] or something relevant would be good. 
    IObservable<???> DataReceived; 
    IObservable<???> Timeout; 
    IObservable<???> Transmit; 
} 

public class IO : IDataIO 
{ 
    public SerialIO Serial; 
    public UdpIO Udp; 
    public TcpIO Tcp; 

    public IObservable<???> DataReceived 
    { 
     get 
     { 
      return Observable.Merge(Serial.DataReceived, 
            Udp.DataReceived, 
            Tcp.DataReceived); 
     } 
    } 

    //similarly for other two observables 
} 

SIDE ПРИМЕЧАНИЕ: Вы можете заметить, что я изменил имена элементов интерфейса. В .NET события обычно называются <event name>, а функции, которые их повышают, называются On<event name>.

Для классов производства у вас есть несколько вариантов, которые зависят от реальных источников. Предположим, вы используете класс .NET SerialPort в SerialIO и что DataReceived возвращает IObservable<byte[]>. Поскольку SerialPort уже имеет событие для полученных данных, вы можете использовать это непосредственно, чтобы сделать видимый вам нужным.

public class SerialIO : IDataIO 
{ 
    private SerialPort _port; 

    public IObservable<byte[]> DataRecived 
    { 
     get 
     { 
      return Observable.FromEventPattern<SerialDataReceivedEventHandler, 
               SerialDataReceivedEventArgs>(
         h => _port.DataReceived += h, 
         h => _port.DataReceived -= h) 
        .Where(ep => ep.EventArgs.EventType == SerialData.Chars) 
        .Select(ep => 
          { 
           byte[] buffer = new byte[_port.BytesToRead]; 
           _port.Read(buffer, 0, buffer.Length); 
           return buffer; 
          }); 
     } 
    } 
} 

В тех случаях, когда у вас нет источника событий, вам может потребоваться использовать тему, предложенную RichK. Его ответ очень хорошо описывает эту схему использования, поэтому я не буду дублировать это здесь.

Вы не указали, как вы используете этот интерфейс, но в зависимости от варианта использования может иметь смысл иметь другие функции в этих классах, возвращать IObservable и сами полностью уничтожать эти «события». С помощью шаблона async, основанного на событиях, вы должны иметь события отдельно от функции, которую вы вызываете, чтобы инициировать работу, но с помощью наблюдаемых вы можете вернуть их из функции, чтобы сделать ее более очевидной, для чего вы подписываетесь. Этот подход также позволяет наблюдаемым, возвращаемым с каждого вызова, отправлять сообщения OnError и OnCompleted, чтобы сигнализировать о завершении операции. Основываясь на вашем использовании комбинирующего класса, я не ожидаю, что это будет полезно в данном конкретном случае, но это то, что нужно иметь в виду.

+0

+1 Спасибо, здесь есть хорошая информация.Просто в команде 'Merge()' это объединяет последовательность наблюдаемых в 1. В моем приложении я буду использовать только один из (serial/udp/tcp) сразу и позволяя пользователю переключаться между различными интерфейсы (отсюда и моя дилемма с сантехникой событий). Рекомендуется ли здесь объединить наблюдаемые? Ценуйте ссылку на события Async Serial :) – Simon

+0

@Simon Вы можете применить «Where» к наблюдаемым из обернутых классов при слиянии их, который проверяет «CurrentSource» или подобное свойство для фильтрации нежелательных сообщений. –

+1

@Simon или вы можете (желательно) прекратить выпуск сообщений на других наблюдаемых, чтобы «Merge» получал только по одному. Если это так, вероятно, лучше использовать 'Switch', а не' Merge'. – yamen

0

Общий ответ на: события на реактивные события:

, так как мои iobservables обычно начинают с изменением свойств:

если событие происходит изменение в свойстве можно использовать:

IObservable<string> obs= user 
     .ItemPropertyChanged(u=>u.Name,false) 

путем ссылки на библиотеки DLL из этой статьи: Codeproject https://www.codeproject.com/Articles/731032/The-best-of-reactive-framework-

(Предположительно, автор изо всех сил, чтобы увидеть полезность реактивных расширений)

подписаться на эту «propertychange» события:

obs.Subscribe((args) => 
     { 
      //Do stuff with args.Sender,args.NewValue and args.OldValue 
     }); 

или, если заинтересованы только в изменении свойств есть NuGet пакет Rxx с подобным методом, используют так:

IObservable<string> obs=Observable2.FromPropertyChangedPattern(() => obj.Name) 

это содержит много других методов расширения


или если событие не позволяет изменения свойств/вы не хотите, чтобы реализовать INotifyPropertyChanged

class ObserveEvent_Simple 
{ 
    public static event EventHandler SimpleEvent; 
    static void Main() 
    {   
     IObservable<string> eventAsObservable = Observable.FromEventPattern(
      ev => SimpleEvent += ev, 
      ev => SimpleEvent -= ev); 
    } 
} 

, похожий на U/Gideon Engelberth из http://rxwiki.wikidot.com/101samples#toc6


или использовать этот статья кодепроекта, посвященная преобразованию событий в реактивные события

https://www.codeproject.com/Tips/1078183/Weak-events-in-NET-using-Reactive-Extensions-Rx

, который также имеет дело со слабыми подписок (что-то что может/не может быть проблемой в зависимости от того, что вы делаете с IDisposable от метода OnSubscribe)

Смежные вопросы