2013-07-03 2 views
1

У меня есть служба, которая публикует общие сообщения, и я создал наблюдаемый для этих сообщений. Эти сообщения могут содержать что угодно, и разные протоколы могут быть наложены сверху.Реактивные расширения: как я могу скрыть наблюдаемые?

Я хочу добавить второй слой, наблюдаемый для интерпретации конкретного протокола из этих сообщений. Например, сообщения могут иметь тип «обновление», «ошибка» или «полная». Я хочу повторно опубликовать сообщения об обновлении, выбросить ошибку на «ошибки» и завершить последовательность «complete».

Как я могу достичь этого чисто?

Не думаю, что я могу использовать SelectMany для этого; в то время как селектор может вернуть Observable.Return() или Observable.Throw() для первых двух случаев, я не могу закончить от селектора (вызов observer.OnCompleted() и отменить подписку на лежащий в основе наблюдаемый).

Мне кажется, что я должен использовать Observable.Create() и подписаться на лежащий в основе наблюдаемый внутри метода подписки. Я сделал это, но реализация кажется мне странной, потому что она не использует стиль функциональной композиции, более распространенный в Rx. Правильно ли это?

public IObservable<Message> InterpretProtocol(IObservable<message> stream) 
{ 
    return Observable.Create<Message>(observer => 
    { 
    return stream.Subscribe(message => 
    { 
     switch (ProtocolMessageTypeOf(message)) 
     { 
     case ProtocolMessageType.Error: 
      observer.OnError(new InvalidOperationException(message)); 
      break; 

     case ProtocolMessageType.Complete: 
      observer.OnCompleted(); 
      break; 

     default: 
      observer.OnNext(message); 
      break; 
     } 
    }); 
    }); 
} 

ответ

2

Вы можете попробовать что-то вроде:

public IObservable<Message> InterpretProtocol(IObservable<message> stream) { 

    return stream. 
     TakeWhile(msg => ProtocolMessageTypeOf(message) != ProtocolMessageType.Complete). 
     Select(msg => { 
      if(ProtocolMessageTypeOf(message) == ProtocolMessageType.Error) 
       throw new InvalidOperationException(message); 
      else 
       return msg; 
     }); 

}