2012-01-28 4 views
4

На странице 88 книги Стивена Toub вStreaming данных

http://www.microsoft.com/download/en/details.aspx?id=19222

Существует код

private BlockingCollection<T> _streamingData = new BlockingCollection<T>(); 
// Parallel.ForEach 
Parallel.ForEach(_streamingData.GetConsumingEnumerable(), 
item => Process(item)); 
// PLINQ 
var q = from item in _streamingData.GetConsumingEnumerable().AsParallel() 
... 
select item; 

Стивен затем упоминает

«, когда передавая результат вызова GetConsumingEnumerable как источник данных для Parallel.ForEach, потоки u sed цикл может быть заблокирован, когда коллекция становится пустой. И заблокированный поток не может быть выпущен Parallel.ForEach обратно в ThreadPool для выхода на пенсию или других целей. Таким образом, с кодом, как показано выше, если есть какие-то периоды времени, когда эта коллекция пуста, количество нитей в процессе, может постоянно расти;»

Я не понимаю, почему число потоков будет расти?

Если коллекция пуста, то не бы BlockingCollection не запрашивать какие-либо дополнительные темы?

Следовательно, вы не должны делать WithDegreeOfParallelism ограничить количество используемых потоков на BlockingCollection

ответ

3

Пул потоков имеет алгоритм альпинизма, который используется для оценки соответствующего количества потоков. Пока добавление потоков увеличивает пропускную способность, пул потоков создаст больше потоков. Предполагается, что некоторые блокировки или IO происходят и пытаются насытить процессор, перейдя по количеству процессоров в системе.

Именно поэтому выполнение ввода-вывода и блокирование потоков потоков потоков может быть опасным.

Вот полностью рабочий пример указанного поведения:

 BlockingCollection<string> _streamingData = new BlockingCollection<string>(); 

     Task.Factory.StartNew(() => 
      { 
       for (int i = 0; i < 100; i++) 
       { 
        _streamingData.Add(i.ToString()); 
        Thread.Sleep(100); 
       } 
      }); 

     new Thread(() => 
      { 
       while (true) 
       { 
        Thread.Sleep(1000); 
        Console.WriteLine("Thread count: " + Process.GetCurrentProcess().Threads.Count); 
       } 
      }).Start(); 

     Parallel.ForEach(_streamingData.GetConsumingEnumerable(), item => 
      { 
      }); 

Я не знаю, почему число потоков продолжает подниматься, хотя не увеличивает пропускную способность. Согласно модели, которую я объяснил, она не будет расти. Но я не знаю, действительно ли моя модель правильна.

Возможно, пул потоков имеет дополнительную эвристику, которая заставляет его порождать потоки, если он вообще не видит прогресса (измеряется в задачах, выполняемых в секунду). Это имело бы смысл, поскольку это, вероятно, предотвратило бы множество тупиков в приложениях. Тупики могут возникать, если важные задачи не могут выполняться, потому что они ждут выхода из существующих задач и создания потоков. Это хорошо известная проблема с пулом потоков.

+0

Но почему бы вам насытить процессор при блокировке? – TheWommies

+0

Пример: Современные твердотельные накопители имеют несколько внутренних путей ввода-вывода. Mine имеет 4. Когда у вас есть 2 ядра, вам по-прежнему нужно 4 потока, чтобы насытить ваш SSD. Алгоритм пула потоков фактически попытался бы создать 4 потока (по крайней мере, если бы он работал правильно). Он будет создавать нити до тех пор, пока не будут созданы 5. Тогда, было бы замечательно, что пятый не увеличил пропускную способность, поэтому он удалил бы пятый поток. – usr

+0

Детали этого процесса не документированы. Но я часто видел это на практике, когда мои задачи выполняли IO. – usr