2013-03-12 4 views
11

Я искал хорошую реализацию простого пула пулов python и действительно не могу найти ничего, что бы соответствовало моим потребностям. Я использую python 2.7, и все модули, которые я нашел, либо не работают, либо не обрабатывают исключения в рабочих правильно. Мне было интересно, знал ли кто-нибудь о библиотеке, которая могла бы предложить тип функциональности, которую я ищу. Помощь очень ценится.Пул потоков Python, который обрабатывает исключения

Multiprocessing

Моя первая попытка была с помощью встроенного модуля multiprocessing, но так как это не использовать потоки, но подпроцессы вместо этого мы столкнулись с проблемой, что объекты не могут быть маринованный. Не иди сюда.

from multiprocessing import Pool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = Pool(processes=8) 
for s in samples: pool.apply_async(s.compute_fib, [20]) 
pool.join() 
for s in samples: print s.fib 

# PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

Futures

Так что я вижу, есть задний порт некоторых из прохладных параллельных функций питона 3.2 here. Это кажется идеальным и простым в использовании. Проблема в том, что когда вы получаете исключение в одном из рабочих, вы получаете только тип исключения, такого как «ZeroDivisionError», но не трассируетесь и, следовательно, не указали, какая строка вызвала исключение. Код становится невозможным для отладки. Нет.

from concurrent import futures 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = futures.ThreadPoolExecutor(max_workers=8) 
threads = [pool.submit(s.compute_fib, 20) for s in samples] 
futures.wait(threads, return_when=futures.FIRST_EXCEPTION) 
for t in threads: t.result() 
for s in samples: print s.fib 


# futures-2.1.3-py2.7.egg/concurrent/futures/_base.pyc in __get_result(self) 
# 354  def __get_result(self): 
# 355   if self._exception: 
#--> 356    raise self._exception 
# 357   else: 
# 358    return self._result 
# 
# ZeroDivisionError: integer division or modulo by zero 

Workerpool

Я нашел другую реализацию этой модели here. На этот раз, когда возникает исключение, оно печатается, но тогда мой интерактивный интерпретатор ipython остается в зависании и его нужно убить из другой оболочки. Нет.

import workerpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = workerpool.WorkerPool(size=8) 
for s in samples: pool.map(s.compute_fib, [20]) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ^C^C^C^C^C^C^C^C^D^D 
# $ kill 1783 

Threadpool

Еще прочей реализации here. На этот раз, когда возникает исключение, оно печатается на stderr, но сценарий не прерывается и вместо этого продолжает выполнение, что не соответствует цели исключения и может сделать вещи небезопасными. Все еще не годный к употреблению.

import threadpool 

class Sample(object): 
    def compute_fib(self, n): 
     phi = (1 + 5**0.5)/2 
     1/0 
     self.fib = int(round((phi**n - (1-phi)**n)/5**0.5)) 

samples = [Sample() for i in range(8)] 
pool = threadpool.ThreadPool(8) 
requests = [threadpool.makeRequests(s.compute_fib, [20]) for s in samples] 
requests = [y for x in requests for y in x] 
for r in requests: pool.putRequest(r) 
pool.wait() 
for s in samples: print s.fib 

# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
# ZeroDivisionError: integer division or modulo by zero 
#---> 17 for s in samples: print s.fib 
# 
#AttributeError: 'Sample' object has no attribute 'fib' 

- Обновление -

Представляется, что в отношении futures библиотеки, поведение Python 3 не то же самое, как питон 2.

futures_exceptions.py:

from concurrent.futures import ThreadPoolExecutor, as_completed 

def div_zero(x): 
    return x/0 

with ThreadPoolExecutor(max_workers=4) as executor: 
    futures = executor.map(div_zero, range(4)) 
    for future in as_completed(futures): print(future) 

Python 2.7.6 мощность:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 12, in <module> 
    for future in as_completed(futures): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 198, in as_completed 
    with _AcquireFutures(fs): 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 147, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 549, in map 
    yield future.result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 397, in result 
    return self.__get_result() 
    File "...python2.7/site-packages/concurrent/futures/_base.py", line 356, in __get_result 
    raise self._exception 
ZeroDivisionError: integer division or modulo by zero 

Python 3.3.2 выходные:

Traceback (most recent call last): 
    File "...futures_exceptions.py", line 11, in <module> 
    for future in as_completed(futures): 
    File "...python3.3/concurrent/futures/_base.py", line 193, in as_completed 
    with _AcquireFutures(fs): 
    File "...python3.3/concurrent/futures/_base.py", line 142, in __init__ 
    self.futures = sorted(futures, key=id) 
    File "...python3.3/concurrent/futures/_base.py", line 546, in result_iterator 
    yield future.result() 
    File "...python3.3/concurrent/futures/_base.py", line 392, in result 
    return self.__get_result() 
    File "...python3.3/concurrent/futures/_base.py", line 351, in __get_result 
    raise self._exception 
    File "...python3.3/concurrent/futures/thread.py", line 54, in run 
    result = self.fn(*self.args, **self.kwargs) 
    File "...futures_exceptions.py", line 7, in div_zero 
    return x/0 
ZeroDivisionError: division by zero 
+0

Это не полностью решить эту проблему, но один трюк я часто используется при отладке этих проблем временно заменить вызов 'pool.map' с вызовом встроенная «карта». –

ответ

0

Простое решение: использование независимо от альтернативных подходит вам лучше всего, и реализовать свой собственный try-except блок в ваших работников. Если вам нужно, вызовите корневой вызов.

Я бы не сказал, что эти библиотеки обрабатывают исключения «неправильно». Они имеют поведение по умолчанию, но примитивное. Вы должны будете справиться с этим сами, если по умолчанию вас не устраивает.

+0

Добавление блока 'try-execpt' не может решить ни одну из проблем. В случае 'concurrent', я все еще не могу добраться до исходной трассировки после обнаружения нового исключения. В случае «рабочего пула» я никогда не попадаю в блок исключений, так как раньше прерыватель прерывался. В случае 'threadpool' я никогда не попадаю в блок исключений, так как никаких исключений не возникает вообще. – xApple

+1

Вы думаете о блоке 'try' в основном потоке или процессе. Я говорю, что вы используете блок 'try' вокруг рабочих процессов функции. Если вы ожидаете «поднять» исключение в рабочем потоке/процессе и отправить его на ваш основной скрипт, вам нужно сначала поймать его там, где оно произошло. – slezica

+0

Ну, я не собираюсь писать обработку ошибок для каждой из функций, которые я хочу запустить. Итак, вы говорите, что я должен написать свою собственную глобальную обработку ошибок. Да, я мог просто выбрать одну из библиотек и начать редактирование исходного кода, чтобы добавить функциональность, но этого я и хотел избежать:) – xApple