2013-11-22 2 views
1

У меня есть объект, основанный на событиях, который кажется идеально подходящим для RX: после подключения к сетевому источнику он вызывает события при получении сообщения и может заканчиваться либо с одной ошибкой (соединительные матрицы и т. д.) или (редко) указание на то, что сообщений больше не будет. Этот объект также имеет пару прогнозов - большинство пользователей интересуются только подмножеством полученных сообщений, поэтому появляются альтернативные события, возникающие только при появлении известных подтипов сообщений.Обтекание устаревшего объекта в IConnectableObservable

Таким образом, в процессе обучения больше о реактивном программировании, я построил следующую обертку:

class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage> 
{ 
    private LegacyType _Legacy; 
    private IConnectableObservable<TopLevelMessage> _Impl; 
    public LegacyReactiveWrapper(LegacyType t) 
    { 
     _Legacy = t; 
     var observable = Observable.Create<TopLevelMessage>((observer) => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted(); 

      _Legacy.TopLevelMessage += tlmHandler; 
      _Legacy.Error += errHandler; 
      _Legacy.Complete += doneHandler; 

      return Disposable.Create(() => 
      { 
       _Legacy.TopLevelMessage -= tlmHandler; 
       _Legacy.Error -= errHandler; 
       _Legacy.Complete -= doneHandler; 
      }); 
     }); 

     _Impl = observable.Publish(); 
    } 

    public IDisposable Subscribe(IObserver<TopLevelMessage> observer) 
    { 
     return _Impl.RefCount().Subscribe(observer); 
    } 

    public IDisposable Connect() 
    { 
     _Legacy.ConnectToMessageSource(); 
     return Disposable.Create(() => _Legacy.DisconnectFromMessageSource()); 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      // This is the moral equivalent of the projection behavior 
      // that already exists in the legacy type. We don't hook 
      // the LegacyType.MessageA event directly. 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageA) 
        .Select((tlm) => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _Impl.RefCount() 
        .Where((tlm) => tlm.MessageType == MessageType.MessageB) 
        .Select((tlm) => tlm.SubMessageB); 
     } 
    } 
} 

Кое-что о том, как он используется в другом месте чувствует ... от ... как-то, хотя. Вот пример использования, который работает, но кажется странным. Контекст пользовательского интерфейса для моего тестового приложения - это WinForms, но это не имеет большого значения.

// in Program.Main... 

MainForm frm = new MainForm(); 

// Updates the UI based on a stream of SubMessageA's 
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm); 

LegacyType lt = new LegacyType(); 
// ... setup lt... 

var w = new LegacyReactiveWrapper(lt); 

var uiUpdateSubscription = (from msgA in w.MessageA 
          where SomeCondition(msgA) 
          select msgA).ObserveOn(frm).Subscribe(uiManager); 

var nonUiSubscription = (from msgB in w.MessageB 
         where msgB.SubType == MessageBType.SomeSubType 
         select msgB).Subscribe(
          m => Console.WriteLine("Got MsgB: {0}", m), 
          ex => Console.WriteLine("MsgB error: {0}", ex.Message), 
          () => Console.WriteLine("MsgB complete") 
         ); 

IDisposable unsubscribeAtExit = null; 
frm.Load += (sender, e) => 
{ 
    var connectionSubscription = w.Connect(); 
    unsubscribeAtExit = new CompositeDisposable(
           uiUpdateSubscription, 
           nonUiSubscription, 
           connectionSubscription); 
}; 

frm.FormClosing += (sender, e) => 
{ 
    if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); } 
}; 


Application.Run(frm); 

Это работает - форма запуски, обновление пользовательского интерфейса, и когда я закрываю его подписки очищаются и процесс завершается (что он не будет делать, если подключение к сети в LegacyType по-прежнему подключено). Строго говоря, достаточно просто распорядиться connectionSubscription. Однако явное выражение Connect кажется мне странным. Так как RefCount предполагается сделать это для вас, я попытался модифицировать обертку таким образом, что вместо использования _Impl.RefCount в MessageA и MessageB и явно подключиться позже, я использовал this.RefCount вместо этого и переехал вызовы Subscribe в Load обработчика. У этой проблемы была другая проблема: вторая подписка вызвала еще один звонок до LegacyReactiveWrapper.Connect. Я думал, что идея, стоящая за Publish/RefCount, была «первым подключением триггеров, последнее - удаляет соединение».

Я думаю, у меня есть три вопроса:

  1. ли я в корне неправильно Publish/RefCount?
  2. Есть ли какой-то предпочтительный способ реализовать IConnectableObservable<T>, который не включает делегирование одному из полученных через IObservable<T>.Publish? Я знаю, что вы не должны сами реализовать IObservable<T>, но я не понимаю, как ввести логику подключения в IConnectableObservable<T>, которую дает Observable.Create().Publish(). Connect должен быть идемпотент?
  3. Кто-нибудь, более знакомый с RX/реактивным программированием, смотрит на образец того, как используется обертка, и говорит «это уродливо и сломано», или это не так странно, как кажется?

ответ

2

Я не уверен, что вам нужно разоблачить Connect напрямую, как есть. Я упростил бы следующим образом, используя Publish().RefCount() в качестве инкапсулированной детали реализации; это приведет к тому, что устаревшее соединение будет создано только по мере необходимости. Здесь первый абонент вызывает подключение, а последний - отключает. Также обратите внимание, что это правильно разделяет один RefCount для всех подписчиков, тогда как ваша реализация использует RefCount для каждого типа сообщения, что, вероятно, не было тем, что было предназначено. Пользователи не требуется для подключения в явном виде:

public class LegacyReactiveWrapper 
{ 
    private IObservable<TopLevelMessage> _legacyRx; 

    public LegacyReactiveWrapper(LegacyType legacy) 
    { 
     _legacyRx = WrapLegacy(legacy).Publish().RefCount(); 
    } 

    private static IObservable<TopLevelMessage> WrapLegacy(LegacyType legacy) 
    { 
     return Observable.Create<TopLevelMessage>(observer => 
     { 
      LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm); 
      LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message)); 
      LegacyCompleteHandler doneHandler = sender => observer.OnCompleted(); 

      legacy.TopLevelMessage += tlmHandler; 
      legacy.Error += errHandler; 
      legacy.Complete += doneHandler; 
      legacy.ConnectToMessageSource(); 

      return Disposable.Create(() => 
      { 
       legacy.DisconnectFromMessageSource(); 
       legacy.TopLevelMessage -= tlmHandler; 
       legacy.Error -= errHandler; 
       legacy.Complete -= doneHandler; 
      }); 
     }); 
    } 

    public IObservable<TopLevelMessage> TopLevelMessage 
    { 
     get 
     { 
      return _legacyRx; 
     } 
    } 

    public IObservable<SubMessageA> MessageA 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageA) 
          .Select(tlm => tlm.SubMessageA); 
     } 
    } 

    public IObservable<SubMessageB> MessageB 
    { 
     get 
     { 
      return _legacyRx.Where(tlm => tlm.MessageType == MessageType.MessageB) 
          .Select(tlm => tlm.SubMessageB); 
     } 
    } 
} 

Дополнительное наблюдение состоит в том, что Publish().RefCount() сбросит, лежащую в основе подписки, когда это число подписчиков достигает 0. Обычно я использую только Connect над этим выбором, когда мне нужно, чтобы сохранить подписку, даже если подсчет подписчика на опубликованном источнике падает до нуля (и может снова возвратиться позже). Редко это нужно делать, хотя - только при более дорогостоящем подключении к ресурсу подписки, когда вам это может не понадобиться.

+0

+1 для 'Publish(). RefCount()' как деталь реализации. Слишком много раз я вижу оболочки событий, которые не делают этого по умолчанию. Rx заботится о подписках для вас, поэтому убедитесь, что вы не удваиваете их, добавляя избыточные обработчики событий к своим старым событиям! :) –

1
  1. Ваше понимание не совсем так, но вы, кажется, есть некоторые моменты недопонимания.

    Вы, кажется, считаете, что несколько вызовов RefCount на том же источнике IObservable приведет к общему количеству ссылок. Они не; каждый экземпляр сохраняет свой собственный счет. Таким образом, вы вызываете несколько подписей на _Impl, по одному на звонок для подписки или вызова свойств Message.

    Вы также можете ожидать, что делает _ImplIConnectableObservable каким-то образом вызывают ваш метод Connect называться (так как вы, кажется, удивлены вам необходимо позвонить Connect в вашей потребляющей коде). Все Publish действительно вызывает подписчиков на опубликованный объект (возвращается с.Вызов Publish()), чтобы разделить одну подписку на основной наблюдаемый источник (в данном случае объект, сделанный с вашего вызова Observable.Create).

    Как правило, я вижу, что Publish и RefCount используются немедленно вместе (например, как source.Publish().RefCount()), чтобы получить эффект общей подписки, описанный выше, или сделать холодную видимую горячую, без необходимости звонить Connect, чтобы начать подписку на исходный источник. Однако это зависит от использования того же объекта, который был возвращен из .Publish(). RefCount() для всех подписчиков (как указано выше).

  2. Ваша реализация Connect кажется разумной. Я не знаю никаких рекомендаций относительно того, должен ли Connect быть идемпотентным, но я бы не ожидал, что это так. Если бы вы хотели, чтобы это было, вам просто нужно было бы отслеживать звонки на него, чтобы избавиться от возвращаемого значения, чтобы получить правильный баланс.

    Я не думаю, что вам нужно использовать Publish так, как вы есть, если нет причин, по которым многие обработчики событий привязаны к устаревшему объекту. Если вам это нужно, я бы рекомендовал изменить _Impl на равнину IObservable и следовать за Publish с помощью RefCount.

  3. Ваши объекты MessageA и MessageB могут стать источником путаницы для пользователей, поскольку они возвращают IObservable, но все же требуют вызова Connect на базовом объекте для начала приема сообщений. Я либо изменил бы их на IConnectableObservables, которые каким-то образом делегируют исходное соединение (в этот момент обсуждение идемпотентности становится более актуальным), либо отбрасывают свойства и просто позволяют пользователям делать (довольно простые) проекции сами по себе, когда это необходимо.

+0

Основная проблема заключалась в том, что я неправильно понял, как работает RefCount. Вопрос о идемпотенции был связан с возможным обходным решением, если он работал так, как я работал, а не как это работает. У вас есть действительная точка с 3); Я включил их как для выяснения того, как работает соединение с Publish(), так и потому, что у нас есть много кода, который заботится о «только A» или «только B», несмотря на то, что они появляются в одном потоке. – twon33

Смежные вопросы