2013-02-12 3 views
9

Я не уверен по поводу использования локальной функции инициализации в Parallel.ForEach, как описано в статье MSDN: http://msdn.microsoft.com/en-us/library/dd997393.aspxКак работает локальная инициализация с Parallel ForEach?

Parallel.ForEach<int, long>(nums, // source collection 
    () => 0, // method to initialize the local variable 
    (j, loop, subtotal) => // method invoked by the loop on each iteration 
    { 
     subtotal += nums[j]; //modify local variable 
     return subtotal; // value to be passed to next iteration 
    },... 

Как() => 0 инициализировать что-нибудь? Каково имя переменной и как я могу ее использовать в логике цикла?

+0

() => не инициализирует ничего, возвращаемое значение этой функции будет использоваться для инициализации локальной переменной (промежуточный итог в вашем примере). –

ответ

19

Со ссылкой на following overload метода Parallel.ForEach статического расширения:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source, 
    Func<TLocal> localInit, 
    Func<TSource, ParallelLoopState, TLocal, TLocal> taskBody, 
    Action<TLocal> localFinally 
) 

В вашем конкретном примере

Линия:

() => 0, // method to initialize the local variable 

просто лямбда (анонимной функции), который вернет постоянное целое число.Эта лямбда передается в качестве параметра Parallel.ForEachlocalInit - с лямбдой возвращает целое число, оно имеет тип Func<int> и тип TLocal может быть выведена, как int компилятором (аналогично, TSource может быть выведен из типа коллекции, переданный в качестве параметра source)

Возвращаемое значение (0) затем передается как 3-й параметр (с именем subtotal) на taskBodyFunc. Это (0) используется начальное семя для контура тела:

(j, loop, subtotal) => 
{ 
    subtotal += nums[j]; //modify local variable (Bad idea, see comment) 
    return subtotal;  // value to be passed to next iteration 
} 

Этот второй лямбда (передаваемый taskBody) называется N раз, где N является количество элементов, выделенных для этой задачи с помощью секционирования TPL.

Каждый последующий вызов второго taskBody лямбда будет проходить новое значение subTotal, эффективного вычисления работает частичной общей сложности, для выполнения этой задачи. После того, как все элементы, назначенные этой задаче, будут добавлены, будет вызываться третий и последний параметр функции localFinally, опять же, передав окончательное значение subtotal, возвращенному с taskBody. Поскольку несколько таких задач будут работать параллельно, также необходимо будет сделать заключительный шаг, чтобы скомпоновать все частичные итоги в итоговую итоговую сумму. Однако, поскольку несколько одновременных задач (в разных потоках) могут конкурировать за переменную grandTotal, важно, чтобы изменения в ней выполнялись поточно-безопасным образом.

(я изменил имена переменных MSDN, чтобы сделать его более ясным)

long grandTotal = 0; 
Parallel.ForEach(nums,   // source collection 
() => 0,      // method to initialize the local variable 
    (j, loop, subtotal) =>   // method invoked by the loop on each iteration 
    subtotal + nums[j],   // value to be passed to next iteration subtotal 
    // The final value of subtotal is passed to the localFinally function parameter 
    (subtotal) => Interlocked.Add(ref grandTotal, subtotal) 

В примере MS, модификация параметра итога внутри тела задачи является плохой практикой, и ненужным. т.е. код subtotal += nums[j]; return subtotal; будет лучше, как только return subtotal + nums[j];, которые могут быть сокращены до проекции лямбды сокращенного (j, loop, subtotal) => subtotal + nums[j]

В общем

В localInit/body/localFinally перегруженных Parallel.For/Parallel.ForEach позволяют один раз на задачу инициализации и коды очистки в (до) и после (соответственно) итерации taskBody выполняются Задачей.

(отмечая Для диапазона/перечислимых передается параллельно For/Foreach будет разделена на партии IEnumerable<>, каждый из которых будет выделена задача)

В каждой задаче, localInit будет вызываться один раз , код body будет повторно вызываться, один раз за элемент в партии (0..N раз), а localFinally будет вызываться один раз после завершения.

Кроме того, вы можете передать любое состояние, необходимое для длительности задачи (т.е. к делегатам taskBody и localFinally) с помощью общего TLocal возвращаемого значения из localInit Func - Я назвал эту переменную taskLocals ниже.

Общие использования «localInit»:

  • Создание и инициализация дорогостоящих ресурсов, необходимых тела цикла, как соединение с базой данных или подключения веб-службы.
  • Ведение Task-Локальные переменных для хранения (uncontended) погонных итогов или коллекций
  • Если вам нужно возвратить несколько объектов из localInit в taskBody и localFinally, вы можете использовать строго типизированный класс, а Tuple<,,> или, если вы используете только lambdas для localInit/taskBody/localFinally, вы также можете передавать данные через анонимный класс. Обратите внимание, если вы используете возврат из localInit, чтобы поделиться ссылочным типом между несколькими задачами, вам нужно будет рассмотреть проблему безопасности потоков на этом объекте - неизменность предпочтительнее.

Общие использования акции "localFinally":

  • освободить ресурсы, такие как IDisposables, используемые в taskLocals (соединений например, баз данных, файловых дескрипторов, клиентов веб-служб и т.д.)
  • Чтобы агрегировать/объединить/сократить работу, выполняемую каждой задачей, обратно в общую переменную (ы). Эти общие переменные будут рассмотрены, поэтому проблема безопасности потоков является проблемой:
    • , например. Interlocked.Increment на примитивных типов, таких как целые числа
    • lock или подобное будет требоваться для операций записи
    • воспользоваться concurrent collections, чтобы сэкономить время и усилия.

taskBody является tight части операции петли - вы хотите, чтобы оптимизировать это для повышения производительности.

Это все лучше суммированы с комментариями, например:

public void MyParallelizedMethod() 
{ 
    // Shared variable. Not thread safe 
    var itemCount = 0; 

    Parallel.For(myEnumerable, 
    // localInit - called once per Task. 
    () => 
    { 
     // Local `task` variables have no contention 
     // since each Task can never run by multiple threads concurrently 
     var sqlConnection = new SqlConnection("connstring..."); 
     sqlConnection.Open(); 

     // This is the `task local` state we wish to carry for the duration of the task 
     return new 
     { 
      Conn = sqlConnection, 
      RunningTotal = 0 
     } 
    }, 
    // Task Body. Invoked once per item in the batch assigned to this task 
    (item, loopState, taskLocals) => 
    { 
     // ... Do some fancy Sql work here on our task's independent connection 
     using(var command = taskLocals.Conn.CreateCommand()) 
     using(var reader = command.ExecuteReader(...)) 
     { 
     if (reader.Read()) 
     { 
      // No contention for `taskLocal` 
      taskLocals.RunningTotal += Convert.ToInt32(reader["countOfItems"]); 
     } 
     } 
     // The same type of our `taskLocal` param must be returned from the body 
     return taskLocals; 
    }, 
    // LocalFinally called once per Task after body completes 
    // Also takes the taskLocal 
    (taskLocals) => 
    { 
     // Any cleanup work on our Task Locals (as you would do in a `finally` scope) 
     if (taskLocals.Conn != null) 
     taskLocals.Conn.Dispose(); 

     // Do any reduce/aggregate/synchronisation work. 
     // NB : There is contention here! 
     Interlocked.Add(ref itemCount, taskLocals.RunningTotal); 
    } 

И еще примеры:

Example of per-Task uncontended dictionaries

Example of per-Task database connections

+0

Удалите пример БД и замените его примером. Использование Parallel.For с базами данных, вероятно, не очень хорошая идея - предпочтительнее использовать async/await и parallelism через «Task.WhenAll». – StuartLC

2

Вы можете получить подсказку на MSDN в correct Parallel.ForEach перегрузка.

Делегат localInit вызывается один раз для каждого потока, который участвует в выполнении цикла и возвращает начальное локальное состояние для каждой из этих задач. Эти начальные состояния передаются первым вызовам тела для каждой задачи. Затем каждый последующий вызов тела возвращает возможно измененное значение состояния, которое передается следующему вызову тела.

В вашем примере () => 0 является делегатом только возвращение 0, поэтому это значение используется для первой итерации каждой задачи.

+0

Я прочитал этот текст перед публикацией здесь, но мой вопрос остается тем же. Как использование лямбды, которая только что возвращает 0, дает мне значение, которое «используется для первой итерации для каждой задачи»? Как я могу использовать это значение в коде? У него нет идентификатора. –

+3

У этого есть - это 'subtotal' в вашем примере. Но только на первой итерации в каждой задаче. Значение «subtotal» во всех других итерациях в этой задаче - это то, что возвращается из предыдущей итерации. –

+0

Хорошо, я до сих пор слежу за вами, но какая часть этого кода указывает, что «() => 0» = subtotal?Каково соглашение между функцией init и декларацией (j, loop, subtotal)? Редактирование: Я просто понял это, черт в деталях, или в этом случае, в именах общих параметров функции тела System.Func . Спасибо, я помету твой ответ. –

6

в качестве дополнительного ответа на вопрос @Honza Brestan. Способ Parallel foreach также разбивает работу на задачи, он будет группировать несколько итераций цикла в одну задачу, поэтому на практике localInit() вызывается один раз для каждого n итераций цикла, и несколько групп могут запускаться одновременно.

Точкой в ​​localInit и localFinally является обеспечение параллельной петля Еогеаспа может объединить результаты от каждого itteration в единый результат без необходимости указания блокировки заявления в body, чтобы сделать это, вы должны обеспечить инициализацию для значение, которое вы хотите создать (localInit), то каждая итерация body может обрабатывать локальное значение, тогда вы предоставляете метод для объединения значений из каждой группы (localFinally) поточно-безопасным способом.

Если вам не нужен localInit для синхронизации задач, вы можете использовать лямбда-методы для нормального задания значений из окружающего контекста, без каких-либо проблем. См. Threading in C# (Parallel.For and Parallel.ForEach) для более глубокого изучения использования localInit/Наконец и прокрутите вниз до Оптимизация с локальными значениями, Джозеф Альбахари - действительно мой источник для всех потоков.

+0

До голосования это потому, что это действительно прояснило меня. Благодарю. –

0

С моей стороны немного больше легче, например

class Program 
{ 
    class Person 
    { 
     public int Id { get; set; } 
     public string Name { get; set; } 
     public int Age { get; set; } 
    } 

    static List<Person> GetPerson() => new List<Person>() 
    { 
     new Person() { Id = 0, Name = "Artur", Age = 26 }, 
     new Person() { Id = 1, Name = "Edward", Age = 30 }, 
     new Person() { Id = 2, Name = "Krzysiek", Age = 67 }, 
     new Person() { Id = 3, Name = "Piotr", Age = 23 }, 
     new Person() { Id = 4, Name = "Adam", Age = 11 }, 
    }; 

    static void Main(string[] args) 
    { 
     List<Person> persons = GetPerson(); 
     int ageTotal = 0; 

     Parallel.ForEach 
     (
      persons, 
      () => 0, 
      (person, loopState, subtotal) => subtotal + person.Age, 
      (subtotal) => Interlocked.Add(ref ageTotal, subtotal) 
     ); 

     Console.WriteLine($"Age total: {ageTotal}"); 
     Console.ReadKey(); 
    } 
} 
Смежные вопросы