2016-07-20 2 views
2

Чтобы сделать вещи более общими, возьмите, например, gsl_integration_qag (так как для управления интервалами требуется рабочее пространство).Как использовать интеграцию gsl в блоке prange cython?

Вот преднамеренно простой пример (problem.pyx):

from libc.math cimport cos 
from cython.parallel import prange 
from cython_gsl cimport * 

cdef double mean(double[:] arr): 
    cdef Py_ssize_t i 
    cdef double sum_ = 0.0 
    for i in range(arr.shape[0]): 
     sum_ += arr[i] 
    return sum_/arr.shape[0] 

cdef double integrand(double x, void *params) nogil: 
    cdef double a = (<double*>params)[0] 
    cdef double b = (<double*>params)[1] 
    return a*x + b 

def use_gsl_integration_sequential(): 
    cdef double result, error 
    cdef Py_ssize_t i, j 
    cdef double result_sum=0.0 
    cdef double results[3] 
    cdef double a=0.0, b=0.0 
    cdef double da = 0.1 
    cdef size_t space_size = 1000 

    cdef gsl_function gsl_func 
    cdef double params[2] 
    gsl_func.function = &integrand 

    cdef gsl_integration_workspace *ws 
    ws= gsl_integration_workspace_alloc(space_size) 

    for i in range(3): 
     a += da 
     result_sum = 0.0 # reset the sum(thank J.J.Hakala for pointing out this) 
     for j in range(4): # here may be range(100000) in practice! 
      b = a + j 
      params[0] = a; params[1] = b 
      gsl_func.params = params 
      gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error) 
      result_sum += result 
     results[i] = result_sum 

    gsl_integration_workspace_free(ws) 
    # do some other stuff with the 'results', for example: 
    return mean(results) 


# Now, I want to parallel the inner loop 
def use_gsl_integration_parallel(): 
    cdef double result, error 
    cdef Py_ssize_t i, j 
    cdef double result_sum=0.0 
    cdef double results[3] 
    cdef double a, b 
    cdef double da = 0.1 
    cdef size_t space_size = 1000 

    cdef gsl_function gsl_func 
    cdef double params[2] 
    gsl_func.function = &integrand 

    cdef gsl_integration_workspace *ws 
    ws= gsl_integration_workspace_alloc(space_size) 

    for i in range(3): 
     # a should be read-only in the follow prange block. 
     a += da 
     # here may be prange(100000,...) in practice! 
     for j in prange(4, nogil=True): 
      # b should be local private. 
      b = a + j 

      # params also should be local private 
      params[0] = a; params[1] = b 

      # gsl_func is shared, only its params differ. 
      # According to the gsl manual(see follow link), workspaces should be allocated on a per-thread basis, but how? 
      gsl_func.params = params 
      gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error) 

      result_sum += result 

     results[i] = result_sum 
     gsl_integration_workspace_free(ws) 
    return mean(results) 

Немного длинный код, только для полностью (для копирования), но я думаю, что довольно просто (читать) (1 • ᴗ • 1)

Затем компилировать & ссылка:

cython -a -2 problem.pyx 

gcc -O3 -fopenmp -c problem.c -o problem.o -IC:\gsl2.1x86\include -IC:\python-2.7.10\include 

gcc -shared -fPIC -fopenmp -o problem.pyd -LC:\gsl2.1x86\libs -LC:\python-2.7.10\libs problem.o -l:libgsl.a -l:libgslcblas.dll.a -l:libpython27.dll.a 

В IPython:

In [1]: from problem import * 
In [2]: use_gsl_integration_sequential() 
Out[2]: 7.2 

Результат use_gsl_integration_parallel() не определен, я пробовал несколько раз, в лучшем случае, он сбой, с некоторым удачливым, получил неопределенное значение, поэтому должно быть что-то не так! Я просто не могу найти такой параллельный пример. Кто-нибудь мне поможет?

Окружающая среда:

win10-64bit, gcc (tdm-1) 5.1.0 32bit, python-2.7.10 32bit, cython0.24, gsl-2.1

Некоторые полезные ссылки ?:

https://github.com/twiecki/CythonGSL

https://www.gnu.org/software/gsl/manual/html_node/Thread_002dsafety.html#Thread_002dsafety

(извините, не можете больше информации, две ссылки мой предел. ..

+0

Я думаю, что 'gsl_func' также должен быть локальным. Я также не на 100% уверен, что он будет обрабатывать «результат» как локальный здесь (я не знаю, что это не так!) – DavidW

+0

Действительно, что меня смутило, так это то, где «распределять» эти вещи (gsl_function, params, рабочее пространство). На первый взгляд, это кажется легким - легко параллельным, но я ошибаюсь. Кроме того, поддержка cython [omp cython] (https://github.com/cython/cython/wiki/enhancements-prange#simple-usecases) довольно слабая и неявная (нарушает «Явный лучше, чем неявный»). Я начинаю сомневаться, возможен ли вопрос в цитоне. На самом деле, любой пример кода C мог бы помочь (хотя, я не знаю, как использовать omp в C.) – oz1

+0

Я думаю, вы используете параллельный блок, выделите его там. Затем используйте prange внутри параллельного блока. Я не могу проверить это сейчас, хотя я не могу обещать, что это правильно. – DavidW

ответ

0

Когда я протестировал этот код, у меня возникла ошибка сегментации (linux, gcc 4.9.2).

Я думаю, что исходный код имеет следующие проблемы

  • четыре нити разделяет несколько переменных, каждый из них должен иметь свое собственное
  • же рабочее пространство, используемые в этих нитях

Следующая версия дает 7.2. Является ли последовательная версия на самом деле правильно, так как он не установлен result_sum = 0.0 в

for i in range(3): 
    a += da 
    result_sum = 0.0 
from openmp cimport omp_get_max_threads, omp_get_thread_num 

def use_gsl_integration_parallel(): 
    cdef double result 
    cdef Py_ssize_t i, j 
    cdef double result_sum 
    cdef double results[3] 
    cdef double results2[4] 
    cdef double a 
    cdef double b[4] 
    cdef double error[4] 
    cdef double da = 0.1 
    cdef size_t space_size = 1000 

    cdef gsl_function gsl_func[4] 
    cdef double params[4][2] 
    cdef int tid 

    cdef gsl_integration_workspace ** ws = <gsl_integration_workspace **>malloc(omp_get_max_threads() * sizeof(gsl_integration_workspace *)) 

    for j in range(omp_get_max_threads()): 
     ws[j] = gsl_integration_workspace_alloc(space_size) 

    for i in range(3): 
     a += da 

     for j in prange(4, nogil=True): 
      tid = omp_get_thread_num() 
      b[tid] = a + j 

      params[tid][0] = a; 
      params[tid][1] = b[tid] 

      gsl_func[tid].function = &integrand 
      gsl_func[tid].params = params[tid] 
      gsl_integration_qag(&gsl_func[tid], 0.0, 1.0, 1e-8, 1e-8, 
           space_size, GSL_INTEG_GAUSS15, ws[tid], 
           &results2[j], &error[j]) 
      # printf("tid %d\n", tid) 
     results[i] = sum(results2) 

    for j in range(omp_get_max_threads()): 
     gsl_integration_workspace_free(ws[j]) 
    free(ws) 

    return mean(results) 
+0

Прохладный! Я был несколько введен в заблуждение от примера cython prange, я думаю, что они (params, gsl_func, ...) будут автоматически сделаны локальными частными, а затем «скопированы» по каждому потоку (простить такую ​​дураковую мысль). Я приступил к реформированию своих кодов, теперь я столкнулся с новой проблемой: такая интеграция будет называться сильно и часто и, следовательно, не может позволить себе malloc и освобождать эти огромные массивы (результаты, параметры) локально, моя идея заключается в распределении их где-то еще один раз, а затем просто проверяя этот буфер в интеграции, наконец, освобождая их где-то должным образом. Любое лучшее предложение @@J.J. Hakala – oz1

+0

, т. Е. В приведенном выше примере измените цикл на: 'для i в диапазоне (1000): ... для j в диапазоне (1000000), ...', как правильно обрабатывать эти массивы (╹◡╹) – oz1

+0

@ oz1 Я переместил 'gsl_integration_workspace_alloc' из цикла в' use_gsl_integration_parallel() ', поскольку они многократно используются, а выделение/удаление их повторно неоднократно не было хорошей идеей. Другие распределения пространства для массивов представляют собой распределения стека, поэтому они не являются дорогостоящими. Это распределение рабочей области, вероятно, должно быть только для максимального количества потоков 'openmp.omp_get_max_threads()', а затем внутри цикла 'ws [openmp.omp_get_thread_num()]' вместо 'ws [j] '. –

0

Благодаря @ J.J.Hakala, я, наконец, понять, как сделать эти вещи работу. Я реформировал пример и сделал простой профиль. Я просто разместил его здесь, надеюсь, что это будет полезно для некоторых новичков (как со мной) в научных вычислениях.

Если есть какие-либо ошибки, исправьте меня. Благодаря!

# cython: cdivision=True 
# cython: boundscheck=False 
# cython: wraparound=False 

from libc.stdlib cimport malloc, free 
from libc.math cimport sin 
from cython.parallel import prange 
from cython_gsl cimport * 
cimport openmp 

cdef double arr_sum(double *arr, size_t arr_len): 
    cdef size_t i 
    cdef double sum_ = 0.0 
    for i in range(arr_len): 
     sum_ += arr[i] 
    return sum_ 

cdef double arr_mean(double *arr, size_t arr_len): 
    return arr_sum(arr, arr_len)/arr_len 

# A random chosen integrand function for demo. 
cdef double integrand(double x, void *params) nogil: 
    cdef double a = (<double*>params)[0] 
    cdef double b = (<double*>params)[1] 
    return sin(a*x + b) 

def sequential_solution(int outer_loops, int inner_loops): 
    cdef double result, error 
    cdef size_t i, j 
    cdef double result_sum=0.0, mean_val 
    cdef double *results = <double*>malloc(sizeof(double) * outer_loops) 
    cdef double a=0.0, da = 0.1 
    cdef size_t space_size = 1000 
    cdef gsl_function gsl_func 
    cdef double params[2] 

    gsl_func.function = &integrand 
    cdef gsl_integration_workspace *ws = gsl_integration_workspace_alloc(space_size) 

    for i in range(outer_loops): 
     a += da 
     result_sum = 0.0 
     for j in range(inner_loops): 
      params[0] = a; params[1] = a + j 
      gsl_func.params = params 
      gsl_integration_qag(&gsl_func, 0.0, 1.0, 1e-8, 1e-8, space_size, GSL_INTEG_GAUSS15, ws, &result, &error) 
      result_sum += result 

     results[i] = result_sum 

    mean_val = arr_mean(results, outer_loops) 
    gsl_integration_workspace_free(ws) 
    free(results) 
    return mean_val 


def parallel_solution(int outer_loops, int inner_loops, int num_threads): 
    cdef size_t i, j 
    cdef double a = 0.0 
    cdef double da = 0.1 
    cdef double mean_val = 0.0 
    cdef size_t space_size = 1000 

    # Heavy malloc! 
    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops) 
    cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops) 
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops) 
    cdef double error # Ignore the integration abserror in parallel 
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops) 
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2) 
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads) 
    for i in range(num_threads): 
     ws[i] = gsl_integration_workspace_alloc(space_size) 

    for i in range(outer_loops): 
     a += da 
     for j in prange(inner_loops, nogil=True, num_threads=num_threads): 
      error = 0 # local private. 
      params[2*j] = a; params[2*j+1] = a+j 
      gsl_func[j].function = &integrand 
      gsl_func[j].params = params + 2*j 

      # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace, 
      # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt. 
      gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8, 
           space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()], 
           &inner_results[j], &error) 

     outer_results[i] = arr_sum(inner_results, inner_loops) 

    mean_val = arr_mean(outer_results, outer_loops) 

    # Now, free 
    for i in range(num_threads): 
     gsl_integration_workspace_free(ws[i]) 
    free(ws) 
    free(params) 
    free(gsl_func) 
    #free(error) 
    free(inner_results) 
    free(outer_results) 
    return mean_val 

def parallel_solution_ver(int outer_loops, int inner_loops, int num_threads): 
    cdef size_t i, j 
    cdef double a = 0.0 
    cdef double da = 0.1 
    cdef double mean_val = 0.0 
    cdef size_t space_size = 1000 

    cdef double *outer_results = <double*>malloc(sizeof(double) * outer_loops) 
    #cdef double *inner_results = <double*>malloc(sizeof(double) * inner_loops) 
    cdef double inner_result, inner_result_sum # no need to malloc a inner_results 
    #cdef double *error = <double*>malloc(sizeof(double) * inner_loops) 
    cdef double error # Ignore the integration abserror in parallel 
    cdef gsl_function *gsl_func = <gsl_function*>malloc(sizeof(gsl_function) * inner_loops) 
    cdef double *params = <double*>malloc(sizeof(double) * inner_loops * 2) 
    cdef gsl_integration_workspace **ws = <gsl_integration_workspace**>malloc(sizeof(gsl_integration_workspace*) * num_threads) 
    for i in range(num_threads): 
     ws[i] = gsl_integration_workspace_alloc(space_size) 

    for i in range(outer_loops): 
     a += da 
     inner_result_sum = 0.0 # reduction(+: inner_result_sum) 
     for j in prange(inner_loops, nogil=True, num_threads=num_threads): 
      inner_result = 0.0 # local private. 
      error = 0 # local private. 
      params[2*j] = a; params[2*j+1] = a+j 
      gsl_func[j].function = &integrand 
      gsl_func[j].params = params + 2*j 

      # ws[openmp.omp_get_thread_num()] guarantees each thread shares it's own workspace, 
      # since workspace is resuable in a thread. If two thread access the same workspace, data will corrupt. 
      gsl_integration_qag(&gsl_func[j], 0.0, 1.0, 1e-8, 1e-8, 
           space_size, GSL_INTEG_GAUSS15, ws[openmp.omp_get_thread_num()], 
           &inner_result, &error) 
      inner_result_sum += inner_result 

     outer_results[i] = inner_result_sum 

    mean_val = arr_mean(outer_results, outer_loops) 

    # Now, free 
    for i in range(num_threads): 
     gsl_integration_workspace_free(ws[i]) 
    free(ws) 
    free(params) 
    free(gsl_func) 
    #free(error) 
    #free(inner_results) 
    free(outer_results) 
    return mean_val 

обновление: нет необходимости таНоса в results массиве, добавлено обновленное решение parallel_solution_ver который я думаю, что лучше, хотя он имеет почти такую ​​же эффективность с parallel_solution

В IPython Notebook:

%matplotlib inline 
from problem import * 
import matplotlib.pyplot as plt 

max_exp = 20 

times_seq = [] 
for loops in [2**n for n in range(max_exp)]: 
    res = %timeit -n 1 -r 5 -o sequential_solution(10, loops) 
    times_seq.append((loops, res)) 

times_p = [] 
for loops in [2**n for n in range(max_exp)]: 
    res = %timeit -n 1 -r 5 -o parallel_solution(10, loops, 8) 
    times_p.append((loops, res)) 

times_p_ver = [] 
for loops in [2**n for n in range(max_exp)]: 
    res = %timeit -n 1 -r 5 -o parallel_solution_ver(10, loops, 8) 
    times_p_ver.append((loops, res)) 

time_results = [times_seq, times_p, times_p_ver] 
labels = ["sequential solution", "parallel solution", "parallel solution_ver"] 
colors = ['r', 'y', 'g'] 
markers = ['s', '+', 'o'] 
line_params = [ 
    {"sequential solution": {"color": "r", "marker": "s", "ms": 6}}, 
    {"parallel solution": {"color": "g", "marker": "+", "ms": 20, "mew": 1}}, 
    {"parallel solution_ver": {"color": "b", "marker": "o", "ms": 6}} 
] 

for results, param in zip(time_results, line_params): 
    loops = [res[0] for res in results] 
    times = [res[1].best for res in results] 
    plt.loglog(loops, times, label=param.keys()[0], **param.values()[0]) 
plt.xlabel("inner loops") 
plt.ylabel("exec time (ms)") 
plt.legend(loc=0) 

print "loops  sequential parallel parallel_ver" 
for s, p, p_v in zip(times_seq, times_p, times_p): 
    print "n = {:6d} {:f}s {:f}s {:f}s".format(s[0], s[1].best, p[1].best, p_v[1].best) 

# Same result? Yes! 
print sequential_solution(10, 2**19) # 0.0616505777102 
print parallel_solution(10, 2**19, 8) # 0.0616505777102 
print parallel_solution_ver(10, 2**19, 8) # 0.0616505777102 

loops  sequential parallel parallel_ver 
n =  1 0.000009s 0.001450s 0.001450s 
n =  2 0.000016s 0.001435s 0.001435s 
n =  4 0.000032s 0.001447s 0.001447s 
n =  8 0.000063s 0.001454s 0.001454s 
n =  16 0.000125s 0.001396s 0.001396s 
n =  32 0.000251s 0.001389s 0.001389s 
n =  64 0.000505s 0.001387s 0.001387s 
n = 128 0.001016s 0.001360s 0.001360s 
n = 256 0.002039s 0.001683s 0.001683s 
n = 512 0.003846s 0.001844s 0.001844s 
n = 1024 0.007717s 0.002416s 0.002416s 
n = 2048 0.015395s 0.003575s 0.003575s 
n = 4096 0.030817s 0.006055s 0.006055s 
n = 8192 0.061689s 0.010820s 0.010820s 
n = 16384 0.123445s 0.020872s 0.020872s 
n = 32768 0.246928s 0.040165s 0.040165s 
n = 65536 0.493903s 0.079077s 0.079077s 
n = 131072 0.987832s 0.156176s 0.156176s 
n = 262144 1.975107s 0.311151s 0.311151s 
n = 524288 3.949421s 0.617181s 0.617181s