2015-10-15 4 views
4

Я использую мультипроцессирование таким образом:Почему многопроцессорность.Process.join() висит?

import multiprocessing as mp 

def worker(thread_id, tasks, results): 
    tmp_dir = 'temp_for_{}'.format(thread_id) 
    os.makedirs(tmp_dir) 
    try: 
     while not tasks.empty(): 
      data = tasks.get() 
      response = process_pdf(data, tmp_dir) 
      results.put(response) 
    except (KeyboardInterrupt, SystemExit): 
     log.info('Interrupt signal received in thread %s.', thread_id) 
    except Queue.Empty: 
     pass 
    except Exception: 
     log.error("Unexpected error in %s", thread_id, exc_info=True) 
    finally: 
     shutil.rmtree(tmp_dir) 
     log.info("Thread %s exit", thread_id) 

if __name__ == "__main__": 
    tasks, results = mp.Queue(), mp.Queue() 
    for record in cursor.select(query): 
     tasks.put(record) 
    manager = mp.Manager() 
    workers = [mp.Process(target=worker, args=(i, tasks, results)) for i in xrange(8)] 
    for worker in workers: 
     worker.start() 
    try: 
     for worker in workers: 
      worker.join() 
    except (KeyboardInterrupt, SystemExit): 
     log.info('Interrupt signal received in main. Cleaning up main') 
    finally: 
     log.info('Got %s results. Saving', results.qsize()) 
     while not results.empty(): 
      cursor.update_one('documents', 'id', results.get()) 
     cursor.close() 

Вот вывод, когда я запускаю этот код:

14:34:04 15/10 INFO: Thread 6 exit 
14:34:04 15/10 INFO: Thread 7 exit 
14:34:21 15/10 INFO: Thread 3 exit 
14:34:24 15/10 INFO: Thread 2 exit 
14:34:24 15/10 INFO: Thread 1 exit 
14:34:29 15/10 INFO: Thread 5 exit 
14:34:36 15/10 INFO: Thread 0 exit 
14:35:37 15/10 INFO: Thread 4 exit 

Тогда я вхожу^C, прождав некоторое время без какого-либо прогресса, и получить это выход:

^C14:37:16 15/10 INFO: Interrupt signal received in main. Cleaning up main 
14:37:16 15/10 INFO: Got 16 results. Saving 

И я получаю эту отслеживающий для всех потоков:

Process Process-9: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/multiprocessing/process.py", line 261, in _bootstrap 
    util._exit_function() 
    File "/usr/lib64/python2.7/multiprocessing/util.py", line 328, in _exit_function 
    util._exit_function() 
    File "/usr/lib64/python2.7/multiprocessing/util.py", line 274, in _run_finalizers 
    finalizer() 
    File "/usr/lib64/python2.7/multiprocessing/util.py", line 207, in __call__ 
    res = self._callback(*self._args, **self._kwargs) 
    File "/usr/lib64/python2.7/multiprocessing/queues.py", line 218, in _finalize_join 
    thread.join() 
    File "/usr/lib64/python2.7/threading.py", line 952, in join 
    thread.join() 
    File "/usr/lib64/python2.7/threading.py", line 340, in wait 
    waiter.acquire() 
KeyboardInterrupt 

Почему это висит? Если это важно, я могу добавить, что process_pdf() выполняет несколько подпроцессов с subprocess.Popen().

+0

идти ли проблема, если у вас 'get' все из очереди 'results' до вызова' worker.join() 'для всех процессов? – dano

+1

Вне темы: вы управляете рабочим пулом и ставите здесь очереди, чтобы делать то, что «multiprocessing.Pool» будет делать для вас с помощью «imap_unordered». Редизайн для его использования значительно упростит ваш код. – ShadowRanger

+0

@dano Как это может привести к этой проблеме? Это не JoinableQueue –

ответ

5

Большое спасибо dano за подсказку. Исправление этой проблемы является создание очереди с помощью Manager():

manager = mp.Manager() 
tasks, results = manager.Queue(), manager.Queue() 

Редактировать
Tnx к ShadowRanger. Похоже, исключение в отправке фиксированное для 2.7.10 и теперь мы можем использовать multiprocessing.Pool с imap_unordered и не необходимости write wall of code для простой работы :) Но я не пробовал еще

+1

Хотя исключения в отправке фиксируются в 2.7.10, сбор блокировки остается бесперебойным во всех выпусках 2.x, и это большая часть проблемы. Я могу определенно заблокировать даже на 2.7.10 при определенных условиях, потому что специальные потоки не прерываются 'SIGINT', а другие' присоединяются' к ним. – ShadowRanger

+1

@ShadowRanger Этот разговор, UnicodDedecodeError в 2.x :) и новый синтаксис 'yield' в 3.x определенно заставил меня снова рассмотреть вопрос о переходе на Python 3 –

Смежные вопросы