Это должен быть мой третий и последний вопрос, касающийся моих попыток повысить производительность некоторых статистических анализов, которые я делаю с помощью python. У меня есть 2 версии моего кода (одноядерный или многопроцессорный), я ожидал получить производительность, используя несколько ядер, поскольку я ожидаю, что мой код распакует/распаковывает несколько двоичных строк, к сожалению, я заметил, что производительность фактически уменьшилась за счет использования нескольких сердечники.Производительность многопроцессорности Python
Мне интересно, есть ли у кого есть возможное объяснение того, что я наблюдаю (прокрутите вниз до обновления 16 апреля для получения дополнительной информации)?
Ключевая часть программы является функция numpy_array (+ декодирования в многопроцессорной), фрагмент кода ниже (полный код, доступного через Pastebin, далее ниже):
def numpy_array(data, peaks):
rt_counter=0
for x in peaks:
if rt_counter %(len(peaks)/20) == 0:
update_progress()
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
if (index % 2 == 0):
data[rt_counter][1][peak_counter][0]=float(buff2)
else:
data[rt_counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
rt_counter+=1
версия многопроцессорной выполняет это с набором функции, я буду показывать ключ 2 ниже:
def tonumpyarray(mp_arr):
return np.frombuffer(mp_arr.get_obj())
def numpy_array(shared_arr,peaks):
processors=mp.cpu_count()
with contextlib.closing(mp.Pool(processes=processors,
initializer=pool_init,
initargs=(shared_arr,))) as pool:
chunk_size=int(len(peaks)/processors)
map_parameters=[]
for i in range(processors):
counter = i*chunk_size
chunk=peaks[i*chunk_size:(i+1)*chunk_size]
map_parameters.append((chunk, counter))
pool.map(decode,map_parameters)
def decode ((chunk, counter)):
data=tonumpyarray(shared_arr).view(
[('f0','<f4'), ('f1','<f4',(250000,2))])
for x in chunk:
peak_counter=0
data_buff=base64.b64decode(x)
buff_size=len(data_buff)/4
unpack_format=">%dL" % buff_size
index=0
for y in struct.unpack(unpack_format,data_buff):
buff1=struct.pack("I",y)
buff2=struct.unpack("f",buff1)[0]
#with shared_arr.get_lock():
if (index % 2 == 0):
data[counter][1][peak_counter][0]=float(buff2)
else:
data[counter][1][peak_counter][1]=float(buff2)
peak_counter+=1
index+=1
counter+=1
Полные программные коды могут быть доступны через эти Pastebin ссылки
Pastebin for single core version
Pastebin for multiprocessing version
Спектакль, который я наблюдаю с файлом, содержащим 239 временных точек и ~ пар измерений 180k на временной точке составляет ~ 2,5м для одного ядра и ~ 3,5 для многопроцессорной.
PS: Предыдущие два вопроса (из моих первых когда-либо попытки paralellization):
- 16 апреля -
Я профилировал свою программу с помощью t он Cprofile библиотека (с cProfile.run('main()')
в __main__
, который показывает, что существует один шаг, который замедляет все вниз:
ncalls tottime percall cumtime percall filename:lineno(function)
23 85.859 3.733 85.859 3.733 {method 'acquire' of 'thread.lock' objects}
Дело в том, что я не понимаю, в том, что thread.lock
объекты используются в threading
(в моем понимании), но не следует использовать для многопроцессорности, поскольку каждое ядро должно запускать один поток (помимо собственного механизма блокировки), так как это происходит, и почему один вызов занимает 3,7 секунды?
Вы можете поделиться ссылками на свои предыдущие вопросы в этом вопросе? и вставьте функции, которые, по вашему мнению, важны для самого вопроса. – 0x90
Offcourse, позвольте мне изменить вопрос. –
Это очень хорошо могло иметь какое-то отношение к GIL. Ознакомьтесь с этой презентацией: http://www.youtube.com/watch?v=Obt-vMVdM8s – alexhb