2011-11-26 7 views
4

Я хочу добавить список dicts вместе с модулем многопроцессорности python.проблема блокировки многопроцессорности python

Вот упрощенная версия моего кода:

#!/usr/bin/python2.7 
# -*- coding: utf-8 -*- 

import multiprocessing 
import functools 
import time 

def merge(lock, d1, d2): 
    time.sleep(5) # some time consuming stuffs 
    with lock: 
     for key in d2.keys(): 
      if d1.has_key(key): 
       d1[key] += d2[key] 
      else: 
       d1[key] = d2[key] 

l = [{ x % 10 : x } for x in range(10000)] 
lock = multiprocessing.Lock() 
d = multiprocessing.Manager().dict() 

partial_merge = functools.partial(merge, d1 = d, lock = lock) 

pool_size = multiprocessing.cpu_count() 
pool = multiprocessing.Pool(processes = pool_size) 
pool.map(partial_merge, l) 
pool.close() 
pool.join() 

print d 
  1. Я получаю эту ошибку при запуске этого сценария. Как я могу это решить?

    RuntimeError: Lock objects should only be shared between processes through inheritance

  2. в merge функции, необходимые в этом состоянии lock? или python позаботится об этом?

  3. Я думаю, что map должен сделать что-то из одного списка в другой список, а не сбрасывать все вещи в одном списке на один объект. Так есть ли более элегантный способ сделать такие вещи?

ответ

11

Следующие должны работать кросс-платформенной (т.е. на Windows, тоже) как в Python 2 и 3. Он использует процесс пула инициализатор для установки менеджера Dict как глобальный в каждом дочернем процессе.

FYI:

  • Использование замка ненужно с менеджером Dict.
  • Число процессов в Pool по умолчанию соответствует количеству CPU.
  • Если вас не интересует результат, вы можете использовать apply_async вместо map.
import multiprocessing 
import time 

def merge(d2): 
    time.sleep(1) # some time consuming stuffs 
    for key in d2.keys(): 
     if key in d1: 
      d1[key] += d2[key] 
     else: 
      d1[key] = d2[key] 

def init(d): 
    global d1 
    d1 = d 

if __name__ == '__main__': 

    d1 = multiprocessing.Manager().dict() 
    pool = multiprocessing.Pool(initializer=init, initargs=(d1,)) 

    l = [{ x % 5 : x } for x in range(10)] 

    for item in l: 
     pool.apply_async(merge, (item,)) 

    pool.close() 
    pool.join() 

    print(l) 
    print(d1) 
Смежные вопросы