Предположим, у меня есть бассейн с несколькими процессами, протекающими внутри класса, который я использую, чтобы сделать некоторую обработку, например:Многопроцессорная обработка, что делает pool.ready?
class MyClass:
def __init_(self):
self.pool = Pool(processes = NUM_PROCESSES)
self.pop = []
self.finished = []
def gen_pop(self):
self.pop = [ self.pool.apply_async(Item.test, (Item(),)) for _ in range(NUM_PROCESSES) ]
while (not self.check()):
continue
# Do some other stuff
def check(self):
self.finished = filter(lambda t: self.pop[t].ready(), range(NUM_PROCESSES))
new_pop = []
for f in self.finished:
new_pop.append(self.pop[f].get(timeout = 1))
self.pop[f] = None
# Do some other stuff
Когда я запускаю этот код я получаю cPickle.PicklingError
в котором говорится, что <type 'function'>
не может быть маринованным. Это говорит о том, что одна из функций apply_async
еще не вернулась, поэтому я пытаюсь добавить запущенную функцию в другой список. Но это не должно происходить, потому что все выполняемые вызовы должны быть отфильтрованы с использованием функции ready()
.
Относительно примечания, фактическая природа класса Item
несущественна, но важно то, что в верхней части моей функции Item.test
у меня есть оператор печати, который должен срабатывать для целей отладки. Однако этого не происходит. Это говорит мне, что функция была инициирована, но на самом деле не начала выполнение.
Итак, похоже, что ready()
на самом деле не говорит мне, закончилось ли выполнение или нет. Что именно делает ready()
и как мне изменить свой код, чтобы я мог отфильтровать процессы, которые все еще работают?
Почему вы думаете, что 'cPickle.PicklingError' означает это? – zwol
@zwol Я честно не знаю. Скорее, я думаю, что ошибка является симптомом фактической проблемы, которая была подробно описана в моем вопросе. – Woody1193
Что я говорю, я думаю, что ваш диагноз фактической проблемы ошибочен. – zwol