У меня есть служба, которая публикует общие сообщения, и я создал наблюдаемый для этих сообщений. Эти сообщения могут содержать что угодно, и разные протоколы могут быть наложены сверху.Реактивные расширения: как я могу скрыть наблюдаемые?
Я хочу добавить второй слой, наблюдаемый для интерпретации конкретного протокола из этих сообщений. Например, сообщения могут иметь тип «обновление», «ошибка» или «полная». Я хочу повторно опубликовать сообщения об обновлении, выбросить ошибку на «ошибки» и завершить последовательность «complete».
Как я могу достичь этого чисто?
Не думаю, что я могу использовать SelectMany
для этого; в то время как селектор может вернуть Observable.Return()
или Observable.Throw()
для первых двух случаев, я не могу закончить от селектора (вызов observer.OnCompleted()
и отменить подписку на лежащий в основе наблюдаемый).
Мне кажется, что я должен использовать Observable.Create()
и подписаться на лежащий в основе наблюдаемый внутри метода подписки. Я сделал это, но реализация кажется мне странной, потому что она не использует стиль функциональной композиции, более распространенный в Rx. Правильно ли это?
public IObservable<Message> InterpretProtocol(IObservable<message> stream)
{
return Observable.Create<Message>(observer =>
{
return stream.Subscribe(message =>
{
switch (ProtocolMessageTypeOf(message))
{
case ProtocolMessageType.Error:
observer.OnError(new InvalidOperationException(message));
break;
case ProtocolMessageType.Complete:
observer.OnCompleted();
break;
default:
observer.OnNext(message);
break;
}
});
});
}