2015-08-15 3 views
-2

Я попытался написать python для многопроцессорной обработки. Я читал в Интернете, но я до сих пор не понимаю, как его написать.многопроцессорный питон: как разбить задания

Мой сценарий:

import multiprocessing as mp 
import numpy as np 
import ctypes 

def func(M, j, : 
    coor_i = np.zeros(1000) 
    coor_j = np.ones(1000) # In reality it is loaded from a txt 
    A = np.square(coor_i - coor_j) 
    a = A.sum 
    M[j] = a 

for i in range(1,100) : 
    M = mp.Array(ctypes.c_double, np.ones(i)) 
    p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i) 

    p.start() 
    p.join() 

    print(M) 

Я смотрю онлайн, и я видел - 'mp.Pool', 'процессы', 'mp.Queue'

Большое спасибо.

+1

Во-первых - в чем проблема, с которой вы столкнулись? Второй - этот код не является синтаксически действительным, так как отступы сломаны. – Makoto

+0

Вам нужно исправить первый отступ – kezzos

ответ

0

Прежде всего, ваша первая ошибка - не возвращать ничего из вашей функции func(). В python все значения являются ссылками на объекты в памяти, а когда вы выполняете назначение, вы заменяете ссылочное значение новым объектом. Так что M = -M не мутирует объект M, но создает новый и меняет ссылку M в рамках функции.

Что это означает, что ваша функция будет всегда возвращение None:

>>> from multiprocessing import Pool 
>>> def func(M): # if you call with M=1 
... # M==1 here 
... M = -M 
... # M==-1 here 
... 
>>> M = 1 
>>> M = func(M) 
>>> print(M) 
None 

Чтобы это исправить, нужно сделать его вернуть значение:

>>> def func(M): 
...  return -M 
... 
>>> print(func(1)) 
-1 

Тогда лучше способом распараллеливания работы было бы использование пула процессов, чтобы вы могли контролировать, сколько экземпляров выполняется параллельно, адаптируя прямой from the documentation examples:

>>> def func(M): 
>>>  return -M 
... 
>>> pool = Pool(processes=4)    # start 4 worker processes 
>>> results = [] 
>>> for i in range(1,100): 
...  results.append(pool.apply_async(func, [i])) # evaluate "func(i)" asynchronously 
... 
>>> print [result.get() for result in results] 
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99] 

Другой способ сделать то же самое было бы использовать:

>>> print(pool.map(func, range(1,100))) 
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99] 

Это, как говорится, если все, что вы делаете, действительно отрицая значение (или что-то, что простой) - который я думаю, вы не - то лучше нЕ использовать распараллеливание, так как питон и ваш микропроцессор vectorise работу и бежать быстрее:

>>> print([-M for M in range(1,100)]) 
[-1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22, -23, -24, -25, -26, -27, -28, -29, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39, -40, -41, -42, -43, -44, -45, -46, -47, -48, -49, -50, -51, -52, -53, -54, -55, -56, -57, -58, -59, -60, -61, -62, -63, -64, -65, -66, -67, -68, -69, -70, -71, -72, -73, -74, -75, -76, -77, -78, -79, -80, -81, -82, -83, -84, -85, -86, -87, -88, -89, -90, -91, -92, -93, -94, -95, -96, -97, -98, -99] 

вот Метрика, как и побежал на моей машине с python2:

>>> from timeit import timeit 
>>> timeit("[-M for M in range(1,100)]", number=100000) 
0.6327948570251465 
>>> def test(): 
...  pool.map(func, range(1,100)) 
... 
>>> timeit(test, number=100000) 
31.26303195953369 

редактировать:

вопрос с вопросом является то, что совершенно не ясно, что вы пытаетесь достичь, и что вы пытаясь сделать. Обычно первый путь к распараллеливанию состоит в том, чтобы сделать версию, не распараллеленную, и попытаться распараллелить вещи, которые будут работать лучше и не будут иметь взаимного исключения.

Но одна вещь, которая поражает меня в своем коде, что для каждой итерации цикла вашего range(1,100) вы на самом деле ждет процесс, чтобы закончить перед запуском нового:

for i in range(1,100) : 
    M = mp.Array(ctypes.c_double, np.ones(i)) 
    # Create process 
    p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i) 

    # Start a process 
    p.start() 
    # wait for the process p to finish before going on 
    p.join() 
    # will continue when p has finished 

Если вы хотите обратиться это вы можете либо использовать Pool, как показано в моем примере или:

for i in range(1,100) : 
    M = mp.Array(ctypes.c_double, np.ones(i)) 
    # Create process 
    p = mp.Process(target = func, args = (coor_i, j)) for j in range(1,i) 

    # Start a process 
    p.start() 
    # wait for the process p to finish before going on 
    p.join() 
    # will continue when p has finished 

так быстро улучшение было бы:

processes = [] # keep a list of all the processes 
for i in range(1,100) : 
    M = mp.Array(ctypes.c_double, np.ones(i)) 
    # Create process 
    for j in range(1,i): 
     p = mp.Process(target = func, args = (coor_i, j)) 
     # Append to processes list 
     processes.append(p) 
     # Start a process 
     p.start() 

# wait for all processes to have finished before quitting (or going on) 
for p in processes: 
    p.join() 

поэтому я только что обновил код: там вы запускаете все процессы в одном цикле, а в другом цикле вы блокируете, пока не закончите. .join() блокирует текущий процесс, пока другой процесс не завершится, поэтому второй цикл гарантирует, что каждый процесс завершит работу до завершения вашего фрагмента или продолжения с последующим кодом.


Так что для «генератора» вопрос, я не заметил, что вы делали for j in range(1, i) на вашей mp.Process линии, и, таким образом, не присоединяя процесс в списке процессов, а генератор списка.

Что происходит, что на самом деле ваш Конде НЕ работы, и должны терпеть неудачу при компиляции, потому что:

p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i) 
              ^
SyntaxError: invalid syntax 

если вы исправить это следующим образом:

>>> (p = Process(target = func, args = (coor_i, j)) for j in range(1 ,i)) 

тогда исходный код CAN NOT Работа, потому что p не является процессом, это генератор списка и p.start() не может существовать:

>>> p.next() 
<Process(Process-5, initial)> 
>>> p.next() 
<Process(Process-6, initial)> 
>>> p.next() 
<Process(Process-7, initial)> 
>>> p.next() 
<Process(Process-8, initial)> 
>>> p.next() 
<Process(Process-9, initial)> 

Поэтому вместо использования для одного вкладыша в этом случае это плохая идея, которая исправляется с использованием традиционного для.

+0

Извините, я должен уточнить мой код. Я отредактировал его, так что он больше похож на мой настоящий код. Я пытаюсь распараллелить цикл for вместо самой функции. – Simon

+0

Как-то появляется сообщение об ошибке - процессы [-1] .start() - Объект 'generator' не имеет атрибута 'start' Также вы бы не поняли, почему цикл «для p» должен быть размещен вне цикл «for i»? И почему -1 необходим для «процессов [-1]»? – Simon

+0

Я не пробовал код, и на самом деле я вижу, что не так ... но мне нужно идти, я вернусь, чтобы исправить его позже. – zmo

Смежные вопросы