2016-09-20 4 views
2

Я хочу распараллелить обработку словаря, используя библиотеку многопроцессорности.Словарь многопроцессорности

Моей проблема может быть сведен к этому коду:

from multiprocessing import Manager,Pool 

def modify_dictionary(dictionary): 
    if((3,3) not in dictionary): 
     dictionary[(3,3)]=0. 
    for i in range(100): 
     dictionary[(3,3)] = dictionary[(3,3)]+1 
    return 0 

if __name__ == "__main__": 

    manager = Manager() 

    dictionary = manager.dict(lock=True) 
    jobargs = [(dictionary) for i in range(5)] 

    p = Pool(5) 
    t = p.map(modify_dictionary,jobargs) 
    p.close() 
    p.join() 

    print dictionary[(3,3)] 

создать пул 5 рабочих, и каждый работник должен увеличивать словарь [(3,3)] 100 раз. Итак, если процесс блокировки работает правильно, я ожидаю, что словарь [(3,3)] будет равен 500 в конце скрипта.

Однако; что-то в моем коде должно быть неправильным, потому что это не то, что я получаю: процесс блокировки не кажется «активированным», а словарь [(3,3)] всегда имеет оценщик < 500 в конце скрипта.

Не могли бы вы мне помочь?

+0

modify_array - отличное имя. Для чего используется метод? – MKesper

+0

Спасибо за замечание, я просто забыл переименовать функцию. Я разрабатываю алгоритм, в котором я добавляю вероятности к некоторым элементам 2D-массива, но из-за небольшого числа элементов я полагал, что использование словаря было более умным. – pete

+0

'manager.dict (lock = True)' не выделяет никакой блокировки, он создает управляемый эквивалент dict '{' lock ': True} ' – mata

ответ

0

Проблема с этой линии:

dictionary[(3,3)] = dictionary[(3,3)]+1 

Три вещи происходят на этой линии:

  • Считать значение словарной ключа (3,3)
  • Increment значение по 1
  • Запишите стоимость обратно

Но часть приращения происходит за пределами любой блокировки.

Вся последовательность должна быть атомарной и должна быть синхронизирована во всех процессах. В противном случае процессы будут чередовать, давая вам более низкую, чем ожидалось, сумму.

Держа замок виста приращением значение гарантирует, что вы получите в общей сложности 500 вы ожидаете:

from multiprocessing import Manager,Pool,Lock 

lock = Lock() 

def modify_array(dictionary): 
    if((3,3) not in dictionary): 
     dictionary[(3,3)]=0. 
    for i in range(100): 
     with lock: 
      dictionary[(3,3)] = dictionary[(3,3)]+1 
    return 0 

if __name__ == "__main__": 

    manager = Manager() 

    dictionary = manager.dict(lock=True) 
    jobargs = [(dictionary) for i in range(5)] 

    p = Pool(5) 
    t = p.map(modify_array,jobargs) 
    p.close() 
    p.join() 

    print dictionary[(3,3)] 
+0

Спасибо, это решение, которое я искал. Я думал, что фаза блокировки была автоматизирована с помощью структуры диспетчера диспетчера, добавив параметр lock = True. – pete

+0

Вы не должны использовать глобальную «блокировку», но явно передавать ее функции работника. Это может работать в контексте разветвления, но не при запуске метода запуска (windows) – mata

+0

Также часть 'if' также должна быть защищена блокировкой. Теоретически процесс может проходить if, затем останавливаться и возобновляться, в то время как другой уже мутирует dict и сбрасывает счетчик на 0. – mata

0

я ве удалось много раз, чтобы найти здесь правильное решение программирования сложности я имел. Поэтому я хотел бы внести небольшой вклад. У выше кода по-прежнему возникает проблема не корректировки словаря. Чтобы получить правильный результат, вам нужно пройти блокировку и исправить рабочие задания до f. В приведенном выше коде вы создаете новый словарь для каждого процесса. Код, который я нашел работоспособным:

from multiprocessing import Process, Manager, Pool, Lock 
from functools import partial 

def f(dictionary, l, k): 
    with l: 
     for i in range(100): 
      dictionary[3] += 1 

if __name__ == "__main__": 
    manager = Manager() 
    dictionary = manager.dict() 
    lock = manager.Lock() 

    dictionary[3] = 0 
    jobargs = list(range(5)) 
    pool = Pool() 
    func = partial(f, dictionary, lock) 
    t = pool.map(func, jobargs) 

    pool.close() 
    pool.join() 

    print(dictionary) 
Смежные вопросы