2013-10-08 6 views
3

Следующий код действует на двух std::vectorsv1 и v2, каждый из которых содержит несколько векторов из 128 элементов. Петли через внешние векторы (с использованием i1 и i2) содержат внутренний контур, предназначенный для ограничения комбинаций i1 и i2, для которых выполняется дополнительная комплексная обработка. Около 99,9% комбинаций отфильтровываются.Как улучшить производительность этого кода на C++?

К сожалению, цикл фильтрации является основным узким местом в моей программе - профилирование показывает, что 26% всего времени выполнения тратится на линию if(a[k] + b[k] > LIMIT).

const vector<vector<uint16_t>> & v1 = ... 
const vector<vector<uint16_t>> & v2 = ... 

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000 
    for(size_t i2 = 0; i2 < v2.size(); ++i2) { 

     const vector<uint16_t> & a = v1[i1]; 
     const vector<uint16_t> & b = v2[i2]; 

     bool good = true; 
     for(std::size_t k = 0; k < 128; ++k) { 
      if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000 
       good = false; 
       break; 
      } 
     } 
     if(!good) continue; 

     // Further processing involving i1 and i2 
    } 
} 

Я думаю, что производительность этого кода может быть улучшена за счет увеличения объема памяти и, возможно, векторизации. Любые предложения о том, как это сделать, или о других улучшениях, которые могут быть сделаны?

+0

Следует, скорее всего, перейти на codereview (они должны положить это уже в список оффтопических близких голосов) – PlasmaHH

+0

звучит как проблема прогнозирования ветвлений. sse решит его –

+0

Может ли упростить структуру данных? –

ответ

0

Во-первых, один простой оптимизации может быть, но компилятор может сделать это сам по себе, так что я не уверен, сколько это могло бы улучшить:

for(std::size_t k = 0; k < 128 && good; ++k) 
{ 
    good = a[k] + b[k] <= LIMIT; 
} 

Во-вторых, я думаю, что это может лучше держать хороший результат во втором векторе, потому что любая обработка , связанная с i1 и i2, может сломать кэш ЦП.

В-третьих, и это может быть основной оптимизации, я думаю, вы можете переписать второй цикл как это: для (size_t i2 = i1; i2 < v2.size(); ++ i2), так как вы используете + операции для a и b векторов, которые являются коммутативными, поэтому результат i1 и i2 будет таким же, как i2 и i1. Для этого вам нужно иметь одинаковые размеры v1 и v2. Если размер отличается, вам нужно написать итерацию по-другому.

Форт, насколько я вижу, вы обрабатываете две матрицы, было бы лучше сохранить вектор элементов, а не вектор векторов.

Надеюсь, это поможет. Разван.

+0

'good & =' в противном случае вы просто проверяете самый последний 'k'. –

3

Вы могли бы применить SIMD к внутренней петле:

bool good = true; 
    for(std::size_t k = 0; k < 128; ++k) { 
     if(a[k] + b[k] > LIMIT) { //LIMIT is a const uint16_t: approx 16000 
      good = false; 
      break; 
     } 

следующим образом:

#include <emmintrin.h> // SSE2 intrinsics 
#include <limits.h>  // SHRT_MIN 

// ... 

    // some useful constants - declare these somewhere before the outermost loop 

    const __m128i vLIMIT = _mm_set1_epi16(LIMIT + SHRT_MIN); // signed version of LIMIT 
    const __m128i vOFFSET = _mm_set1_epi16(SHRT_MIN);  // offset for uint16_t -> int16_t conversion 

// ... 

    bool good = true; 
    for(std::size_t k = 0; k < 128; k += 8) { 
     __m128i v, va, vb;    // iterate through a, b, 8 elements at a time 
     int mask; 
     va = _mm_loadu_si128(&a[k]); // get 8 elements from a[k], b[k] 
     vb = _mm_loadu_si128(&b[k]); 
     v = _mm_add_epi16(va, vb);  // add a and b vectors 
     v = _mm_add_epi16(v, vOFFSET); // subtract 32768 to make signed 
     v = _mm_cmpgt_epi16(v, vLIMIT); // compare against LIMIT 
     mask = _mm_maskmove_epi8(v); // get comparison results as 16 bit mask 
     if (mask != 0) {    // if any value exceeded limit 
      good = false;    // clear good flag and exit loop 
      break; 
     } 

Предупреждение: непроверенный код - возможно, потребуется отладка, но общий подход должен быть звук.

+0

выглядит хорошо. для дальнейшего улучшения я бы выровнял данные –

+0

@ BЈовић: да, к сожалению, у вас нет большого контроля над выравниванием std :: vector. К счастью, хотя смещенные нагрузки не слишком дороги для современных процессоров. –

0

Несколько предложений:

  • Как указывается в комментариях, заменить внутренний 128 элемент вектора с массивом для лучшей локальности памяти.
  • Этот код выглядит очень параллелизуемым, вы пробовали это? Вы можете разделить комбинации для фильтрации по всем доступным ядрам, а затем перебалансировать собранную работу и разделить обработку по всем ядрам.

Я реализовал версию с использованием массивов для внутренних 128 элементов, PPL для распараллеливания (требуется VS 2012 или выше) и немного SSE-кода для фильтрации и получил довольно значительное ускорение.В зависимости от того, что именно включает в себя «дальнейшая обработка», могут быть преимущества для структурирования вещей несколько иначе (в этом примере я не перестраиваю работу после фильтрации, например).

Обновление: Я реализовал блокировку кэша, предложенную Беном Войгтом, и получил немного больше ускорения.

#include <vector> 
#include <array> 
#include <random> 
#include <limits> 
#include <cstdint> 
#include <iostream> 
#include <numeric> 
#include <chrono> 
#include <iterator> 

#include <ppl.h> 

#include <immintrin.h> 

using namespace std; 
using namespace concurrency; 

namespace { 
const int outerVecSize = 20000; 
const int innerVecSize = 128; 
const int LIMIT = 16000; 
auto engine = default_random_engine(); 
}; 

typedef vector<uint16_t> InnerVec; 
typedef array<uint16_t, innerVecSize> InnerArr; 

template <typename Cont> void randomFill(Cont& c) { 
    // We want approx 0.1% to pass filter, the mean and standard deviation are chosen to get close to that 
    static auto dist = normal_distribution<>(LIMIT/4.0, LIMIT/4.6); 
    generate(begin(c), end(c), [] { 
     auto clamp = [](double x, double minimum, double maximum) { return min(max(minimum, x), maximum); }; 
     return static_cast<uint16_t>(clamp(dist(engine), 0.0, numeric_limits<uint16_t>::max())); 
    }); 
} 

void resizeInner(InnerVec& v) { v.resize(innerVecSize); } 
void resizeInner(InnerArr& a) {} 

template <typename Inner> Inner generateRandomInner() { 
    auto inner = Inner(); 
    resizeInner(inner); 
    randomFill(inner); 
    return inner; 
} 

template <typename Inner> vector<Inner> generateRandomInput() { 
    auto outer = vector<Inner>(outerVecSize); 
    generate(begin(outer), end(outer), generateRandomInner<Inner>); 
    return outer; 
} 

void Report(const chrono::high_resolution_clock::duration elapsed, size_t in1Size, size_t in2Size, 
      const int passedFilter, const uint32_t specialValue) { 
    cout << passedFilter << "/" << in1Size* in2Size << " (" 
     << 100.0 * (double(passedFilter)/double(in1Size * in2Size)) << "%) passed filter\n"; 
    cout << specialValue << "\n"; 
    cout << "Elapsed time = " << chrono::duration_cast<chrono::milliseconds>(elapsed).count() << "ms" << endl; 
} 

void TestOriginalVersion() { 
    cout << __FUNCTION__ << endl; 

    engine.seed(); 
    const auto v1 = generateRandomInput<InnerVec>(); 
    const auto v2 = generateRandomInput<InnerVec>(); 

    int passedFilter = 0; 
    uint32_t specialValue = 0; 

    auto startTime = chrono::high_resolution_clock::now(); 

    for (size_t i1 = 0; i1 < v1.size(); ++i1) { // v1.size() and v2.size() about 20000 
     for (size_t i2 = 0; i2 < v2.size(); ++i2) { 

      const vector<uint16_t>& a = v1[i1]; 
      const vector<uint16_t>& b = v2[i2]; 

      bool good = true; 
      for (std::size_t k = 0; k < 128; ++k) { 
       if (static_cast<int>(a[k]) + static_cast<int>(b[k]) 
        > LIMIT) { // LIMIT is a const uint16_t: approx 16000 
        good = false; 
        break; 
       } 
      } 
      if (!good) continue; 

      // Further processing involving i1 and i2 
      ++passedFilter; 
      specialValue += inner_product(begin(a), end(a), begin(b), 0); 
     } 
    } 

    auto endTime = chrono::high_resolution_clock::now(); 

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue); 
} 

bool needsProcessing(const InnerArr& a, const InnerArr& b) { 
    static_assert(sizeof(a) == sizeof(b) && (sizeof(a) % 16) == 0, "Array size must be multiple of 16 bytes."); 
    static const __m128i mmLimit = _mm_set1_epi16(LIMIT); 
    static const __m128i mmLimitPlus1 = _mm_set1_epi16(LIMIT + 1); 
    static const __m128i mmOnes = _mm_set1_epi16(-1); 

    auto to_m128i = [](const uint16_t* p) { return reinterpret_cast<const __m128i*>(p); }; 
    return equal(to_m128i(a.data()), to_m128i(a.data() + a.size()), to_m128i(b.data()), [&](const __m128i& a, const __m128i& b) { 
     // avoid overflow due to signed compare by clamping sum to LIMIT + 1 
     const __m128i clampSum = _mm_min_epu16(_mm_adds_epu16(a, b), mmLimitPlus1); 
     return _mm_test_all_zeros(_mm_cmpgt_epi16(clampSum, mmLimit), mmOnes); 
    }); 
} 

void TestArrayParallelVersion() { 
    cout << __FUNCTION__ << endl; 

    engine.seed(); 
    const auto v1 = generateRandomInput<InnerArr>(); 
    const auto v2 = generateRandomInput<InnerArr>(); 

    combinable<int> passedFilterCombinable; 
    combinable<uint32_t> specialValueCombinable; 

    auto startTime = chrono::high_resolution_clock::now(); 

    const size_t blockSize = 64; 

    parallel_for(0u, v1.size(), blockSize, [&](size_t i) { 
     for (const auto& b : v2) { 
      const auto blockBegin = begin(v1) + i; 
      const auto blockEnd = begin(v1) + min(v1.size(), i + blockSize); 
      for (auto it = blockBegin; it != blockEnd; ++it) { 
       const InnerArr& a = *it; 
       if (!needsProcessing(a, b)) 
        continue; 

       // Further processing involving a and b 
       ++passedFilterCombinable.local(); 
       specialValueCombinable.local() += inner_product(begin(a), end(a), begin(b), 0); 
      } 
     } 
    }); 

    auto passedFilter = passedFilterCombinable.combine(plus<int>()); 
    auto specialValue = specialValueCombinable.combine(plus<uint32_t>()); 

    auto endTime = chrono::high_resolution_clock::now(); 

    Report(endTime - startTime, v1.size(), v2.size(), passedFilter, specialValue); 
} 

int main() { 
    TestOriginalVersion(); 
    TestArrayParallelVersion(); 
} 

На моей основной системе 8 я вижу очень хороший прирост скорости, ваши результаты будут варьироваться в зависимости от того, сколько ядер у вас есть и т.д.

TestOriginalVersion 
441579/400000000 (0.110395%) passed filter 
2447300015 
Elapsed time = 12525ms 
TestArrayParallelVersion 
441579/400000000 (0.110395%) passed filter 
2447300015 
Elapsed time = 657ms 
1

У вас есть наиболее эффективный шаблон доступа для v1 , но вы последовательно просматриваете все v2 для каждой итерации внешнего цикла. Это очень неэффективно, потому что v2 доступ будет постоянно вызывать (L2 и, возможно, также L3) пропуски кеша.

Лучший образец доступа является увеличение вложенности цикла, так что внешние петли шагать через v1 и v2, и внутренние циклы процесса элементы внутри подотрезка обоих v1 и v2, который достаточно мал, чтобы поместиться в кэш.

В принципе, вместо

for(size_t i1 = 0; i1 < v1.size(); ++i1) { //v1.size() and v2.size() about 20000 
    for(size_t i2 = 0; i2 < v2.size(); ++i2) { 

ли

for(size_t i2a = 0; i2a < v2.size(); i2a += 32) { 
    for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
     for(size_t i2 = i2a; i2 < v2.size() && i2 < i2a + 32; ++i2) { 

Или

size_t i2a = 0; 

// handle complete blocks 
for(; i2a < v2.size() - 31; i2a += 32) { 
    for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
     for(size_t i2 = i2a; i2 < i2a + 32; ++i2) { 

     } 
    } 
} 

// handle leftover partial block 
for(size_t i1 = 0; i1 < v1.size(); ++i1) { 
    for(size_t i2 = i2a; i2 < v2.size(); ++i2) { 
    } 
} 

Таким образом, кусок 32 * 128 * sizeof (uint16_t) байт, или 8kB будут загружены из v2 в кэш , а затем повторно использовали 20 000 раз.

Это улучшение является ортогональным к векторизации SIMD (SSE). Он будет взаимодействовать с потоковым параллелизмом, но, вероятно, в хорошем смысле.

Смежные вопросы