2015-04-10 5 views
10

Можно ли использовать ReactiveExtensions для достижения следующих целей:Слияние двух наблюдаемых с одним приоритетом

  • Два Наблюдаемые, один, который является «высокий» приоритет, а другой «Low»

  • Объединение обоих Наблюдаемые в один, который затем может быть подписан, с намерением, что в результате Наблюдаемые всегда будет испускать приоритетные позиции перед любыми низкоприоритетными.

Я понимаю, что это может быть более тривиальным реализовано с использованием двух ConcurrentQueue коллекций и что-то вроде этого;

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item); 

Но этот подход имеет такие проблемы, как не «subscribable» в той же манере, что Наблюдаемый будет (так как только очереди истощаются, обработка закончится без большого количества дополнительной пустой болтовни, чтобы подтолкнуть эту опцию в Задача).

Кроме того, мне было бы интересно применить некоторую дополнительную фильтрацию в очередях, например, дросселирование и «отличную до изменения», поэтому Rx выглядит естественным образом.

+0

Таким образом, «низкие» наблюдаемые руки (независимо от этого нового фрагмента кода) имеют значение. Что это значит для этого кода? Подождите, чтобы увидеть, сможет ли «высокий» наблюдаемый когда-либо вырабатывать другое значение? Я изо всех сил пытаюсь понять, как приоритеты могут когда-либо работать, и просто продолжайте возвращаться к использованию «Слияния» и игнорируя приоритеты. –

+0

@Damien_The_Unbeliever, идея в основном «Merge», но с первоочередными пунктами пропускается на передний план. Имеет ли это смысл? Я также хочу обрабатывать оба источника элементов таким же образом, следовательно, слияние и подписку, которые не будут знать о приоритете. Но это порядок, в котором они прибывают, на которые я сосредоточен, - представьте, что у меня есть 20 предметов с низким приоритетом, и вы получаете новый высокий приоритет. Я хочу, чтобы это было выпущено дальше, а не после 20 других предметов. –

+0

Но вы видите только один пункт за раз. Если «низкий» приоритет прошел пять пунктов до сих пор и * намеревается * отправить вам еще 15, что из этого? И вы, конечно же, не можете обнаружить этот факт исключительно из наблюдаемых. –

ответ

4

То, что вы описываете, является, конечно, приоритетной.

Rx все о потоки событий, а не очередей. Конечно, очереди много использовали в Rx - но они не являются концепцией первого класса, большей частью частью реализации концепций Rx.

Хороший пример того, где нам нужны очереди, - это иметь дело с медленным наблюдателем. События отправляются последовательно в Rx, и если события прибывают быстрее, чем наблюдатель может справиться с ними, тогда они должны быть поставлены в очередь против этого наблюдателя. Если есть много наблюдателей, необходимо поддерживать несколько логических очередей, поскольку наблюдатели могут прогрессировать в разных шагах - и Rx предпочитает не держать их в блокировке.

«Противодавление» представляет собой концепцию наблюдателей, обеспечивающих обратную связь с наблюдаемыми, чтобы позволить механизмам выдерживать давление более быстрого наблюдения - например, слияние или дросселирование. Rx не имеет первоклассного способа введения противодавления - единственное встроенное средство, которое наблюдает наблюдательный наблюдатель, имеет синхронный характер OnNext. Любой другой механизм должен быть вне диапазона. Ваш вопрос напрямую связан с противодавлением, поскольку он применим только в случае медленного наблюдателя.

Я упоминаю все это, чтобы предоставить доказательства моей претензии о том, что Rx не является отличным выбором для обеспечения такой приоритетной отправки, которую вы ищете - действительно, первоклассный механизм очередей кажется более подходящим.

Чтобы решить эту проблему, вам необходимо самостоятельно управлять очередью приоритетов в пользовательском операторе. Чтобы переформулировать проблему: вы говорите, что если события происходят во время обработки наблюдателем события OnNext, так что существует рассылка событий для отправки, то вместо типичной очереди FIFO, которую использует Rx, вы хотите отправка на основе некоторого приоритета.

Что-то примечание состоит в том, что в духе того, как Rx не удерживает несколько наблюдателей в режиме блокировки, параллельные наблюдатели потенциально могут видеть события в другом порядке, что может быть или не быть проблемой для вас. Вы можете использовать такой механизм, как Publish, чтобы получить согласованность заказов - но вы, вероятно, не хотите этого делать, поскольку время доставки событий в этом сценарии будет весьма непредсказуемым и неэффективным.

Я уверен, что есть лучшие способы сделать это, но вот пример доставки на основе приоритета - вы можете расширить его для работы с несколькими потоками и приоритетами (или даже с приоритетами для каждого события), используя (например, очередь приоритетов на основе b-дерева), но я решил сохранить это довольно просто. Даже тогда обратите внимание на значительное количество проблем, которые должен решить код: обработка ошибок, завершение и т. Д. - и я сделал выбор в отношении того, когда они сигнализируют о том, что, безусловно, есть много других допустимых вариантов.

Всеобъемлющее, эта реализация конечно ставит меня с идеей использования Rx для этого. Это достаточно сложно, что, вероятно, есть ошибки. Как я уже говорил, может быть аккуратнее код для этого (особенно учитывая минимальное усилие я поставил в него!), Но концептуально, я некомфортно с идеей, независимо от реализации:

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
     this IObservable<TSource> source, 
     IObservable<TSource> lowPriority, 
     IScheduler scheduler = null) 
    {  
     scheduler = scheduler ?? Scheduler.Default; 
     return Observable.Create<TSource>(o => {  
      // BufferBlock from TPL dataflow is used as it is 
      // handily awaitable. package: Microsoft.Tpl.Dataflow   
      var loQueue = new BufferBlock<TSource>(); 
      var hiQueue = new BufferBlock<TSource>(); 
      var errorQueue = new BufferBlock<Exception>(); 
      var done = new TaskCompletionSource<int>(); 
      int doneCount = 0; 
      Action incDone =() => { 
       var dc = Interlocked.Increment(ref doneCount); 
       if(dc == 2) 
        done.SetResult(0); 
      }; 
      source.Subscribe(
       x => hiQueue.Post(x), 
       e => errorQueue.Post(e), 
       incDone); 
      lowPriority.Subscribe(
       x => loQueue.Post(x), 
       e => errorQueue.Post(e), 
       incDone); 
      return scheduler.ScheduleAsync(async(ctrl, ct) => { 
       while(!ct.IsCancellationRequested) 
       { 
        TSource nextItem; 
        if(hiQueue.TryReceive(out nextItem) 
         || loQueue.TryReceive(out nextItem)) 
         o.OnNext(nextItem); 

        else if(done.Task.IsCompleted) 
        { 
         o.OnCompleted(); 
         return; 
        } 

        Exception error;       
        if(errorQueue.TryReceive(out error)) 
        { 
         o.OnError(error); 
         return; 
        } 

        var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);  
        var loAvailableAsync = loQueue.OutputAvailableAsync(ct);      
        var errAvailableAsync = 
         errorQueue.OutputAvailableAsync(ct); 
        await Task.WhenAny(
         hiAvailableAsync, 
         loAvailableAsync, 
         errAvailableAsync, 
         done.Task); 
       } 
      }); 
     }); 
    } 
} 

И пример использования:

void static Main() 
{ 
    var xs = Observable.Range(0, 3); 
    var ys = Observable.Range(10, 3); 

    var source = ys.MergeWithLowPriorityStream(xs); 

    source.Subscribe(Console.WriteLine,() => Console.WriteLine("Done")); 
} 

Это напечатает элементы ys во-первых, что указывает на их более высокий приоритет.

+0

Спасибо, Джеймс, ваше объяснение немного расширило мои взгляды на реальную цель и способности Rx. Вы правы, конечно, Rx-решение пытается поместить квадратную привязку в круглое отверстие. Я внезапно почувствую себя вынужденным снова прочитать IntroToRx от начала до конца! Еще раз спасибо за то, что нашли время, чтобы предложить этот ответ, это очень ценится. –

1

Для решения этой проблемы необходимо учитывать время. В приведенном выше комментарии вы говорите об уведомлениях пользователей. Мне кажется, что вы хотите сказать, это что-то вроде этого: отобразите последнее уведомление, если нет уведомления о высоком приоритете, в этом случае покажите это.

Диаграммы пузырей облегчат рассуждение об этом. Один символ - одна секунда:

High : ---------3---5-6 
Low : 1--2-------4---- 
Result: 1--2-----3---5-6 

Это то, что вы имели в виду? Вы хотите буферизовать сообщения и отображать их позже? Как в этом случае, нормально ли, что сообщение 5 будет отображаться только в течение 2 секунд?

Смежные вопросы