2015-10-23 2 views
1

Я попытался поставить цикл for параллельно, чтобы ускорить некоторый код. считают это:Python multiprocessing.Pool.map умирает молча

from multiprocessing import Pool 

results = [] 

def do_stuff(str): 
    print str 
    results.append(str) 

p = Pool(4) 
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000 
p.close() 

print results 

У меня есть некоторые отладочные сообщения, показывающие от do_stuff отслеживать, насколько программа получает перед смертью. Кажется, он умирает в разных точках каждый раз. Например, он напечатает «str297», а затем он просто перестанет работать, я увижу, что все процессоры перестают работать, и программа просто сидит там. Должно возникнуть некоторая ошибка, но нет сообщения об ошибке. Кто-нибудь знает, как отладить эту проблему?

UPDATE

Я попытался повторно работать Кодекса немного. Вместо того, чтобы использовать функцию map Я попробовал apply_async функцию:

 pool = Pool(5) 
     results = pool.map(do_sym, underlyings[0::10]) 
     results = [] 
     for sym in underlyings[0::10]: 
      r = pool.apply_async(do_sym, [sym]) 
      results.append(r) 

     pool.close() 
     pool.join() 

     for result in results: 
      print result.get(timeout=1000) 

Это работало так же хорошо, как функции map, но в конечном итоге висит таким же образом. Он никогда не попадет в цикл for, где он печатает результаты.

После того, как вы немного поработали над этим, и попробовав некоторые отладочные записи, как это было предложено в ответе unutbu, я приведу здесь дополнительную информацию. Проблема очень странная. Кажется, что пул просто висит там и не может закрыть и продолжить программу. Я использую среду PyDev для тестирования своих программ, но я думал, что попробую запустить python в консоли. В консоли я получаю такое же поведение, но когда я нажимаю Ctrl + C, чтобы убить программу, я получаю некоторый вывод, который мог бы объяснить, где проблема:

> KeyboardInterrupt ^CProcess PoolWorker-47: Traceback (most recent call 
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line 
> 258, in _bootstrap Process PoolWorker-48: Traceback (most recent call 
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line 
> 258, in _bootstrap Process PoolWorker-45: Process PoolWorker-46: 
> Process PoolWorker-44: 
>  self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
>  self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
> Traceback (most recent call last): Traceback (most recent call last): 
> Traceback (most recent call last): File 
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in 
> _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File 
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in 
> _bootstrap 
>  task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get 
>  self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
>  racquire() 
>  self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
> KeyboardInterrupt 
>  task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get 
>  self.run() 
>  self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
>  self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File 
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run 
>  self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
>  self._target(*self._args, **self._kwargs) 
>  self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker 
>  racquire() File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker KeyboardInterrupt 
>  task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get 
>  task = get() 
>  task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get 
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get 
>  racquire() 
>  return recv() 
>  racquire() KeyboardInterrupt KeyboardInterrupt KeyboardInterrupt 

Тогда на самом деле программа никогда не умирает. Мне нужно закрыть окно терминала, чтобы убить его.

UPDATE 2

я сузил проблему внутри функции, которая выполняется в бассейне, и это была транзакция базы данных MySQL, которая была причина проблемы. Раньше я использовал пакет MySQLdb. Я включил функцию a pandas.read_sql для транзакции, и теперь она работает.

+2

'results' не будет использоваться совместно между процессами. Кроме того, каждый процесс будет реимпортировать модуль и создать новую функцию «Пул» и «Карта». Вам нужно установить их в отдельный блок '__main__'. См. [Docs] (https://docs.python.org/2/library/multiprocessing.html#windows). –

+0

результаты не обязательно должны быть разделены, пока все результаты будут добавлены в конце концов. Это должно работать, не так ли? –

ответ

3

pool.map возвращает результаты в списке. Таким образом, вместо вызова results.append в параллельных процессах (которые не будут работать, так как каждый процесс будет иметь свою собственную независимую копию results), присвоить results к значению, возвращаемый pool.map в основном процессе:

import multiprocessing as mp 

def do_stuff(text): 
    return text 

if __name__ == '__main__': 
    p = mp.Pool(4) 
    tasks = ['str{}'.format(i) for i in range(2000)] 
    results = p.map(do_stuff, tasks) 
    p.close() 

    print(results) 

урожайность

['str0', 'str1', 'str2', 'str3', ...] 

Один из способов отладки сценариев, которые используют многопроцессорность, чтобы добавить заявление протоколирования. Модуль multiprocessing обеспечивает вспомогательную функцию, mp.log_to_stderr, для этой цели.Например,

import multiprocessing as mp 
import logging 

logger = mp.log_to_stderr(logging.DEBUG) 

def do_stuff(text): 
    logger.info('Received {}'.format(text)) 
    return text 

if __name__ == '__main__': 
    p = mp.Pool(4) 
    tasks = ['str{}'.format(i) for i in range(2000)] 
    results = p.map(do_stuff, tasks) 
    p.close() 

    logger.info(results) 

, который дает выходной сигнал, как при входе:

[DEBUG/MainProcess] created semlock with handle 139824443588608 
[DEBUG/MainProcess] created semlock with handle 139824443584512 
[DEBUG/MainProcess] created semlock with handle 139824443580416 
[DEBUG/MainProcess] created semlock with handle 139824443576320 
[DEBUG/MainProcess] added worker 
[INFO/PoolWorker-1] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[INFO/PoolWorker-2] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[INFO/PoolWorker-3] child process calling self.run() 
[DEBUG/MainProcess] added worker 
[INFO/PoolWorker-4] child process calling self.run() 
[INFO/PoolWorker-1] Received str0 
[INFO/PoolWorker-2] Received str125 
[INFO/PoolWorker-3] Received str250 
[INFO/PoolWorker-4] Received str375 
[INFO/PoolWorker-3] Received str251 
... 
[INFO/PoolWorker-4] Received str1997 
[INFO/PoolWorker-4] Received str1998 
[INFO/PoolWorker-4] Received str1999 
[DEBUG/MainProcess] closing pool 
[INFO/MainProcess] ['str0', 'str1', 'str2', 'str3', ...] 
[DEBUG/MainProcess] worker handler exiting 
[DEBUG/MainProcess] task handler got sentinel 
[INFO/MainProcess] process shutting down 
[DEBUG/MainProcess] task handler sending sentinel to result handler 
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 
[DEBUG/MainProcess] finalizing pool 
[DEBUG/MainProcess] task handler sending sentinel to workers 
[DEBUG/MainProcess] helping task handler/workers to finish 
[DEBUG/MainProcess] result handler got sentinel 
[DEBUG/PoolWorker-3] worker got sentinel -- exiting 
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished 
[DEBUG/MainProcess] ensuring that outqueue is not full 
[DEBUG/MainProcess] task handler exiting 
[DEBUG/PoolWorker-3] worker exiting after 2 tasks 
[INFO/PoolWorker-3] process shutting down 
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0 
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0 
[DEBUG/MainProcess] joining worker handler 
[DEBUG/MainProcess] terminating workers 
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers 
[DEBUG/MainProcess] joining task handler 
[DEBUG/MainProcess] joining result handler 
[DEBUG/MainProcess] joining pool workers 
[DEBUG/MainProcess] cleaning up worker 4811 
[DEBUG/MainProcess] running the remaining "atexit" finalizers 

Обратите внимание, что каждая строка указывает, какой процесс, излучаемый запись журнала. Таким образом, вывод в некоторой степени преобразует порядок событий из ваших параллельных процессов.

При разумном размещении звонков logging.info вы должны уметь сузить место и, возможно, почему ваш скрипт «умирает молча» (или, по крайней мере, он не будет настолько тихим, как он умирает).

+0

Спасибо, что это отличный insite, и я уже изменил свой код, чтобы работать так, к сожалению, функция карты все еще умирает молча, и я точно не знаю почему. –

+0

Можете ли вы опубликовать runnable пример, который «умирает молча»? – unutbu

+0

Наверное, нет, функция, выполняющая итерацию, довольно длинная. Я просто хочу, чтобы я мог получить сообщение об ошибке или что-то в этом роде. С поверхности он просто выглядит так, будто полностью останавливает петлю. Как и у вас там, я печатаю результаты как сразу после pool.close(). Результаты печатаются несколько раз, а в других случаях их нет. Я попытаюсь прибить пример. Спасибо за помощь. –

Смежные вопросы