2013-02-28 2 views
52

Я хотел бы обрабатывать коллекцию параллельно, но у меня возникли проблемы с ее реализацией, и поэтому я надеюсь на помощь.Параллельный foreach с асинхронным lambda

Проблема возникает, если я хочу вызвать метод, помеченный async в C#, в пределах лямбда параллельного цикла. Например:

var bag = new ConcurrentBag<object>(); 
Parallel.ForEach(myCollection, async item => 
{ 
    // some pre stuff 
    var response = await GetData(item); 
    bag.Add(response); 
    // some post stuff 
} 
var count = bag.Count; 

Проблема возникает с графом, равным 0, так как все потоки, созданные эффективны только фоновыми потоки и Parallel.ForEach вызова не ждать завершения. Если удалить асинхронной ключевое слово, метод выглядит следующим образом:

var bag = new ConcurrentBag<object>(); 
Parallel.ForEach(myCollection, item => 
{ 
    // some pre stuff 
    var responseTask = await GetData(item); 
    responseTask.Wait(); 
    var response = responseTask.Result; 
    bag.Add(response); 
    // some post stuff 
} 
var count = bag.Count; 

Он работает, но он полностью отключает ждут сообразительности, и я должен сделать некоторые ручной обработки исключений .. (Удалено для краткости).

Как я могу реализовать цикл Parallel.ForEach, который использует ключевое слово ожидания в лямбда? Является ли это возможным?

Прототип метода Parallel.ForEach принимает параметр Action<T> как параметр, но я хочу, чтобы он дождался моей асинхронной лямбда.

+0

Я предполагаю, что вы имели в виду, чтобы удалить '' await' из AWAIT GetData (пункт) 'в вашем втором блоке кода, как он будет производить ошибку компиляции, как -является. –

+0

Возможный дубликат [Nesting waiting in Parallel.ForEach] (https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach) –

ответ

76

Если вы просто хотите простой параллелизм, вы можете сделать это:

var bag = new ConcurrentBag<object>(); 
var tasks = myCollection.Select(async item => 
{ 
    // some pre stuff 
    var response = await GetData(item); 
    bag.Add(response); 
    // some post stuff 
}); 
await Task.WhenAll(tasks); 
var count = bag.Count; 

Если вам нужно что-то более сложное, проверить Stephen Toub's ForEachAsync post.

+18

Возможно, необходим механизм дросселирования. Это немедленно создаст столько задач, сколько есть элементов, которые могут оказаться в 10k сетевых запросах и т. Д. – usr

+4

@usr Последний пример статьи Стивена Тууба касается этого. – svick

+0

@svick Я был озадачен этим последним образцом. Мне кажется, что он просто загружает множество задач, чтобы создать для меня больше задач, но все они начинают работать с массой. –

14

Вы можете использовать метод ParallelForEachAsync расширения от AsyncEnumerator NuGet Package:

using System.Collections.Async; 

var bag = new ConcurrentBag<object>(); 
await myCollection.ParallelForEachAsync(async item => 
{ 
    // some pre stuff 
    var response = await GetData(item); 
    bag.Add(response); 
    // some post stuff 
}, maxDegreeOfParallelism: 10); 
var count = bag.Count; 
+0

Это ваш пакет? Я видел, как вы опубликовали это сейчас в нескольких местах? : D Ой, подождите .. твое имя на упаковке: D +1 – ppumkin

+5

@ppumkin, да, это мое. Я видел эту проблему снова и снова, поэтому решил разрешить ее самым простым способом и освободить других от борьбы :) –

+0

Спасибо .. это определенно имеет смысл и помогло мне в большое время! – ppumkin

Смежные вопросы