2016-08-05 2 views
2

Предположим, у меня есть бассейн с несколькими процессами, протекающими внутри класса, который я использую, чтобы сделать некоторую обработку, например:Многопроцессорная обработка, что делает pool.ready?

class MyClass: 

    def __init_(self): 
     self.pool = Pool(processes = NUM_PROCESSES) 
     self.pop = [] 
     self.finished = [] 

    def gen_pop(self): 
     self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ] 
     while (not self.check()): 
      continue 
     # Do some other stuff 

    def check(self): 
     self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES)) 
     new_pop = [] 
     for f in self.finished: 
      new_pop.append(self.pop[f].get(timeout = 1)) 
      self.pop[f] = None 
      # Do some other stuff 

Когда я запускаю этот код я получаю cPickle.PicklingError в котором говорится, что <type 'function'> не может быть маринованным. Это говорит о том, что одна из функций apply_async еще не вернулась, поэтому я пытаюсь добавить запущенную функцию в другой список. Но это не должно происходить, потому что все выполняемые вызовы должны быть отфильтрованы с использованием функции ready().

Относительно примечания, фактическая природа класса Item несущественна, но важно то, что в верхней части моей функции Item.test у меня есть оператор печати, который должен срабатывать для целей отладки. Однако этого не происходит. Это говорит мне, что функция была инициирована, но на самом деле не начала выполнение.

Итак, похоже, что ready() на самом деле не говорит мне, закончилось ли выполнение или нет. Что именно делает ready() и как мне изменить свой код, чтобы я мог отфильтровать процессы, которые все еще работают?

+0

Почему вы думаете, что 'cPickle.PicklingError' означает это? – zwol

+0

@zwol Я честно не знаю. Скорее, я думаю, что ошибка является симптомом фактической проблемы, которая была подробно описана в моем вопросе. – Woody1193

+0

Что я говорю, я думаю, что ваш диагноз фактической проблемы ошибочен. – zwol

ответ

3

Multiprocessing использует pickle модуль внутренне для передачи данных между процессами, поэтому ваши данные должны быть пригодны для консервирования. См. the list of what is considered picklable, метод объекта в этом списке отсутствует.
Чтобы решить эту проблему быстро использовать только функцию обертки вокруг метода:

def wrap_item_test(item): 
    item.test() 

class MyClass: 
    def gen_pop(self): 
     self.pop = [ self.pool.apply_async(wrap_item_test, (Item(),)) for _ in range(NUM_PROCESSES) ] 
     while (not self.check()): 
      continue 
+0

Итак, я бы просто поместил 'wrap_item_test' в верхнюю часть модуля, который содержит' Item'? Или есть что-то еще, что мне нужно знать? – Woody1193

2

Чтобы ответить на вопрос, который вы просили, .ready() действительно говорит вам ли .get()может блок: если .ready() возвращается True, .get() будет не блок, но если .ready() возвращает False, .get()может блок (или может не быть: вполне возможно, что асинхронный вызов будет завершен, прежде чем вы перейдете к вызову .get()).

Так, например, timeout = 1 в вашем .get() не служит никакой цели: так как вы только звоните .get() если .ready() вернулся True, вы уже знаете, за то, что .get() не будет блокировать.

Но .get() не блокирует ли не подразумевает вызов асинхронной был успешным, или даже о том, что рабочий процесс даже начал работать на вызов асинхронной: как говорят документы,

Если удаленный вызов поднял ставку исключение, то это исключение будет ререйзировано get().

То есть, например, если вызов асинхронный не может быть выполнен на всех, .ready() вернется True и .get() будут (перо) поднять исключение, предотвратила попытку работать.

Это похоже на то, что происходит в вашем случае, хотя мы должны угадать, потому что вы не отправляли исполняемый код и не включали трассировку.

Обратите внимание, что если то, что вы действительно хотите знать, является ли завершен вызов асинхронной нормально, после того, как уже получает True назад от .ready(), то .successful() метод для вызова.

Это довольно ясно, что, независимо от того, что может быть Item.test, его невозможно передать через .apply_async(), из-за ограничений на разборку. Это объясняет, почему Item.test никогда ничего (он никогда на самом деле называется!), Почему .ready() возвращает True (.apply_async() вызов не удалось), и почему .get() вызывает исключение (поскольку .apply_async() обнаружил исключение при попытке законсервировать один из его аргументов - вероятно Item.test) никогда не печатает ,

Смежные вопросы