2016-06-11 2 views
2

У меня есть ConcurrentQueue со списком URL-адресов, которые мне нужны для получения источника. При использовании Parallel.ForEach с объектом ConcurrentQueue в качестве входного параметра метод Pop не будет работать ничего (должен возвращать строку).ConcurrentQueue и Parallel.ForEach

Я использую Параллельно с MaxDegreeOfParallelism, установленным в четыре. Мне действительно нужно блокировать количество параллельных потоков. Является ли использование очереди с параллелизмом избыточным?

Заранее спасибо.

// On the main class 
var items = await engine.FetchPageWithNumberItems(result); 
// Enqueue List of items 
itemQueue.EnqueueList(items); 
var crawl = Task.Run(() => { engine.CrawlItems(itemQueue); }); 

// On the Engine class 
public void CrawlItems(ItemQueue itemQueue) 
{ 
Parallel.ForEach(
      itemQueue, 
      new ParallelOptions {MaxDegreeOfParallelism = 4}, 
      item => 
      { 

       var worker = new Worker(); 
       // Pop doesn't return anything 
       worker.Url = itemQueue.Pop(); 
       /* Some work */ 
      }); 
} 

// Item Queue 
class ItemQueue : ConcurrentQueue<string> 
    { 
     private ConcurrentQueue<string> queue = new ConcurrentQueue<string>(); 

     public string Pop() 
     { 
      string value = String.Empty; 
      if(this.queue.Count == 0) 
       throw new Exception(); 
      this.queue.TryDequeue(out value); 
      return value; 
     } 

     public void Push(string item) 
     { 
      this.queue.Enqueue(item); 
     } 

     public void EnqueueList(List<string> list) 
     { 
      list.ForEach(this.queue.Enqueue); 
     } 
    } 
+1

доля вашего прогресса ... –

+1

ItemQueue не порождён 'ConcurrentQueue' и содержат' ConcurrentQueue', выбрать один. –

+0

@Zroq: Поскольку загрузка URL-адреса является операцией с привязкой к I/O, я должен указать, что параллелизм - неправильный инструмент для использования. Асинхронный параллелизм будет использовать гораздо меньше ресурсов и быть таким же быстрым. –

ответ

0

Проблема заключается методом CrawlItems, так как вы не должны вызывать поп в действии, предоставляемой методу ForEach. Причина в том, что действие вызывается для каждого всплывающего элемента, поэтому элемент уже выскочил. Именно по этой причине действие имеет аргумент «item».

Я предполагаю, что вы получаете нуль, поскольку все элементы, которые уже были вырезаны другими потоками, методом ForEach.

Таким образом, ваш код должен выглядеть следующим образом:

public void CrawlItems(ItemQueue itemQueue) 
{ 
    Parallel.ForEach(
     itemQueue, 
     new ParallelOptions {MaxDegreeOfParallelism = 4}, 
     item => 
     { 
      worker.Url = item; 
      /* Some work */ 
     }); 
} 
+0

После itemQueue.EnqueueList (предметы); Я проверил, что очередь trully имеет 41 элемент. – Zroq

+0

Не могли бы вы поделиться им, если ItemQueue? – yonisha

+0

Я отредактировал ответ; – Zroq

2

Вам не нужно ConcurrentQueue<T>, если все, что вы собираетесь сделать, это первый добавить в нее элементы из одного потока, а затем повторять его в Parallel.ForEach(). Для этого достаточно обычного List<T>.

Кроме того, ваша реализация ItemQueue очень подозрительно:

  • Он наследует от ConcurrentQueue<string>, а также содержит другую ConcurrentQueue<string>. Это не имеет большого смысла, сбивает с толку и неэффективно.

  • Методы на ConcurrentQueue<T> были разработаны очень тщательно, чтобы быть потокобезопасными. Ваш Pop() не является потокобезопасным. Что может случиться, так это то, что вы проверяете Count, обратите внимание, что это 1, затем наберите TryDequeue() и не получите никакого значения (то есть value будет null), потому что другой поток удалил элемент из очереди за время между двумя вызовами.

Смежные вопросы