2013-02-22 2 views
1

У меня есть одна наблюдаемая последовательность сообщений. Существует набор подписчиков, которые могут обрабатывать эти сообщения. Каждый абонент имеет приоритет выполнения. Каждое сообщение должно обрабатываться один раз абонентом с наивысшим приоритетом, выбранным из списка подписчиков, подписанных в настоящее время. Подписчики постоянно подписываются/подписываются из последовательности, поэтому мы не знаем количества и приоритетов подписчиков при построении последовательности. Это возможное/жизнеспособное решение с использованием rx?Приоритет вызова подписок

Для иллюстрации:

public class Message 
{ 
    public string Value { get; set; } 
    public bool IsConsumed { get; set; } 
} 

var subject = new Subject<Message>(); 
var sequence = subject.Publish().RefCount(); 

Action<Message, int> subscriber = (m, priority) => 
{ 
    if (!m.IsConsumed) 
    { 
     m.IsConsumed = true; 
     Trace.WriteLine(priority); 
    } 
}; 

var s2 = sequence.Priority(2).Subscribe(m => subscriber(m, 2)); 
var s1 = sequence.Priority(1).Subscribe(m => subscriber(m, 1)); 

subject.OnNext(new Message()); // output: 1 

s1.Dispose(); 
subject.OnNext(new Message()); // output: 2 

Недостающий кусок, чтобы сделать эту работу решением является метод приоритета, который не существует в библиотеке Rx.

ответ

1

Это была очень интересная проблема ...

Итак, во-первых: я не знаю ни собственных операторов Rx, которые могут достичь эффекта «маршрутизации», похожий на то, что вы хотите в этом Priority расширения.

То есть, я играл вокруг в LINQPad за обедом сегодня, и придумал (очень) Hacky доказательство концепции, которая появляется на работу:

Во-первых, ваш класс сообщение

public class Message 
{ 
    public string Value { get; set; } 
    public bool IsConsumed { get; set; } 
} 

Далее, метод расширения обертка-класс:

public static class Ext 
{  
    public static PrioritizedObservable<T> Prioritize<T>(this IObservable<T> source) 
    { 
     return new PrioritizedObservable<T>(source); 
    } 
} 

И что это PrioritizedObservable<T>?

public class PrioritizedObservable<T> 
    : IObservable<T>, IObserver<T>, IDisposable 
{ 
    private IObservable<T> _source; 
    private ISubject<T,T> _intermediary; 
    private IList<Tuple<int, Subject<T>>> _router; 

    public PrioritizedObservable(IObservable<T> source) 
    { 
     // Make sure we don't accidentally duplicate subscriptions 
     // to the underlying source 
     _source = source.Publish().RefCount(); 

     // A proxy from the source to our internal router 
     _intermediary = Subject.Create(this, _source); 
     _source.Subscribe(_intermediary);   

     // Holds per-priority subjects 
     _router = new List<Tuple<int, Subject<T>>>(); 
    } 

    public void Dispose() 
    { 
     _intermediary = null; 
     foreach(var entry in _router) 
     { 
      entry.Item2.Dispose(); 
     } 
     _router.Clear(); 
    } 

    private ISubject<T,T> GetFirstListener() 
    { 
     // Fetch the first subject in our router 
     // ordered by priority 
     return _router.OrderBy(tup => tup.Item1) 
      .Select(tup => tup.Item2) 
      .FirstOrDefault(); 
    } 

    void IObserver<T>.OnNext(T value) 
    { 
     // pass along value to first in line 
     var nextListener = GetFirstListener(); 
     if(nextListener != null) 
      nextListener.OnNext(value); 
    } 

    void IObserver<T>.OnError(Exception error) 
    { 
     // pass along error to first in line 
     var nextListener = GetFirstListener(); 
     if(nextListener != null) 
      nextListener.OnError(error); 
    } 

    void IObserver<T>.OnCompleted() 
    { 
     var nextListener = GetFirstListener(); 
     if(nextListener != null) 
      nextListener.OnCompleted(); 
    } 

    public IDisposable Subscribe(IObserver<T> obs) 
    { 
     return PrioritySubscribe(1, obs); 
    } 

    public IDisposable PrioritySubscribe(int priority, IObserver<T> obs) 
    { 
     var sub = new Subject<T>(); 
     var subscriber = sub.Subscribe(obs); 
     var entry = Tuple.Create(priority, sub); 
     _router.Add(entry); 
     _intermediary.Subscribe(sub); 
     return Disposable.Create(() => 
     { 
      subscriber.Dispose(); 
      _router.Remove(entry); 
     }); 
    } 
} 

И тест Жгут:

void Main() 
{ 
    var subject = new Subject<Message>(); 
    var sequence = subject.Publish().RefCount().Prioritize(); 

    Action<Message, int> subscriber = (m, priority) => 
    { 
     if (!m.IsConsumed) 
     { 
      m.IsConsumed = true; 
      Console.WriteLine(priority); 
     } 
    }; 

    var s3 = sequence.PrioritySubscribe(3, Observer.Create<Message>(m => subscriber(m, 3))); 
    var s2 = sequence.PrioritySubscribe(2, Observer.Create<Message>(m => subscriber(m, 2))); 
    var s1 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1))); 
    var s11 = sequence.PrioritySubscribe(1, Observer.Create<Message>(m => subscriber(m, 1))); 

    subject.OnNext(new Message()); // output: 1 

    s1.Dispose(); 
    subject.OnNext(new Message()); // output: 1 
    s11.Dispose(); 

    subject.OnNext(new Message()); // output: 2 
    s2.Dispose(); 
    subject.OnNext(new Message()); // output: 3 

    sequence.Dispose(); 

} 
+0

Ты гений! Большое спасибо! – andriys

Смежные вопросы