2016-05-31 3 views
-1

Итак, я преобразовал рекурсивную функцию в итеративную, а затем использовал Parallel.ForEach, но когда я запускал ее через VTune, она использовала только 2 логических ядра в течение большей части времени выполнения.Переход от Parallel.ForEach к многопоточности

я решил попытаться использовать управляемые темы вместо этого, и превращали этот код:

for (int N = 2; N <= length; N <<= 1) 
{ 
    int maxThreads = 4; 
    var workGroup = Enumerable.Range(0, maxThreads); 

    Parallel.ForEach(workGroup, i => 
    { 
     for (int j = ((i/maxThreads) * length); j < (((i + 1)/maxThreads) * length); j += N) 
     { 
      for (int k = 0; k < N/2; k++) 
      { 
       int evenIndex = j + k; 
       int oddIndex = j + k + (N/2); 

       var even = output[evenIndex]; 
       var odd = output[oddIndex]; 

       output[evenIndex] = even + odd * twiddles[k * (length/N)]; 
       output[oddIndex] = even + odd * twiddles[(k + (N/2)) * (length/N)]; 
      } 
     } 
    }); 
} 

В это:

for (int N = 2; N <= length; N <<= 1) 
{ 
    int maxThreads = 4; 

    Thread one = new Thread(() => calculateChunk(0, maxThreads, length, N, output)); 
    Thread two = new Thread(() => calculateChunk(1, maxThreads, length, N, output)); 
    Thread three = new Thread(() => calculateChunk(2, maxThreads, length, N, output)); 
    Thread four = new Thread(() => calculateChunk(3, maxThreads, length, N, output)); 

    one.Start(); 
    two.Start(); 
    three.Start(); 
    four.Start(); 
} 

public void calculateChunk(int i, int maxThreads, int length, int N, Complex[] output) 
{ 
    for (int j = ((i/maxThreads) * length); j < (((i + 1)/maxThreads) * length); j += N) 
    { 
     for (int k = 0; k < N/2; k++) 
     { 
      int evenIndex = j + k; 
      int oddIndex = j + k + (N/2); 
      var even = output[evenIndex]; 
      var odd = output[oddIndex]; 

      output[evenIndex] = even + odd * twiddles[k * (length/N)]; 
      output[oddIndex] = even + odd * twiddles[(k + (N/2)) * (length/N)]; 
     } 
    } 
} 

Проблема заключается в четвертом потоке на последней итерации N loop Я получаю исключение index out of bounds для выходного массива, где индекс пытается получить доступ к эквиваленту length.

Я не могу определить причину с помощью отладки, но я считаю, что она связана с потоками, я запускал код без потоков, и он работал по назначению.

Если какой-либо из кодов нуждается в изменении, сообщите мне, как правило, несколько человек предлагают изменения. Спасибо за вашу помощь, я попытался разобраться в этом сам и довольно уверен, что проблема возникает в моей потоковой передаче, но я не вижу, как это сделать.

PS: Целью является распараллеливание этого сегмента кода.

+1

[Parallel.ForEach] (https://msdn.microsoft.com/en-us/library/system.threading.tasks.parallel.foreach (v = vs.110) .aspx) имеет много перегрузок, вы можете контролировать степень параллелизма с вариантами. – davidshen84

+1

Вы можете потенциально достичь желаемого результата, просто переместив 'Parallel.ForEach' из внутреннего цикла во внешний цикл вашего исходного фрагмента (если вы ожидаете, что он будет содержать больше элементов, чем' workGroup', конечно). Это уменьшит затраты на установку и выключение Parallel.ForEach и позволит балансировщику нагрузки лучше выполнять свою работу, и я ожидаю, что он масштабируется до N ядер. Если вы придерживаетесь нитей, я ожидаю увидеть «Присоединиться где-то», иначе вы начинаете все больше и больше потоков на каждой итерации цикла до того, как предыдущая партия сможет закончить. –

+0

Вы уверены, что ваш алгоритм верен?По моему мнению, в цикле 'for (int j = ((i/maxThreads) * length); j <((i + 1)/maxThreads) * length); j + = N)' inital value ' int j = ((i/maxThreads) * length' всегда будет 0 для i в диапазоне [0, maxThreads-1] (это целочисленное деление!), а условие цикла j <((i + 1)/maxThreads) * length) 'будет' false' для всех значений 'i', за исключением последнего. Таким образом, в конце вы внутренний цикл вводится только один раз, независимо от того, сколько потоков вы используете. – qbik

ответ

1

Наблюдаемое поведение почти наверняка связано с использованием переменной итерации захваченного цикла N. Я могу воспроизвести ситуацию с помощью простого теста:

ConcurrentBag<int> numbers = new ConcurrentBag<int>(); 

for (int i = 0; i < 10000; i++) 
{ 
    Thread t = new Thread(() => numbers.Add(i)); 

    t.Start(); 
    //t.Join(); // Uncomment this to get expected behaviour. 
} 

// You'd not expect this assert to be true, but most of the time it will be. 
Assert.True(numbers.Contains(10000)); 

Проще говоря, ваш for цикла гонки для увеличения N до значения N может быть скопировано делегатом, который выполняет calculateChunk вызова. В результате calculateChunk видит почти случайные значения N, поднимаясь до (и в том числе) length <<= 1 - вот что вызывает IndexOutOfRangeException.

Выходные значения, которые вы получите, будут слишком мусорными, так как вы никогда не сможете полагаться на значение N.

Если вы хотите safely перепишите исходный код, чтобы использовать больше ядер, переместите Parallel.ForEach от внутреннего контура к внешнему контуру. Если число итераций внешнего цикла велико, балансировщик нагрузки сможет выполнить свою работу должным образом (что не может с вашим текущим workGroup числом 4 - это количество элементов просто слишком низкое).

+1

Просто свинья, чтобы подчеркнуть ** безопасный **. Этот код должен легко лежать на TPL ('Parallel.ForEach'). Бег к сырью 'Thread' опасен NIH. –

+0

Каждое приращение N использует вычисленные данные из предыдущей итерации, поэтому я не могу их разложить, его зависимость от данных :( –

+0

@OliverGiess, я понимаю сейчас. Я также думаю, что вы слишком рано отметили мой ответ. интересная проблема параллелизации из-за необходимости сканировать предыдущие результаты. Я вижу, как это можно решить с помощью подхода «производитель-потребитель» и барьеров, но я не уверен, что повышение производительности от параллелизации приведет к перевесе накладных расходов синхронизации. –