У меня есть объект, основанный на событиях, который кажется идеально подходящим для RX: после подключения к сетевому источнику он вызывает события при получении сообщения и может заканчиваться либо с одной ошибкой (соединительные матрицы и т. д.) или (редко) указание на то, что сообщений больше не будет. Этот объект также имеет пару прогнозов - большинство пользователей интересуются только подмножеством полученных сообщений, поэтому появляются альтернативные события, возникающие только при появлении известных подтипов сообщений.Обтекание устаревшего объекта в IConnectableObservable
Таким образом, в процессе обучения больше о реактивном программировании, я построил следующую обертку:
class LegacyReactiveWrapper : IConnectableObservable<TopLevelMessage>
{
private LegacyType _Legacy;
private IConnectableObservable<TopLevelMessage> _Impl;
public LegacyReactiveWrapper(LegacyType t)
{
_Legacy = t;
var observable = Observable.Create<TopLevelMessage>((observer) =>
{
LegacyTopLevelMessageHandler tlmHandler = (sender, tlm) => observer.OnNext(tlm);
LegacyErrorHandler errHandler = (sender, err) => observer.OnError(new ApplicationException(err.Message));
LegacyCompleteHandler doneHandler = (sender) => observer.OnCompleted();
_Legacy.TopLevelMessage += tlmHandler;
_Legacy.Error += errHandler;
_Legacy.Complete += doneHandler;
return Disposable.Create(() =>
{
_Legacy.TopLevelMessage -= tlmHandler;
_Legacy.Error -= errHandler;
_Legacy.Complete -= doneHandler;
});
});
_Impl = observable.Publish();
}
public IDisposable Subscribe(IObserver<TopLevelMessage> observer)
{
return _Impl.RefCount().Subscribe(observer);
}
public IDisposable Connect()
{
_Legacy.ConnectToMessageSource();
return Disposable.Create(() => _Legacy.DisconnectFromMessageSource());
}
public IObservable<SubMessageA> MessageA
{
get
{
// This is the moral equivalent of the projection behavior
// that already exists in the legacy type. We don't hook
// the LegacyType.MessageA event directly.
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageA)
.Select((tlm) => tlm.SubMessageA);
}
}
public IObservable<SubMessageB> MessageB
{
get
{
return _Impl.RefCount()
.Where((tlm) => tlm.MessageType == MessageType.MessageB)
.Select((tlm) => tlm.SubMessageB);
}
}
}
Кое-что о том, как он используется в другом месте чувствует ... от ... как-то, хотя. Вот пример использования, который работает, но кажется странным. Контекст пользовательского интерфейса для моего тестового приложения - это WinForms, но это не имеет большого значения.
// in Program.Main...
MainForm frm = new MainForm();
// Updates the UI based on a stream of SubMessageA's
IObserver<SubMessageA> uiManager = new MainFormUiManager(frm);
LegacyType lt = new LegacyType();
// ... setup lt...
var w = new LegacyReactiveWrapper(lt);
var uiUpdateSubscription = (from msgA in w.MessageA
where SomeCondition(msgA)
select msgA).ObserveOn(frm).Subscribe(uiManager);
var nonUiSubscription = (from msgB in w.MessageB
where msgB.SubType == MessageBType.SomeSubType
select msgB).Subscribe(
m => Console.WriteLine("Got MsgB: {0}", m),
ex => Console.WriteLine("MsgB error: {0}", ex.Message),
() => Console.WriteLine("MsgB complete")
);
IDisposable unsubscribeAtExit = null;
frm.Load += (sender, e) =>
{
var connectionSubscription = w.Connect();
unsubscribeAtExit = new CompositeDisposable(
uiUpdateSubscription,
nonUiSubscription,
connectionSubscription);
};
frm.FormClosing += (sender, e) =>
{
if(unsubscribeAtExit != null) { unsubscribeAtExit.Dispose(); }
};
Application.Run(frm);
Это работает - форма запуски, обновление пользовательского интерфейса, и когда я закрываю его подписки очищаются и процесс завершается (что он не будет делать, если подключение к сети в LegacyType по-прежнему подключено). Строго говоря, достаточно просто распорядиться connectionSubscription
. Однако явное выражение Connect
кажется мне странным. Так как RefCount
предполагается сделать это для вас, я попытался модифицировать обертку таким образом, что вместо использования _Impl.RefCount
в MessageA
и MessageB
и явно подключиться позже, я использовал this.RefCount
вместо этого и переехал вызовы Subscribe
в Load
обработчика. У этой проблемы была другая проблема: вторая подписка вызвала еще один звонок до LegacyReactiveWrapper.Connect
. Я думал, что идея, стоящая за Publish
/RefCount
, была «первым подключением триггеров, последнее - удаляет соединение».
Я думаю, у меня есть три вопроса:
- ли я в корне неправильно
Publish
/RefCount
? - Есть ли какой-то предпочтительный способ реализовать
IConnectableObservable<T>
, который не включает делегирование одному из полученных черезIObservable<T>.Publish
? Я знаю, что вы не должны сами реализоватьIObservable<T>
, но я не понимаю, как ввести логику подключения вIConnectableObservable<T>
, которую даетObservable.Create().Publish()
.Connect
должен быть идемпотент? - Кто-нибудь, более знакомый с RX/реактивным программированием, смотрит на образец того, как используется обертка, и говорит «это уродливо и сломано», или это не так странно, как кажется?
+1 для 'Publish(). RefCount()' как деталь реализации. Слишком много раз я вижу оболочки событий, которые не делают этого по умолчанию. Rx заботится о подписках для вас, поэтому убедитесь, что вы не удваиваете их, добавляя избыточные обработчики событий к своим старым событиям! :) –