2013-04-15 9 views
11

Это должен быть мой третий и последний вопрос, касающийся моих попыток повысить производительность некоторых статистических анализов, которые я делаю с помощью python. У меня есть 2 версии моего кода (одноядерный или многопроцессорный), я ожидал получить производительность, используя несколько ядер, поскольку я ожидаю, что мой код распакует/распаковывает несколько двоичных строк, к сожалению, я заметил, что производительность фактически уменьшилась за счет использования нескольких сердечники.Производительность многопроцессорности Python

Мне интересно, есть ли у кого есть возможное объяснение того, что я наблюдаю (прокрутите вниз до обновления 16 апреля для получения дополнительной информации)?

Ключевая часть программы является функция numpy_array (+ декодирования в многопроцессорной), фрагмент кода ниже (полный код, доступного через Pastebin, далее ниже):

def numpy_array(data, peaks): 
    rt_counter=0 
    for x in peaks: 
     if rt_counter %(len(peaks)/20) == 0: 
      update_progress() 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[rt_counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[rt_counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     rt_counter+=1 

версия многопроцессорной выполняет это с набором функции, я буду показывать ключ 2 ниже:

def tonumpyarray(mp_arr): 
    return np.frombuffer(mp_arr.get_obj()) 

def numpy_array(shared_arr,peaks): 
    processors=mp.cpu_count() 
    with contextlib.closing(mp.Pool(processes=processors, 
            initializer=pool_init, 
            initargs=(shared_arr,))) as pool: 
     chunk_size=int(len(peaks)/processors) 
     map_parameters=[] 
     for i in range(processors): 
      counter = i*chunk_size 
      chunk=peaks[i*chunk_size:(i+1)*chunk_size] 
      map_parameters.append((chunk, counter)) 
     pool.map(decode,map_parameters) 

def decode ((chunk, counter)): 
    data=tonumpyarray(shared_arr).view(
     [('f0','<f4'), ('f1','<f4',(250000,2))]) 
    for x in chunk: 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      #with shared_arr.get_lock(): 
      if (index % 2 == 0): 
       data[counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     counter+=1 

Полные программные коды могут быть доступны через эти Pastebin ссылки

Pastebin for single core version

Pastebin for multiprocessing version

Спектакль, который я наблюдаю с файлом, содержащим 239 временных точек и ~ пар измерений 180k на временной точке составляет ~ 2,5м для одного ядра и ~ 3,5 для многопроцессорной.

PS: Предыдущие два вопроса (из моих первых когда-либо попытки paralellization):

  1. Python multi-processing
  2. Making my NumPy array shared across processes

- 16 апреля -

Я профилировал свою программу с помощью t он Cprofile библиотека (с cProfile.run('main()') в __main__, который показывает, что существует один шаг, который замедляет все вниз:

ncalls tottime percall cumtime percall filename:lineno(function) 
23 85.859 3.733 85.859 3.733 {method 'acquire' of 'thread.lock' objects} 

Дело в том, что я не понимаю, в том, что thread.lock объекты используются в threading (в моем понимании), но не следует использовать для многопроцессорности, поскольку каждое ядро ​​должно запускать один поток (помимо собственного механизма блокировки), так как это происходит, и почему один вызов занимает 3,7 секунды?

+1

Вы можете поделиться ссылками на свои предыдущие вопросы в этом вопросе? и вставьте функции, которые, по вашему мнению, важны для самого вопроса. – 0x90

+0

Offcourse, позвольте мне изменить вопрос. –

+0

Это очень хорошо могло иметь какое-то отношение к GIL. Ознакомьтесь с этой презентацией: http://www.youtube.com/watch?v=Obt-vMVdM8s – alexhb

ответ

2

Общие данные - это известный случай замедлений из-за синхронизации.

Можете ли вы разделить свои данные между процессами или предоставить каждому процессу независимую копию? Тогда вашим процессам не потребуется синхронизировать что-либо до момента, когда все вычисления будут выполнены.

Тогда я бы позволил мастер-процессу объединить выходные данные всех рабочих процессоров в один когерентный набор.

Подход может занять дополнительную оперативную память, но оперативная память сейчас дешевая.

Если вы спросите, я также озадачен 3700 мс за захват блокировки потоков. Профилирование OTOH может быть ошибочно принято в отношении таких специальных вызовов.

+0

Я мог бы разбить массив данных на n копий, но мне все же хотелось бы понять, почему и как весь процесс занимает так много времени. Вы бы посоветовали использовать другой способ профилирования? –

0

Ваши пустые пустые.

Проблема в том, что для многопроцессорности используется fork, если она доступна (вместо того, чтобы порождать новый процесс python). Процесс Forked имеет один и тот же env (например, файловые дескрипторы). Может быть, у них есть некоторые замки среди них.

Вот некоторое разочарование о том, что: Multiprocessing or os.fork, os.exec?

0

Что касается последней части вашего вопроса, то документы Python в основном говорят, что multiprocessing.lock клон threading.lock. Приобретение вызовов на замках может занять много времени, потому что если блокировка уже получена, она будет блокироваться до тех пор, пока блокировка не будет отпущена. Это может стать проблемой, когда несколько процессов конкурируют за доступ к тем же данным, как в вашем коде. Поскольку я не могу просмотреть ваш pastebin, я могу только догадываться, что именно происходит, но, скорее всего, вы процессы приобретаете блокировку в течение длительных периодов времени, что останавливает другие процессы от запуска, даже если есть много свободное время процессора. Это не должно зависеть от GIL, поскольку это должно ограничивать только многопоточные приложения, а не многопроцессорные. Итак, как это исправить? Я предполагаю, что у вас есть какая-то блокировка, защищающая ваш общий массив, который остается заблокированным, в то время как процесс проводит интенсивные вычисления, которые занимают относительно длительное время, поэтому запрет доступа для других процессов, которые впоследствии блокируют их lock.acquire() звонки. Предполагая, что у вас достаточно ОЗУ, я решительно рекомендую ответ, который предлагает хранить несколько копий массива в адресном пространстве каждого процесса. Однако просто отметьте, что передача больших структур данных через карту может вызвать неожиданные узкие места, поскольку для этого требуется сбор и депиляция.

Смежные вопросы