2014-08-10 2 views
1

У меня возникают очень странные проблемы при работе с данными внутри моей функции, которые вызывается пулом pool.map. Например, следующий код работает, как ожидалось ...python no output при использовании pool.map_async

import csv 
import multiprocessing 
import itertools 
from collections import deque 

cur_best = 0 
d_sol = deque(maxlen=9) 
d_names = deque(maxlen=9) 

**import CSV Data1** 

def calculate(vals): 
    #global cur_best 
    sol = sum(int(x[2]) for x in vals) 
    names = [x[0] for x in vals] 
    print(", ".join(names) + " = " + str(sol)) 

def process(): 
    pool = multiprocessing.Pool(processes=4) 
    prod = itertools.product(([x[2], x[4], x[10]] for x in Data1)) 
    result = pool.map_async(calculate, prod) 
    pool.close() 
    pool.join() 
    return result 

process() 

Теперь, когда я добавить простое заявление, если-вычислять(), я не получаю никакого вывода.

def calculate(vals): 
     #global cur_best 
     sol = sum(int(x[2]) for x in vals) 
     if sol > cur_best: 
      cur_best = sol 
      names = [x[0] for x in vals] 
      print(", ".join(names) + " = " + str(cur_best)) 
      #would like to append cur_best and names to a deque 

Я попытался настроить, где объявляю «cur_best» безрезультатно.

Я пытаюсь отслеживать «текущее лучшее» решение, поскольку я выполняю вычисления. В моем линейном коде эта логика находится в вложенном for-loop, и я добавляю каждый новый cur_best в deque.

Связаны ли мои новые проблемы с пулом pool.map или pool.map_async? Могу ли я больше не обрабатывать функцию calculate() как линейный цикл?

Существует несколько других условных утверждений, которые мне нужно адресовать. Должен ли я обрабатывать это в другой части кода? И если да, то как именно?

+0

является 'global' закомментирована в Вашем коде? –

+1

Использование 'multiprocessing' будет создавать несколько процессов (здесь 4) с каждым их собственным значением глобального' cur_best', поэтому ваша структура кода не будет работать. –

+0

@JasonS да глобально закомментировано – nodoze

ответ

2

Здесь, вероятно, происходят две вещи. Во-первых, причина, по которой вы не видите ничего напечатанного от рабочей функции, вероятно, потому, что она бросает исключение. Поскольку вы используете map_async, вы фактически не увидите исключение, пока не назовете result.get(). Тем не менее, поскольку вы используете callnig close/join в пуле сразу после использования map_async, вместо этого вы должны использовать только map, который будет блокироваться до завершения всей работы (или создается исключение). Я не уверен почему происходит исключение (ничего не выпрыгивает из кода, который вы предоставили), но я предполагаю, что вы вытаскиваете неправильный индекс из своего списка.

Во-вторых, как указывал Армин Риго, cur_best не используется между всеми процессами, поэтому ваша логика не будет работать так, как вы планируете. Я думаю, что самый простой вариант - использовать multiprocessing.Value для создания целого числа в общей памяти, которое будет доступно для всех процессов.

Чтобы добавить результаты, которые вы получаете до deque, вам необходимо создать общие правила, используя multiprocessing.Manager. A Manager порождает серверный процесс, который может управлять совместным доступом к объекту (например, deque). Каждый процесс в вашем пуле (а также родительский процесс) получает доступ к объекту Proxy, который может связываться с процессом Менеджера для чтения/записи на общий объект.

Вот пример, показывающий все обсуждалось выше:

import itertools 
import multiprocessing 
from collections import deque 
from multiprocessing.managers import BaseManager, MakeProxyType 

class DequeManager(BaseManager): 
    pass 

BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__', 
    '__mul__', '__reversed__', '__rmul__', '__setitem__', 
    'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop', 
    'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate', 
    '__imul__' 
    )) 
class DequeProxy(BaseDequeProxy): 
    def __iadd__(self, value): 
     self._callmethod('extend', (value,)) 
     return self 
    def __imul__(self, value): 
     self._callmethod('__imul__', (value,)) 
     return self 

DequeManager.register('deque', deque, DequeProxy) 


cur_best = d_sol = d_names = None 

def init_globals(best, sol, names): 
    """ This will be called in each worker process. 

    A global variable (cur_best) will be created in each worker. 
    Because it is a multiprocessing.Value, it will be shared 
    between each worker, too. 

    """ 
    global cur_best, d_sol, d_names 
    cur_best = best 
    d_sol = sol 
    d_names = names 

def calculate(vals): 
    global cur_best 
    sol = sum(int(x[2]) for x in vals) 
    if sol > cur_best.value: 
     cur_best.value = sol 
     names = [x[0] for x in vals] 
     print(", ".join(names) + " = " + str(cur_best.value)) 
     d_sol.append(cur_best.value) 
     d_names.append(names) 
    return sol 

def process(): 
    global d_sol, d_names 
    cur_best = multiprocessing.Value("I", 0) # unsigned int 

    m = DequeManager() 
    m.start() 
    d_sol = m.deque(maxlen=9) 
    d_names = m.deque(maxlen=9) 

    pool = multiprocessing.Pool(processes=4, initializer=init_globals, 
           initargs=(cur_best, d_sol, d_names)) 
    prod = itertools.product([x[2], x[4], x[10]] for x in Data1) 
    result = pool.map(calculate, prod) # map instead of map_async 
    pool.close() 
    pool.join() 
    return result # Result will be a list containing the value of `sol` returned from each worker call 

if __name__ == "__main__":  
    print(process()) 
+0

ty для такого подробного объяснения. несколько вопросов: девки возвращаются только с одним значением (даже если установлено значение maxlen = 9). самые последние изменения: в моем вычислении() мне нужно использовать float, так что теперь это sol = sum (float (x [2]) для x в vals) аналогично in process() Я изменил из unsigned int на float через cur_best = multiprocessing .Value («f», 0) – nodoze

+0

- это каждый рабочий процесс, очищающий условия, а затем добавление последнего результата?глядя на структуру init_globals и как cur_best = d_sol = d_names = None определено ... это мое лучшее предположение. – nodoze

+0

Выполнение некоторого тестирования, im получая сообщение об ошибке, поскольку объект AutoProxy [deque] не поддерживает индексирование. после некоторого исследования, похоже, мне нужно определить _iter_ из BaseProxy, чтобы класс был итерабельным. – nodoze