2010-07-19 5 views
7

System.Collections.Concurrent имеет несколько новых коллекций, которые очень хорошо работают в многопоточных средах. Однако они немного ограничены. Либо они блокируются, пока элемент становится доступным, либо они возвращают default(T) (методы TryXXX).Неблокирующий параллельный сбор?

Мне нужна коллекция, которая является потокобезопасной, но вместо блокировки вызывающего потока он использует обратный вызов, чтобы сообщить мне, что доступен хотя бы один элемент.

Мое настоящее решение - использовать BlockingCollection, но использовать APM с делегатом для получения следующего элемента. Другими словами, я создаю делегата для метода, который Take s из коллекции, и выполняет этот делегат с использованием BeginInvoke.

К сожалению, я должен поддерживать много состояний в своем классе, чтобы выполнить это. Хуже того, класс не является потокобезопасным; он может использоваться только одним потоком. Я обойду край ремонтопригодности, который я бы предпочел не делать.

Я знаю, что есть некоторые библиотеки, которые делают то, что я делаю здесь довольно просто (я считаю, что Reactive Framework является одним из них), но я хотел бы достичь своих целей, не добавляя никаких ссылок за пределы версии 4 рамок.

Есть ли лучшие шаблоны, которые я могу использовать, которые не требуют внешних ссылок, которые достигают моей цели?


Т.Л., д-р:

Существует ли какая-либо модель, удовлетворяющие требования:

«Мне нужно, чтобы сигнализировать коллекцию, я готов к следующему элементу, и есть коллекция выполнить обратный вызов, когда этот следующий элемент прибыл, без блокировки нитей ».

+0

Будет ли это быть потокобезопасным? Что останавливает доступный предмет, недоступный до вызова делегата? И какова ваша общая цель (т. Е. Система очередей)? –

+0

@Adam Хорошая точка в потреблении товара. Делегат берет предмет, удаленный из коллекции. Таким образом, выполнение делегата блокируется до тех пор, пока элемент не будет 'Take'-en из коллекции, и этот элемент является« объектом », переданным EndInvoke. Общая цель немного запутана; по существу я должен простаивать рабочий процесс, пока элемент не станет доступен. Вы не можете заблокировать выполнение рабочего процесса, поэтому просто «Take'-ing item не будет работать как блоки вызовов. Мне нужно создать закладку, а затем передать это расширение. Расширение вызывает делегата, возобновляя закладку в обратном вызове. – Will

+0

К сожалению, у меня мало опыта работы с рабочими процессами - попробуйте добавить эту деталь к своему вопросу, и это может вызвать интерес человека :-) –

ответ

4

Я думаю, у меня есть два возможных решения. Я тоже не очень доволен, но они, по крайней мере, предоставляют разумную альтернативу APM-подходу.

Первый не соответствует вашему требованию не блокирующего потока, но я думаю, что это довольно изящное, потому что вы можете зарегистрировать обратные вызовы, и они будут вызываться в циклическом режиме, но у вас еще есть возможность вызова Take или TryTake как обычно для BlockingCollection. Этот код заставляет обратные вызовы регистрироваться каждый раз, когда запрашивается элемент. Это механизм сигнализации для сбора. Самое приятное в этом подходе заключается в том, что призывы к Take не истощаются, как в моем втором решении.

public class NotifyingBlockingCollection<T> : BlockingCollection<T> 
{ 
    private Thread m_Notifier; 
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
     m_Notifier = new Thread(Notify); 
     m_Notifier.IsBackground = true; 
     m_Notifier.Start(); 
    } 

    private void Notify() 
    { 
     while (true) 
     { 
      Action<T> callback = m_Callbacks.Take(); 
      T item = Take(); 
      callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
     } 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     m_Callbacks.Add(callback); 
    } 
} 

Второй выполняет ваши требования без блокировки нити. Обратите внимание, как он передает вызов обратного вызова в пул потоков. Я сделал это, потому что я думаю, что если он будет выполнен синхронно, тогда замки будут удерживаться дольше, что приведет к узким местам Add и RegisterForTake. Я просмотрел его внимательно, и я не думаю, что он может быть заблокирован в режиме реального времени (как элемент, так и обратный вызов доступны, но обратный вызов никогда не выполняется), но вы можете проверить его на себя, чтобы проверить. Единственная проблема здесь в том, что вызов Take стал бы голодным, поскольку обратные вызовы всегда будут иметь приоритет.

public class NotifyingBlockingCollection<T> 
{ 
    private BlockingCollection<T> m_Items = new BlockingCollection<T>(); 
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>(); 

    public NotifyingBlockingCollection() 
    { 
    } 

    public void Add(T item) 
    { 
     lock (m_Callbacks) 
     { 
      if (m_Callbacks.Count > 0) 
      { 
       Action<T> callback = m_Callbacks.Dequeue(); 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Items.Add(item); 
      } 
     } 
    } 

    public T Take() 
    { 
     return m_Items.Take(); 
    } 

    public void RegisterForTake(Action<T> callback) 
    { 
     lock (m_Callbacks) 
     { 
      T item; 
      if (m_Items.TryTake(out item)) 
      { 
       callback.BeginInvoke(item, null, null); // Transfer to the thread pool. 
      } 
      else 
      { 
       m_Callbacks.Enqueue(callback); 
      } 
     } 
    } 
} 
+0

Спасибо за ответ, но это немного не то, что я ищу. Это то, что я сейчас делаю, но с APM, вставляемым в коллекцию (код, который вы предоставили). Я предполагаю, что суть моей проблемы заключается в том, что APM не соответствует моим требованиям, это просто реализация, которую я использовал. Мои требования требуют шаблона, который дает решение вопроса: «Как я могу сигнализировать коллекцию, что я готов для следующего элемента, и собрать коллекцию обратного вызова, когда этот следующий элемент пришел, без блокировки потоков?» – Will

+0

Я как будто понял, что это не то, что вам нужно. Это интересная проблема. Слишком плохо 'Добавить' не является' виртуальным', иначе вы, возможно, могли бы добавить уведомление там. Возможно, вы можете использовать одну из реализаций блокирующей очереди в качестве отправной точки. Проблема в том, что вы должны быть осторожны с тем, как вы доставляете это уведомление, иначе другой потребитель сначала возьмет этот предмет. Я могу поиграть с этим сегодня, если у меня будет время. Ответьте сами, если вы это выясните. Я не знаю ... вам может быть легче поддразнивать и просто ссылаться на другую библиотеку. –

+0

Уведомление должно содержать следующий элемент и должно контролироваться уведомителем. Возможно, идея этого собрания является ошибочной; только через этот механизм может поставляться следующий предмет, тем самым избегая вопроса о двух наблюдателях, соперничающих за один предмет. Другими словами, один наблюдатель не может использовать механизм A для получения следующего элемента (т. Е. 'T Pop()'), а другой зарегистрирован для обратного вызова. – Will

3

Как насчет этого? (Именование может, вероятно, использовать некоторую работу. И обратите внимание, что это не проверено.)

public class CallbackCollection<T> 
{ 
    // Sychronization object to prevent race conditions. 
    private object _SyncObject = new object(); 

    // A queue for callbacks that are waiting for items. 
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>(); 

    // A queue for items that are waiting for callbacks. 
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>(); 

    public void Add(T item) 
    { 
     Action<T> callback; 
     lock (_SyncObject) 
     { 
      // Try to get a callback. If no callback is available, 
      // then enqueue the item to wait for the next callback 
      // and return. 
      if (!_Callbacks.TryDequeue(out callback)) 
      { 
       _Items.Enqueue(item); 
       return; 
      } 
     } 

     ExecuteCallback(callback, item); 
    } 

    public void TakeAndCallback(Action<T> callback) 
    { 
     T item; 
     lock(_SyncObject) 
     { 
      // Try to get an item. If no item is available, then 
      // enqueue the callback to wait for the next item 
      // and return. 
      if (!_Items.TryDequeue(out item)) 
      { 
       _Callbacks.Enqueue(callback); 
       return; 
      } 
     } 
     ExecuteCallback(callback, item); 
    } 

    private void ExecuteCallback(Action<T> callback, T item) 
    { 
     // Use a new Task to execute the callback so that we don't 
     // execute it on the current thread. 
     Task.Factory.StartNew(() => callback.Invoke(item)); 
    } 
} 
+0

Просто обновил и увидел @ Brian's NotifyingBlockingCollection. Похоже, он и я придумали примерно одно и то же решение одновременно. –

+0

Да, мы определенно думали по тем же линиям здесь, особенно о том, как получить вызов обратного вызова от текущей нити. –

Смежные вопросы