2012-02-08 2 views
2

Я использую BlockingCollection для обработки некоторых файлов и последующей загрузки их на сервер.Множественное взаимодействие между производителем и потребителем в .Net 4.0

Сейчас у меня есть один продюсер, который рекурсирует файловую систему и сжимает определенные файлы во временное место. По завершении работы с файлом он добавляет мой собственный объект в BlockingCollection, который содержит информацию, такую ​​как имя файла, путь к файлу, дата изменения и т. Д. Затем потребитель захватывает этот объект и использует его для загрузки файла. Когда Продюсер завершил поиск файловой системы и работу над файлами, он вызывает метод BlockingCollection.CompleteAdding(), чтобы сигнализировать потребителю, что он закончил.

Что я хотел бы сделать, это увеличить количество производителей до 2 или более. Причина в том, что процесс сжатия занимает некоторое время, а на многоядерных процессорах я использую только 1 ядро. Это заставляет производителя иногда отставать от Потребителя в более быстрых сетях.

Мой вопрос: когда у меня есть несколько продюсеров и только один покупатель, как я могу сигнализировать потребителю о том, что все продюсеры закончили свою работу? Если я вызываю метод BlockingCollection.CompleteAdding() на одном из Продюсеров, я все еще могу работать с одним или несколькими другими производителями.

+3

Возможно, было бы более выгодно разгрузить компрессию потребителю и сделать производитель максимально тонким. –

+1

Согласитесь с Babcock, рекурсивное дерево деревьев в апралеле будет только забивать файловую систему. Лучше добавить файлы, которые вы хотите обработать в очередь, а затем сжать и отправить параллельно. Следите за пропускной способностью жесткого диска. Одновременное чтение в местах с несколькими местами приведет к перемещению головок по диску в разные сектора. В это время он не будет считывать данные и, следовательно, снижает пропускную способность диска. В идеале вы все равно читаете по 1 потоку, читаете каждый файл в байт [], а затем сжимаете и отправляете их параллельно. – gjvdkamp

+0

Извините, я должен был быть более ясным в своем оригинальном посте. Существует только один поток рекурсии, который будет загружать 2 или более задачи сжатия (Продюсер), которые затем будут загружать одну задачу «Потребитель». Я использую высокое сжатие, которое занимает довольно много времени даже на быстрых процессорах. Запись не более 2 МБ данных в секунду на поток. – forcedfx

ответ

1

Вы можете использовать семафор в своем коде Producer перед вызовом BlockingCollection.CompleteAdding(). Семафор разделяется всеми экземплярами Producer, когда последний продюсер закончил, он может вызвать метод. Семафор может быть реализован как простой счетчик, увеличивать счетчик при создании Продюсера, уменьшать его, когда ваш продюсер заканчивает работу. Если счетчик достигнет нуля, можно вызвать BlockingCollection.CompleteAdding().

0

Я использую что-то подобное, чтобы иметь несколько производителей и потребителей. Это просто очень простое решение, не оптимизированное для производственного кода.

public class ManageBatchProcessing 
{ 
    private BlockingCollection<Action> blockingCollection; 

    public void Process() 
    { 
     blockingCollection = new BlockingCollection<Action>(); 
     int numberOfBatches = 10; 
     Process(HandleProducers, HandleConsumers, numberOfBatches); 
    } 

    private void Process(Action<int> produce, Action<int> consume, int numberOfBatches) 
    { 
     produce(numberOfBatches); 
     consume(numberOfBatches); 
    } 

    private void HandleConsumers(int numberOfBatches) 
    { 
     var consumers = new List<Task>(); 

     for (var i = 1; i <= numberOfBatches; i++) 
     { 
      consumers.Add(Task.Factory.StartNew(() => 
      { 
       foreach (var action in blockingCollection.GetConsumingEnumerable()) 
       { 
        action(); 
       } 
      })); 
     } 

     Task.WaitAll(consumers.ToArray()); 
    } 

    private void HandleProducers(int numberOfBatches) 
    { 
     var producers = new List<Task>(); 

     for (var i = 0; i <= numberOfBatches; i++) 
     { 
      producers.Add(Task.Factory.StartNew(() => 
      { 
       blockingCollection.Add(() => YourProdcerMethod()); 
      })); 
     } 

     Task.WaitAll(producers.ToArray()); 
     blockingCollection.CompleteAdding(); 
    } 
} 
Смежные вопросы