2017-02-16 2 views
4

Cython стартер здесь. Я пытаюсь ускорить вычисление определенной парной статистики (в нескольких ячейках), используя несколько потоков. В частности, я использую prange из cython.parallel, который внутренне использует openMP.Cython: make prange распараллеливание поточно-безопасный

Следующий минимальный пример иллюстрирует проблему (компиляция с помощью Jupyter notebook Cython magic).

установки

ноутбуков:

%load_ext Cython 
import numpy as np 

Cython код:

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a 

from cython cimport boundscheck 
import numpy as np 
from cython.parallel cimport prange, parallel 

@boundscheck(False) 
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads): 

    cdef: 
     int N = X.shape[0] 
     int nbins = bins.shape[0] 
     double Xij,Yij 
     double[:] Z = np.zeros(nbins,dtype=np.float64) 
     int i,j,b 

    with nogil, parallel(num_threads=num_threads): 
     for i in prange(N,schedule='static',chunksize=1): 
      for j in range(i): 
       #some pairwise quantities 
       Xij = X[i]-X[j] 
       Yij = 0.5*(X[i]+X[j]) 
       #check if in bin 
       for b in range(nbins): 
        if (Xij < bins[b,0]) or (Xij > bins[b,1]): 
         continue 
        Z[b] += Xij*Yij 

    return np.asarray(Z) 

фиктивные данные и контейнеры

X = np.random.rand(10000) 
bin_edges = np.linspace(0.,1,11) 
bins = np.array([bin_edges[:-1],bin_edges[1:]]).T 
bins = bins.copy(order='C') 

Timing через

%timeit my_parallel_statistic(X,bins,1) 
%timeit my_parallel_statistic(X,bins,4) 

дает

1 loop, best of 3: 728 ms per loop 
1 loop, best of 3: 330 ms per loop 

, которая не является идеальным масштабирование, но это не главное вопроса. (Но дайте мне знать, если у вас есть предложения, помимо добавления обычных декораторов или тонких настройка аргументов Прейнджа.)

Однако этот расчет, по-видимому, не поточно-:

Z1 = my_parallel_statistic(X,bins,1) 
Z4 = my_parallel_statistic(X,bins,4) 
np.allclose(Z1,Z4) 

показывает существенную разницу между двумя результатами (до 20% в этом примере).

Я сильно подозреваю, что проблема заключается в том, что несколько потоков могут сделать

Z[b] += Xij*Yij 

одновременно. Но я не знаю, как это исправить, не жертвуя ускорением.

В моем фактическом прецеденте расчет Xij и Yij дороже, поэтому я хотел бы делать их только один раз за пару. Кроме того, предварительная вычисление и хранение Xij и Yij для всех пар, а затем просто зацикливание бункеров не является хорошим вариантом либо потому, что N может стать очень большим, и я не могу хранить в памяти массивы 100 000 x 100 000 numpy (это было фактически главная мотивация переписывать его в Китоне!).

Информация о системе (добавлено следующее предложение в комментариях):

CPU(s): 8 
Model name: Intel(R) Core(TM) i7-4790K CPU @ 4.00GHz 
OS: Red Hat Linux v6.8 
Memory: 16 GB 
+0

Является ли действие в каждой теме действительно независимым от действия любого другого? Имеет ли значение, на какой из них нужно работать первым? Если есть какая-либо зависимость, это не является хорошим кандидатом для параллельных операций. – hpaulj

+0

Пока каждый поток создает свои собственные Xij и Yij, они должны быть независимыми (но, возможно, это проблема?) Что касается математики, то Xij и Yij вычисляются независимо для каждой пары (i, j) , и, следовательно, вклад в статистику Z также. – user4319496

+1

Благодарим вас за включение такого превосходного [mcve] в ваш вопрос! Такой хорошо изученный и сформулированный вопрос слишком редок для SO. Единственное, что вы могли бы включить, это ваша модель процессора и память, чтобы прокомментировать исполнение, но это не главное в любом случае. – Zulan

ответ

4

Да, Z[b] += Xij*Yij действительно состояние гонки.

Есть несколько вариантов сделать это atomic или critical. Проблемы с выпуском с Cython в сторону, вы в любом случае имели бы плохую производительность из-за ложного обмена на общем Z векторе.

Таким образом, лучшей альтернативой является резервирование частного массива для каждого потока. Есть еще пара (не) вариантов. Можно использовать частный указатель malloc 'd, но я хотел придерживаться np. Сплиты памяти нельзя назначить как частные переменные. Двухмерный массив (num_threads, nbins) работает, но по какой-то причине генерирует очень сложный неэффективный индексный индекс массива. Это работает, но медленнее и не масштабируется.

Плоская матрица с ручным «2D» индексированием хорошо работает. Вы получаете немного дополнительную производительность, избегая заполнения отдельных частей массива до 64 байт, что является типичным размером строки кеша. Это позволяет избежать ложного обмена между ядрами. Частные части просто суммируются последовательно за пределами параллельной области.

%%cython --compile-args=-fopenmp --link-args=-fopenmp -a 
from cython cimport boundscheck 
import numpy as np 
from cython.parallel cimport prange, parallel 
cimport openmp 

@boundscheck(False) 
def my_parallel_statistic(double[:] X, double[:,::1] bins, int num_threads): 

    cdef: 
     int N = X.shape[0] 
     int nbins = bins.shape[0] 
     double Xij,Yij 
     # pad local data to 64 byte avoid false sharing of cache-lines 
     int nbins_padded = (((nbins - 1) // 8) + 1) * 8 
     double[:] Z_local = np.zeros(nbins_padded * num_threads,dtype=np.float64) 
     double[:] Z = np.zeros(nbins) 
     int i,j,b, bb, tid 

    with nogil, parallel(num_threads=num_threads): 
     tid = openmp.omp_get_thread_num() 
     for i in prange(N,schedule='static',chunksize=1): 
      for j in range(i): 
       #some pairwise quantities 
       Xij = X[i]-X[j] 
       Yij = 0.5*(X[i]+X[j]) 
       #check if in bin 
       for b in range(nbins): 
        if (Xij < bins[b,0]) or (Xij > bins[b,1]): 
         continue 
        Z_local[tid * nbins_padded + b] += Xij*Yij 
    for tid in range(num_threads): 
     for bb in range(nbins): 
      Z[bb] += Z_local[tid * nbins_padded + bb] 


    return np.asarray(Z) 

Это работает очень хорошо на моем 4 ядра машины, с 720 ms/191 ms, с ускорением 3,6. Оставшийся промежуток может быть обусловлен турбо-режимом. У меня нет доступа к надлежащей машине для тестирования прямо сейчас.

+0

Спасибо за отличный ответ, дающий как фиксированную версию, так и информацию о ней, чтобы понять ее! PS: Я исправил одну крошечную ошибку в вашем коде: в последовательном цикле в конце индекс должен быть bb, а не b (исправить ожидание для проверки/одобрения). – user4319496

1

Вы правы, что доступ к Z находится в состоянии гонки.

Возможно, вам лучше определить num_threads копии Z, как cdef double[:] Z = np.zeros((num_threads, nbins), dtype=np.float64), и выполнить сумму вдоль оси 0 после цикла prange.

return np.sum(Z, axis=0) 

Cython код может иметь with gil заявления в параллельной области, но документирован только для обработки ошибок. Вы могли бы взглянуть на общий код C, чтобы узнать, вызовет ли это атомную операцию OpenMP, но я сомневаюсь.

Смежные вопросы