2016-04-13 2 views
1

Я использую C++ и cuda/thrust для выполнения вычисления на графическом процессоре, который для меня является новым полем. К сожалению, мой код (MCVE ниже) не очень эффективен, поэтому я хотел бы знать, как его оптимизировать. Код выполняет следующие операции:Cuda Thrust - Как оптимизировать код с помощью sort_by_key, merge_by_key и reduce_by_key

Существует два ключевых вектора и два вектора значений. Ключевые векторы содержат в основном i и j верхней треугольной матрицы (в этом примере: размер 4x4).

key1 {0, 0, 0, 1, 1, 2} value1: {0.5, 0.5, 0.5, -1.0, -1.0, 2.0} 
key2 {1, 2, 3, 2, 3, 3} value2: {-1, 2.0, -3.5, 2.0, -3.5, -3.5} 

Задача состоит в том, чтобы суммировать все значения, имеющие один и тот же ключ. Для этого я отсортировал второй вектор значений, используя sort_by_key. Результат:

key1 {0, 0, 0, 1, 1, 2} value1: {0.5, 0.5, 0.5, -1.0, -1.0, 2.0} 
key2 {1, 2, 2, 3, 3, 3} value2: {-1.0, 2.0, 2.0, -3.5, -3.5, -3.5} 

После этого я слился как вектор значений с помощью merge_by_key, что приводит к новому ключу и значению вектора с размером двойным, как большая, чем раньше.

key_merge {0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3} 
value_merge {0.5, 0.5, 0.5, -1.0, -1.0, -1.0, 2.0, 2.0, 2.0, -3.5, -3.5, -3.5} 

Последний шаг - использовать reduce_by_key для суммирования по всем значениям с помощью того же ключа. Результат:

key {0, 1, 2, 3} value: {1.5, -3.0, 6.0, -10.5} 

ниже код, который выполняет эти операции тихо, медленно, и я боюсь, что производительность для большего размера будет плохо. Как его можно оптимизировать? Возможно ли слияние sort_by_key, merge_by_key и reduce_by_key? Поскольку я знаю результирующий вектор ключа из sort_by_key заранее, можно ли преобразовать вектор значений «от старого к новому ключу»? Имеет ли смысл, объединить два вектора перед их уменьшением или же быстрее использовать reduce_by_key отдельно для каждой пары вектора ценности/ключа? Можно ли ускорить вычисление reduce_by_key, используя тот факт, что здесь известен номер различного значения ключа, а число равных ключей всегда одинаково?

#include <stdio.h> 
#include <thrust/host_vector.h> 
#include <thrust/device_vector.h> 
#include <thrust/sort.h> 
#include <thrust/reduce.h> 
#include <thrust/merge.h> 

int main(){ 
    int key_1[6] = {0, 0, 0, 1, 1, 2}; 
    int key_2[6] = {1, 2, 3, 2, 3, 3}; 
    thrust::device_vector<double> k1(key_1,key_1+6); 
    thrust::device_vector<double> k2(key_2,key_2+6); 

    double value_1[6] = {0.5, 0.5, 0.5, -1.0, -1.0, 2.0}; 
    double value_2[6] = {-1, 2.0, -3.5, 2.0, -3.5, -3.5}; 
    thrust::device_vector<double> v1(value_1,value_1+6); 
    thrust::device_vector<double> v2(value_2,value_2+6); 

    thrust::device_vector<double> mk(12); 
    thrust::device_vector<double> mv(12); 
    thrust::device_vector<double> rk(4); 
    thrust::device_vector<double> rv(4); 

    thrust::sort_by_key(k2.begin(), k2.end(), v2.begin()); 
    thrust::merge_by_key(k1.begin(), k1.end(), k2.begin(), k2.end(),v1.begin(), v2.begin(), mk.begin(), mv.begin()); 
    thrust::reduce_by_key(mk.begin(), mk.end(), mv.begin(), rk.begin(), rv.begin()); 

    for (unsigned i=0; i<4; i++) { 
    double tmp1 = rk[i]; 
    double tmp2 = rv[i]; 
    printf("key value %f is related to %f\n", tmp1, tmp2); 
    } 
    return 0; 
} 

Результат:

key value 0.000000 is related to 1.500000 
key value 1.000000 is related to -3.000000 
key value 2.000000 is related to 6.000000 
key value 3.000000 is related to -10.500000 

ответ

2

Вот один из возможных подходов, что я думаю, что может быть быстрее, чем ваша последовательность. Основная идея заключается в том, что мы хотим избежать сортировки данных, когда мы знаем порядок раньше времени. Если мы можем использовать знания порядка, которые у нас есть, вместо сортировки данных мы можем просто переупорядочить его в желаемую компоновку.

Давайте сделаем некоторые наблюдения за данными. Если key1 и key2 являются на самом деле я, J индексы верхней треугольной матрицы, то мы можем сделать несколько замечаний о конкатенации этих двух векторов:

  1. Сцепленная вектор будет содержать одинаковое число каждый ключ. (Я полагаю, вы могли бы указать это на свой вопрос.) Таким образом, в вашем случае вектор будет содержать три клавиши 0, три клавиши 1, три клавиши 2 и три клавиши 3. Я считаю, что этот шаблон должен иметь место для любого верхнего треугольного шаблона, не зависящего от размерности матрицы. Таким образом, матрица размерности N, которая является верхней треугольной, будет иметь N наборов ключей в конкатенированном индексном векторе, причем каждый набор состоит из N-1 подобных элементов.

  2. В объединенном векторе мы можем обнаружить/установить последовательное упорядочение ключей (на основе размерности матрицы N), что позволяет нам переупорядочить вектор в порядке сгруппированных по клавишам, не прибегая к традиционной операции сортировки ,

Если мы объединим выше 2 идеи, то мы, вероятно, можем решить все проблемы с некоторыми операциями рассеяния заменить вид/слияние деятельности, а затем thrust::reduce_by_key операции. Операции рассеяния могут быть выполнены с помощью thrust::copy до соответствующего thrust::permutation_iterator в сочетании с соответствующим функтором вычисления индекса. Поскольку мы точно знаем, как будет выглядеть переупорядоченный конкатенированный вектор key (в примере вашего измерения 4: {0,0,0,1,1,1,2,2,2,3,3,3}), нам не нужно явно выполнять переупорядочение. Однако мы должны переупорядочить вектор value, используя одно и то же отображение. Итак, давайте развивать арифметику для этого отображения:

dimension (N=)4 example 

vector index:   0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11 
desired (group) order: 0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3 
concatenated keys:  0, 0, 0, 1, 1, 2, 1, 2, 3, 2, 3, 3 
group start idx:  0, 0, 0, 3, 3, 6, 3, 6, 9, 6, 9, 9 
group offset idx:  0, 1, 2, 0, 1, 0, 2, 1, 0, 2, 1, 2 
destination idx:  0, 1, 2, 3, 4, 6, 5, 7, 9, 8,10,11 


dimension (N=)5 example 

vector index:   0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19 
desired (group) order: 0, 0, 0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4 
concatenated keys:  0, 0, 0, 0, 1, 1, 1, 2, 2, 3, 1, 2, 3, 4, 2, 3, 4, 3, 4, 4 
group start idx:  0, 0, 0, 0, 4, 4, 4, 8, 8,12, 4, 8,12,16, 8,12,16,12,16,16 
group offset idx:  0, 1, 2, 3, 0, 1, 2, 0, 1, 0, 3, 2, 1, 0, 3, 2, 1, 3, 2, 3 
destination idx:  0, 1, 2, 3, 4, 5, 6,10, 7, 8,11,14, 9,12,15,17,13,16,18,19 

Мы можем наблюдать, что в каждом конкретном случае, индекс назначения (то есть место, чтобы переместить выбранный ключ или значение, для требуемого порядка группы) равно индекс начала группы плюс индекс группового смещения. Индекс начала группы - это просто ключ, умноженный на (N-1). Индекс группового смещения представляет собой шаблон, аналогичный шаблону верхнего или нижнего треугольного индекса (в двух разных воплощениях для каждой половины конкатенированного вектора). Конкатенированные ключи - это просто конкатенация векторов key1 и key2 (мы создадим эту конкатенацию практически с помощью permutation_iterator). Желаемый групповой порядок известен а-априори, это просто последовательность целых групп, причем N групп состоят из N-1 элементов. Это эквивалентно сортированной версии конкатенированного ключевого вектора. Поэтому мы можем непосредственно вычислить индекс назначения в функторе.

Для создания группы смещения моделей индекса, мы можем вычесть ваши два ключевых векторов (и вычесть дополнительные 1):

key2:     1, 2, 3, 2, 3, 3 
key1:     0, 0, 0, 1, 1, 2 
key1+1:    1, 1, 1, 2, 2, 3 
p1 = key2-(key1+1): 0, 1, 2, 0, 1, 0 
p2 = (N-2)-p1:   2, 1, 0, 2, 1, 2 
grp offset idx=p1|p2: 0, 1, 2, 0, 1, 0, 2, 1, 0, 2, 1, 2 

Вот полностью обработанный пример, демонстрирующий вышеупомянутые концепции, используя ваш пример данные:

$ cat t1133.cu 
#include <thrust/host_vector.h> 
#include <thrust/device_vector.h> 
#include <thrust/reduce.h> 
#include <thrust/copy.h> 
#include <thrust/transform.h> 
#include <thrust/iterator/transform_iterator.h> 
#include <thrust/iterator/permutation_iterator.h> 
#include <thrust/iterator/zip_iterator.h> 
#include <thrust/iterator/counting_iterator.h> 
#include <iostream> 

// "triangular sort" index generator 
struct idx_functor 
{ 
    int n; 
    idx_functor(int _n): n(_n) {}; 
    template <typename T> 
    __host__ __device__ 
    int operator()(const T &t){ 
    int k1 = thrust::get<0>(t); 
    int k2 = thrust::get<1>(t); 
    int id = thrust::get<2>(t); 
    int go,k; 
    if (id < (n*(n-1))/2){ // first half 
     go = k2-k1-1; 
     k = k1; 
     } 
    else { // second half 
     go = n-k2+k1-1; 
     k = k2; 
     } 
    return k*(n-1)+go; 
    } 
}; 


const int N = 4; 
using namespace thrust::placeholders; 

int main(){ 
    // useful dimensions 
    int d1 = N*(N-1); 
    int d2 = d1/2; 
    // iniitialize keys 
    int key_1[] = {0, 0, 0, 1, 1, 2}; 
    int key_2[] = {1, 2, 3, 2, 3, 3}; 
    thrust::device_vector<int> k1(key_1, key_1+d2); 
    thrust::device_vector<int> k2(key_2, key_2+d2); 
    // initialize values 
    double value_1[] = {0.5, 0.5, 0.5, -1.0, -1.0, 2.0}; 
    double value_2[] = {-1, 2.0, -3.5, 2.0, -3.5, -3.5}; 
    thrust::device_vector<double> v(d1); 
    thrust::device_vector<double> vg(d1); 
    thrust::copy_n(value_1, d2, v.begin()); 
    thrust::copy_n(value_2, d2, v.begin()+d2); 
    // reorder (group) values by key 
    thrust::copy(v.begin(), v.end(), thrust::make_permutation_iterator(vg.begin(), thrust::make_transform_iterator(thrust::make_zip_iterator(thrust::make_tuple(thrust::make_permutation_iterator(k1.begin(), thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1%d2)), thrust::make_permutation_iterator(k2.begin(), thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1%d2)), thrust::counting_iterator<int>(0))), idx_functor(N)))); 
    // sum results 
    thrust::device_vector<double> rv(N); 
    thrust::device_vector<int> rk(N); 
    thrust::reduce_by_key(thrust::make_transform_iterator(thrust::counting_iterator<int>(0), _1/(N-1)), thrust::make_transform_iterator(thrust::counting_iterator<int>(d1), _1/(N-1)), vg.begin(), rk.begin(), rv.begin()); 
    // print results 
    std::cout << "Keys:" << std::endl; 
    thrust::copy_n(rk.begin(), N, std::ostream_iterator<int>(std::cout, ", ")); 
    std::cout << std::endl << "Sums:" << std::endl; 
    thrust::copy_n(rv.begin(), N, std::ostream_iterator<double>(std::cout, ", ")); 
    std::cout << std::endl; 
    return 0; 
} 
$ nvcc -std=c++11 -o t1133 t1133.cu 
$ ./t1133 
Keys: 
0, 1, 2, 3, 
Sums: 
1.5, -3, 6, -10.5, 
$ 

чистый эффект является то, что ваши thrust::sort_by_key и thrust::merge_by_key операции были заменены одной thrust::copy операции, которая должна быть более эффективной.

Смежные вопросы