2012-01-10 10 views
136

Прошу прощения, что я не могу воспроизвести ошибку с более простым примером, и мой код слишком сложный для публикации. Если я запускаю программу в оболочке IPython вместо обычного python, все будет хорошо.Ошибка трассировки многопроцессорности Python

Я искал некоторые предыдущие заметки по этой проблеме. Все они были вызваны использованием функции пула для вызова, определенной внутри функции класса. Но это не относится ко мне.

Exception in thread Thread-3: 
Traceback (most recent call last): 
    File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib64/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Я был бы признателен за любую помощь.

UPDATE: Функция I pickle определена на верхнем уровне модуля. Хотя он вызывает функцию, содержащую вложенную функцию. i.e, f() вызывает g() вызовы h(), которые имеют вложенную функцию i(), и я вызываю pool.apply_async (f). f(), g(), h() определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он работает.

+1

Ответ на верхний уровень/принятый ответ хорош, но это может означать необходимость переструктурирования кода, что может быть болезненным. Я бы рекомендовал всем, у кого есть эта проблема, также прочитать дополнительные ответы с использованием 'dill' и' patos'. Тем не менее, мне не повезло ни с одним из решений при работе с vtkobjects :(Кто-нибудь успел запустить python-код в параллельной обработке vtkPolyData? – Chris

ответ

178

Адрес list of what can be pickled. В частности, функции только разборчивы, если они определены на верхнем уровне модуля.

Этот фрагмент кода:

import multiprocessing as mp 

class Foo(): 
    @staticmethod 
    def work(self): 
     pass 

pool = mp.Pool() 
foo = Foo() 
pool.apply_async(foo.work) 
pool.close() 
pool.join() 

дает ошибку почти идентичный тому, который разместил:

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner 
    self.run() 
    File "/usr/lib/python2.7/threading.py", line 505, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Проблема заключается в том, что pool методы все используют queue.Queue передать задачи в рабочих процессов. Все, что проходит через queue.Queue, должно быть подбираем, а foo.work не подбирается, так как оно не определено на верхнем уровне модуля.

Это может быть исправлено путем определения функции на верхнем уровне, который вызывает foo.work():

def work(foo): 
    foo.work() 

pool.apply_async(work,args=(foo,)) 

Обратите внимание, что foo является pickable, поскольку Foo определяется на верхнем уровне и foo.__dict__ является пригодны для консервирования.

+0

Спасибо за ваш ответ. Я обновил свой вопрос. Я не думаю, что это причина, хотя – CodeNoob

+5

Чтобы получить PicklingError, что-то должно быть помещено в очередь, которая не подбирается. Это может быть функция или ее аргументы. Чтобы узнать больше об этой проблеме, я предлагаю сделать копию вашей программы и начать ее обработку, проще и проще, каждый раз заново запуская программу, чтобы проверить, остается ли проблема. Когда это станет очень простым, вы либо сами обнаружите проблему, либо получите что-то, что вы можете разместить здесь. – unutbu

+1

Также: если вы определите функция на верхнем уровне модуля, но она украшена, тогда ссылка будет на выходе декоратора, и вы все равно получите эту ошибку. – bobpoekert

1

Вы случайно передаете множество строк?

У меня была такая же точная ошибка, когда я передавал массив, в котором содержится пустая строка. Я думаю, это может быть связано с этой ошибкой: http://projects.scipy.org/numpy/ticket/1658

13

Я обнаружил, что я могу также генерировать именно этот вывод ошибки на отлично работающем фрагменте кода, пытаясь использовать на нем профайлер.

Обратите внимание, что это было на Windows (где разветвление немного менее изящно).

я бег:

python -m profile -o output.pstats <script> 

И обнаружил, что удаление профилирования удалил ошибку и размещение профилирования восстановить его. Меня тоже затихало, потому что я знал, что код работал. Я проверял, не обновил ли что-то пул.py ... тогда возникло чувство тонуса и устранил профилирование, и все.

Публикация здесь для архивов в случае, если кто-то еще сталкивается с этим.

+2

WOW, спасибо за упоминание! Это заставило меня замочить в течение последнего часа или около того; Я пробовал все на простом примере - ничего не работало. Но у меня также был профайлер, запускающий мой пакетный файл :( – tim

+0

Вот что случилось со мной! –

50

Я бы использовал pathos.multiprocesssing, а не multiprocessing. pathos.multiprocessing - это вилка multiprocessing, которая использует dill. dill может сериализовать практически все, что угодно, в python, так что вы можете отправлять гораздо больше по параллели. Вилка pathos также имеет возможность работать непосредственно с несколькими функциями аргументов, как вам нужно для методов класса.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> p = Pool(4) 
>>> class Test(object): 
... def plus(self, x, y): 
...  return x+y 
... 
>>> t = Test() 
>>> p.map(t.plus, x, y) 
[4, 6, 8, 10] 
>>> 
>>> class Foo(object): 
... @staticmethod 
... def work(self, x): 
...  return x+1 
... 
>>> f = Foo() 
>>> p.apipe(f.work, f, 100) 
<processing.pool.ApplyResult object at 0x10504f8d0> 
>>> res = _ 
>>> res.get() 
101 

Получить pathos (и если вы хотите, dill) здесь: https://github.com/uqfoundation

+2

работал с удовольствием.Для всех остальных я установил обе библиотеки через: 'sudo pip install git + https: // github.com/uqfoundation/dill.git @ master' и ' sudo pip install git + https: //github.com/ uqfoundation/pathos.git @ master' –

+3

@AlexanderMcFarlane Я бы не устанавливал пакеты python с 'sudo' (из внешних источников, таких как github). Вместо этого я бы рекомендовал запустить: 'pip install --user git + ...' – Chris

+0

Использование только 'pip install patos' не работает печально и дает это сообщение:' Не удалось найти версию, которая удовлетворяет требованию pp == 1.5 .7-пафос (от пафоса) ' – xApple

15

Как уже сказал multiprocessing могут передавать только Python объекты для рабочих процессов, которые могут быть солеными. Если вы не можете реорганизовать свой код, как описано в unutbu, вы можете использовать расширенные возможности травления/разбрасывания s для передачи данных (особенно данных кода), как показано ниже.

Это решение требует только установок dill и никаких других библиотек как pathos:

import os 
from multiprocessing import Pool 

import dill 


def run_dill_encoded(payload): 
    fun, args = dill.loads(payload) 
    return fun(*args) 


def apply_async(pool, fun, args): 
    payload = dill.dumps((fun, args)) 
    return pool.apply_async(run_dill_encoded, (payload,)) 


if __name__ == "__main__": 

    pool = Pool(processes=5) 

    # asyn execution of lambda 
    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1)) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
    print 

    # async execution of static method 

    class O(object): 

     @staticmethod 
     def calc(): 
      return os.getpid() 

    jobs = [] 
    for i in range(10): 
     job = apply_async(pool, O.calc,()) 
     jobs.append(job) 

    for job in jobs: 
     print job.get() 
+3

Я автор 'dill' и' patos' ... и пока вы правы, разве это не намного приятнее и чище и более гибко использовать также «пафос», как в моем ответе? Или, может быть, я немного предвзято ... –

+3

Я не знал о статусе '' пафоса' на момент написания и хотел представить решение, которое очень близко к ответу. Теперь, когда я увидел ваше решение, я согласен, что это путь. – rocksportrocker

+0

Я прочитал ваше решение и подумал: «Дох ... Я даже не думал об этом так». Так что это было здорово. –

4

This solution requires only the installation of dill and no other libraries as pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),): 
    """ 
    Unpack dumped function as target function and call it with arguments. 

    :param (dumped_function, item, args, kwargs): 
     a tuple of dumped function and its arguments 
    :return: 
     result of target function 
    """ 
    target_function = dill.loads(dumped_function) 
    res = target_function(item, *args, **kwargs) 
    return res 


def pack_function_for_map(target_function, items, *args, **kwargs): 
    """ 
    Pack function and arguments to object that can be sent from one 
    multiprocessing.Process to another. The main problem is: 
     «multiprocessing.Pool.map*» or «apply*» 
     cannot use class methods or closures. 
    It solves this problem with «dill». 
    It works with target function as argument, dumps it («with dill») 
    and returns dumped function with arguments of target function. 
    For more performance we dump only target function itself 
    and don't dump its arguments. 
    How to use (pseudo-code): 

     ~>>> import multiprocessing 
     ~>>> images = [...] 
     ~>>> pool = multiprocessing.Pool(100500) 
     ~>>> features = pool.map(
     ~...  *pack_function_for_map(
     ~...   super(Extractor, self).extract_features, 
     ~...   images, 
     ~...   type='png' 
     ~...   **options, 
     ~... ) 
     ~...) 
     ~>>> 

    :param target_function: 
     function, that you want to execute like target_function(item, *args, **kwargs). 
    :param items: 
     list of items for map 
    :param args: 
     positional arguments for target_function(item, *args, **kwargs) 
    :param kwargs: 
     named arguments for target_function(item, *args, **kwargs) 
    :return: tuple(function_wrapper, dumped_items) 
     It returs a tuple with 
      * function wrapper, that unpack and call target function; 
      * list of packed target function and its' arguments. 
    """ 
    dumped_function = dill.dumps(target_function) 
    dumped_items = [(dumped_function, item, args, kwargs) for item in items] 
    return apply_packed_function_for_map, dumped_items 

Он также работает для Numpy массивов.

0
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

Эта ошибка также возникает, если у вас есть встроенная функция внутри объекта модели, которая была передана заданию async.

Так что не забудьте проверить объекты модели , которые не имеют встроенных функций. (В нашем случае мы использовали FieldTracker() функцию django-model-utils внутри модели для отслеживания определенного поля). Вот link к соответствующей проблеме GitHub.

Смежные вопросы