2016-05-05 5 views
1

Я пытаюсь распараллелить фрагмент кода, который умножает два вектора сложных поплавков и суммирует результат. Для этого я пытаюсь использовать std :: async с фьючерсами. Моя идея состояла в том, чтобы разбить вектор на 8 частей и выполнить умножение на каждую из этих 8 частей параллельно, прежде чем суммировать их для моего окончательного результата. Для этого я создаю 8 фьючерсов, каждый из которых содержит лямбду, которая умножает два вектора и суммирует результат. Каждому будущему передаются указатели на разные позиции вектора, который представляет сечение вектора, на которое должно действовать данное конкретное будущее.Параллельное умножение векторов с использованием async C++

Однако, похоже, он не дает мне ускорения, ожидаемого мной, возможно, это ускорило этот раздел кода на 20-30%, но, кроме того, загрузка, похоже, не распространяется мои ядра (4 или 8 с гиперпотоком), но, похоже, все они находятся на одном ядре, которое составляет 100%.

Я включил код ниже. Любые предложения будут ценны.

size_t size = Input1.size()/8; 

std::vector<std::future<complex<float> > > futures; 
futures.reserve(8); 

for(int i = 0; i<8; ++i) 
{ 
    futures.push_back(std::async([](complex<float>* pos, complex<float>*pos2, size_t siz) 
    { 
     complex<float> resum(0,0); 
     for(int i = 0; i < siz; ++i) 
      resum += pos[i]*pos2[i]; 
     return resum; 
    }, &Input1[i*size], &Input2[i*size], size)); 
} 


complex<float> ResSum(0,0); 
for(int i = 0; i < futures.size(); ++i) 
    ResSum += futures.at(i).get(); 
+0

Насколько велики эти векторы? Возможно, вы захотите разделить входные данные на границах выравнивания. Это, вероятно, принесет больше пользы от векторизации и перегруппировки алгоритма, поэтому вы получите большую часть кеша. –

+0

Не знаете, сколько это поможет, но вы должны сделать 'резервный' вызов' resum' для предотвращения выделения памяти в потоке. – NathanOliver

+0

Не знаю, влияет ли это на время в этом конкретном случае, но если 'Input1.size()' не кратно 8, последний поток будет работать с конца массивов. –

ответ

0

Как написано, вызов std::async получает политику по умолчанию запуск launch::any, который позволяет запускать все asyncs на одном потоке. Чтобы настаивать на отдельных потоках, передайте launch::async в качестве первого аргумента в вызове std::async.

+0

Эй! Я попытался включить этот флаг запуска :: async, но он заставляет код работать значительно медленнее. Возможно, это указывает на большую ошибку в этом коде? – tallonj

0

Это зависит от того, сколько данных вы выбрасываете.

В следующем примере 4096 записей будут выполняться быстрее с помощью простого цикла. Но с 1000 * 4096 записей параллельная версия работает быстрее.

Таким образом, ваши результаты улучшения на 20-30%, вероятно, просто упали между этим диапазоном с рассматриваемым оборудованием.

Вот тестовая программа, которую я использовал.

Первый прогон - это простой цикл, второй - вопрос, а третий - std::launch::async.

Plain  From  With 
loop  question launch::async 
First  Second  Third 
166   1067  607  
166   614   434  
166   523   509  
265993  94633  66231  
182981  60594  69537  
237767  65731  57256 

Адрес live result.

#include <vector> 
#include <thread> 
#include <future> 
#include <complex> 
#include <string> 
#include <iostream> 
#include <chrono> 
#include <random> 
#include <ratio> 

float get_random() 
{ 
    static std::default_random_engine e; 
    static std::uniform_real_distribution<> dis(0,1); // rage 0 - 1 
    return static_cast<float>(dis(e)); 
} 

void do_tests(float val1, float val2, float val3, float val4, int multiplier) 
{ 
    { 
     std::vector<std::complex<float>> Input1(4096*multiplier,std::complex<float>{val1,val2}); 
     std::vector<std::complex<float>> Input2(4096*multiplier,std::complex<float>{val3,val4}); 
     std::complex<float> ResSum(0,0); 
     auto start{std::chrono::high_resolution_clock::now()}; 

     size_t size = Input1.size(); 
     for (int i=0; i<size; ++i) { 
      ResSum += Input1[i]*Input2[i]; 
     } 

     auto end{std::chrono::high_resolution_clock::now()}; 
     auto time_used{end-start}; 
     std::cout << std::chrono::duration_cast<std::chrono::microseconds>(time_used).count() << "\t\t"; 
    } 

    { 
     std::vector<std::complex<float>> Input1(4096*multiplier,std::complex<float>{val1,val2}); 
     std::vector<std::complex<float>> Input2(4096*multiplier,std::complex<float>{val3,val4}); 
     std::complex<float> ResSum(0,0); 
     auto start{std::chrono::high_resolution_clock::now()}; 

     size_t size = Input1.size()/8; 
     std::vector<std::future<std::complex<float>>> futures; 
     futures.reserve(8); 

     for (int i = 0; i<8; ++i) { 
      futures.push_back(
       std::async(
        [](std::complex<float>* pos,std::complex<float>*pos2,size_t siz) { 
       std::complex<float> resum(0,0); 
       for (int i = 0; i < siz; ++i) 
        resum += pos[i]*pos2[i]; 
       return resum; 
      } 
        ,&Input1[i*size],&Input2[i*size],size 
       ) 
       ); 
     } 

     for (int i = 0; i < futures.size(); ++i) 
      ResSum += futures.at(i).get(); 

     auto end{std::chrono::high_resolution_clock::now()}; 
     auto time_used{end-start}; 
     std::cout << std::chrono::duration_cast<std::chrono::microseconds>(time_used).count() << "\t\t"; 
    } 


    { 
     std::vector<std::complex<float>> Input1(4096*multiplier,std::complex<float>{val1,val2}); 
     std::vector<std::complex<float>> Input2(4096*multiplier,std::complex<float>{val3,val4}); 
     std::complex<float> ResSum(0,0); 
     auto start{std::chrono::high_resolution_clock::now()}; 

     size_t size = Input1.size()/8; 
     std::vector<std::future<std::complex<float>>> futures; 
     futures.reserve(8); 

     for (int i = 0; i<8; ++i) { 
      futures.push_back(
       std::async(std::launch::async, 
        [](std::complex<float>* pos,std::complex<float>*pos2,size_t siz) { 
       std::complex<float> resum(0,0); 
       for (int i = 0; i < siz; ++i) 
        resum += pos[i]*pos2[i]; 
       return resum; 
      } 
        ,&Input1[i*size],&Input2[i*size],size 
       ) 
       ); 
     } 

     for (int i = 0; i < futures.size(); ++i) 
      ResSum += futures.at(i).get(); 

     auto end{std::chrono::high_resolution_clock::now()}; 
     auto time_used{end-start}; 
     std::cout << std::chrono::duration_cast<std::chrono::microseconds>(time_used).count() << "\t\t"; 
    } 

    std::cout << '\n'; 

} 

int main() 
{ 
    float val1{get_random()}; 
    float val2{get_random()}; 
    float val3{get_random()}; 
    float val4{get_random()}; 

    std::cout << "First\t\tSecond\t\tThird\n"; 

    do_tests(val1, val2, val3, val4, 1); 
    do_tests(val1, val2, val3, val4, 1); 
    do_tests(val1, val2, val3, val4, 1); 
    do_tests(val1, val2, val3, val4, 1000); 
    do_tests(val1, val2, val3, val4, 1000); 
    do_tests(val1, val2, val3, val4, 1000); 


}