2015-02-23 3 views
0

при использовании кода, как этоттребуется синхронизация с многопроцессорным процессором в python?

def execute_run(list_out): 
    ... do something 

pool = ThreadPoolExecutor(6) 
for i in list1: 
    for j in list2: 
      pool.submit(myfunc, list_out) 
pool.join() 

предполагающей нити изменить list_out, они делают это в синхронном режиме?

+0

Это 'ThreadPoolExecutor' из библиотеки' concurrent.futures'? Если это так, я не думаю, что у него есть метод join. Вы имели в виду 'shutdown'? –

+1

Что происходит в 'execute_run'? Зависит ли результат от предыдущего выполнения? – Jimilian

+0

Мне в основном нужно место, где каждая нить может помещать результаты, возможно список, таким образом, чтобы не было условий гонки. – Bobo

ответ

1

Если ваша цель - вычислить что-то многопроцессорным способом, лучше не делиться государством. я предлагаю вам использовать простой map из multiprocessing, если это возможно:

from multiprocessing import Pool 

input_list = [] 
for i in list1: 
    for j in list2: 
     input_list.append((i, j)) 

p = Pool() 
result_list = p.map(do_something, input_list) 

map работает как для цикла:

def naive_map(input_list, do_something): 
    result = [] 
    for i in input_list: 
    result.append(do_something(i)) 
    return result 

Итак, если вы хотите использовать функцию, которая принимает несколько аргументов, вы может использовать лямбда-функцию для распаковки кортежа.

>> def your_function(v1, v2): 
    >>  return v1+v2 
    >> f = lambda (x,y): your_function(x, y) 
    >> map(f, [(1,2),(3,4),(5,6)]) 
     [3, 7, 11] 
+0

таким образом они бы изменили входной список в синхронизированном виде правильно? – Bobo

+0

@Bobo, таким образом они ** не ** изменяют входной список вообще. Они читали его только. Итак, если вы можете подготовить свои данные к обработке таким образом, вы должны это сделать. – Jimilian

+0

Мне нужно, чтобы они могли записать результаты где-нибудь, когда они будут сделаны. Как я могу это сделать? – Bobo

1

Ответ заключается в том, что каждый процесс получает копию списка и поэтому не видит изменений, внесенных другими процессами.

Для достижения желаемого вам потребуется использовать Manager для создания прокси-сервера. Обратите внимание, что классы прокси-сервера менеджера не знают, когда член мутирован. Например, если какой-либо элемент прокси-сервера списка каким-то образом изменен, прокси-сервер списка не знает этого. Вы должны переназначить участника, чтобы сбросить изменения. Пример из документации:

# create a list proxy and append a mutable object (a dictionary) 
lproxy = manager.list() 
lproxy.append({}) 
# now mutate the dictionary 
d = lproxy[0] 
d['a'] = 1 
d['b'] = 2 
# at this point, the changes to d are not yet synced, but by 
# reassigning the dictionary, the proxy is notified of the change 
lproxy[0] = d 
+2

ОП использует «ThreadPoolExecutor», а не «ProcessPoolExecutor». Поэтому каждый поток изменяет один и тот же список. –

+0

У вас есть указатель на пример? Мне нужно сделать что-то вроде кода, который я написал выше, где каждый поток изменяет тот же список, что и аргумент. спасибо – Bobo

+0

Я думаю, вы могли бы перефразировать это, чтобы быть немного более понятным. Я думаю, что способ, которым он в настоящее время сформулирован, создает впечатление, что такие изменения, как добавление или переключение из управляемого списка, не будут распространяться. Было бы неплохо указать, что отсутствие распространения применимо только к изменяемым элементам. – skrrgwasme

2

multiprocessing пулов потоков являются только темой и не имеет никакой магии вообще синхронизировать общие объекты. Вам необходимо защитить общие объекты с помощью блокировки.

+0

Или еще лучше реорганизуйте свой код, чтобы вы не делили какие-либо объекты (кроме как только для чтения) и вместо этого сортировали выходные объекты в основном потоке, после другие потоки запущены. –