2015-05-29 2 views
3

У меня есть ConcurrentStack, в который я вбрасываю элементы. Каков хороший способ обрабатывать эти элементы по одному за раз, когда стек не пуст? Я бы хотел сделать это так, чтобы не переваривать циклы процессора, когда стек не обрабатывается.Обработать ConcurrentStack, когда он не пуст?

То, что у меня есть в настоящее время, в основном такое, и это не похоже на идеальное решение.

private void AddToStack(MyObj obj) 
{ 
    stack.Push(obj); 
    HandleStack(); 
} 

private void HandleStack() 
{ 
    if (handling) 
     return; 

    Task.Run(() => 
    { 
     lock (lockObj) 
     { 
      handling = true; 
      if (stack.Any()) 
      { 
       //handle whatever is on top of the stack 
      } 
      handling = false; 
     } 
    } 
} 

Таким образом, bool существует, поэтому несколько потоков не получают резервное копирование, ожидая блокировки. Но я не хочу, чтобы несколько вещей обрабатывали стек сразу, следовательно, блокировку. Поэтому, если два отдельных потока в конечном итоге вызывают HandleStack одновременно и проходят мимо bool, блокировка существует, поэтому оба они не повторяются через стек сразу. Но как только второй пройдет через замок, стек будет пустым и ничего не сделает. Так что это в конечном итоге дает мне поведение, которое я хочу.

Так что я просто пишу псевдо-параллельную оболочку вокруг ConcurrentStack, и похоже, что для этого нужно по-другому. Мысли?

+0

Вам действительно нужен каждый элемент, который нужно обрабатывать последовательно? Если они могут обрабатываться одновременно, просто делегируйте threadpool, который будет эффективно обрабатывать свою рабочую очередь. –

+0

Вам не нужен замок. Это ConcurrentStack, он модифицируется несколькими потоками. Если вы действительно хотите заблокировать во время ожидания, используйте BlockingCollection. По умолчанию он использует ConcurrentQueue, но вы можете указать другую параллельную коллекцию, например ConcurrentStack –

+0

@PanagiotisKanavos. Я знаю, что она модифицирована несколькими потоками. Я хочу добавить к нему несколько потоков, но только один из них.Вот почему я запираюсь вокруг поп (обфускации позади «// обрабатывают все ...»), а не вокруг толчка. – claudekennilol

ответ

2

ConcurrentStack<T> является одной из коллекций, которые реализует IProducerConsumerCollection<T>, и как таковые могут быть обернут BlockingCollection<T>. BlockingCollection<T> имеет несколько членов удобства для обычных операций, таких как «потреблять, пока стек не пуст». Например, вы можете вызвать TryTake в цикле. Или, вы могли бы просто использовать GetConsumingEnumerable:

private BlockingCollection<MyObj> stack; 
private Task consumer; 

Constructor() 
{ 
    stack = new BlockingCollection<MyObj>(new ConcurrentStack<MyObj>()); 
    consumer = Task.Run(() => 
    { 
    foreach (var myObj in stack.GetConsumingEnumerable()) 
    { 
     ... 
    } 
    }); 
} 

private void AddToStack(MyObj obj) 
{ 
    stack.Add(obj); 
} 
+0

Спасибо, это определенно кажется тем, что мне нужно. Я смущен тем, как работает потребительская задача. Я выбрал против TryPop в цикле, потому что мне не нужны константные циклы процессора. Каким образом GetConsumingEnumerable отличается от этого (т. Е. Что происходит, когда стек пуст/что запускает его, когда что-то добавляется)? – claudekennilol

+1

Оба «TryPop» и «MoveNext» (при вызове перечислимого пользователя) блокируют вызывающий поток, пока базовая коллекция пуста. Я не знаю точную реализацию, но это логически монитор. Таким образом, заблокированные потоки находятся в состоянии ожидания и не потребляют процессор. –

2

Похоже, вы хотите типичного потребителя-производителя.

Я бы рекомендовал использовать в AutoResetEvent

иметь ваш потребительское ожидание, когда стек пуст. Набор вызовов при вызове метода производителя.

Читать эту тему

Fast and Best Producer/consumer queue technique BlockingCollection vs concurrent Queue

+1

Разве это не то, что BlockingCollection делает? В чем разница блокировки BlockingCollection и блокировки на AutoResetEvent? –

+0

@PanagiotisKanavos Я на самом деле не предлагал использовать их вместе, но я вижу, где у вас такое впечатление. Мой ответ был немного неуклюжим. –

+1

Вы неправильно поняли. Зачем использовать AutoResetEvent с ConcurrentStack, когда вы можете просто использовать BlockingCollection? –

3

Вы могли бы рассмотреть вопрос об использовании Microsoft TPL Dataflow делать такого рода вещи.

Вот простой пример, показывающий, как создать очередь. Попробуйте и поиграйте с настройками для MaxDegreeOfParallelism и BoundedCapacity, чтобы узнать, что произойдет.

Для вашего примера, я думаю, вы захотите установить MaxDegreeOfParallelism на 1, если вы не хотите, чтобы несколько потоков обрабатывали элемент данных одновременно.

(Примечание. Вы должны использовать .Net 4.5x и установить TPL DataFlow для проекта с использованием NuGet)

также прочитанный из Stephen Cleary's blog about TPL.

using System; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace SimpleTPL 
{ 
    class MyObj 
    { 
     public MyObj(string data) 
     { 
      Data = data; 
     } 

     public readonly string Data; 
    } 

    class Program 
    { 
     static void Main() 
     { 
      var queue = new ActionBlock<MyObj>(data => process(data), actionBlockOptions()); 
      var task = queueData(queue); 

      Console.WriteLine("Waiting for task to complete."); 
      task.Wait(); 
      Console.WriteLine("Completed."); 
     } 

     private static void process(MyObj data) 
     { 
      Console.WriteLine("Processing data " + data.Data); 
      Thread.Sleep(200); // Simulate load. 
     } 

     private static async Task queueData(ActionBlock<MyObj> executor) 
     { 
      for (int i = 0; i < 20; ++i) 
      { 
       Console.WriteLine("Queuing data " + i); 
       MyObj data = new MyObj(i.ToString()); 

       await executor.SendAsync(data); 
      } 

      Console.WriteLine("Indicating that no more data will be queued."); 

      executor.Complete(); // Indicate that no more items will be queued. 

      Console.WriteLine("Waiting for queue to empty."); 

      await executor.Completion; // Wait for executor queue to empty. 
     } 

     private static ExecutionDataflowBlockOptions actionBlockOptions() 
     { 
      return new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = 4, 
       BoundedCapacity  = 8 
      }; 
     } 
    } 
} 
Смежные вопросы