2015-03-04 2 views
3

Я хотел бы реализовать следующий рабочий процесс в Python. На примере у меня есть 5 процессов, выполняющихся параллельно. Один процесс определяется как менеджер, а остальные - все рабочие. Менеджер выполняет процедуру контроля несколько раз круговым способом, пока все работники не остановятся.Лучший способ реализовать этот рабочий процесс в python

Основная идея проблемы заключается в том, что каждый рабочий начинает с отличительного списка работ. Однако, пока они обрабатывают свою работу, они добавят больше работы в свой собственный список, и в некоторых случаях повторение работника будет происходить среди рабочих. Функция менеджера состоит в том, чтобы избежать повторения работы.

Workflow overview

Im в настоящее время пытается сделать это с помощью многопроцессорной и труб Python. Вот мой код только для выполнения тестов на связи:

import multiprocessing as mp 
import time 
import random 


def checkManager(i, data_pipe, com_pipe, work_list): 
    com_out, com_in = com_pipe 
    if com_out.poll(): ##check if there is anything to read 
     msg = com_out.recv() 
     print i, " received message ", msg 
     if msg == "SEND": 
      data_out, data_in = data_pipe 
      data_in.send(work_list) 
      work_list = com_out.recv() 
    return work_list 

def myfunc(i, data_pipe, com_pipe, work_list): 
    print "starting worker ", i, " with list: ", work_list 
    while work_list != []: 
     time.sleep(3) ##sleep just to simulate some work delay 
     work_list = checkManager(i, data_pipe, com_pipe, work_list) ##check if manager wants to comunicate 
    print "stopping worker ", i 


print "Starting..." 

data_pipe = mp.Pipe() ##pipe to receive work lists from all workers 
pipes = [] ##comunication pipe for each worker 
workers = [] 

##spawn workers 
for i in range(0,4): 
    pipe = mp.Pipe() ##pipe for comunication for that process 
    r = random.randint(10, 100) ##create random list just to test 
    p = mp.Process(target=myfunc,args=(i, data_pipe, pipe, range(r))) ##create process 
    pipes.append(pipe) 
    workers.append(p) 
    p.start() 

index = 0 
stopped_workers = [] 
data_out, data_in = data_pipe 
while len(stopped_workers) != len(workers): 
    time.sleep(2) 
    for i in range(0, len(workers)): 
     if i in stopped_workers: ##check if wworker has already stopepd 
      continue 
     r = random.randint(0,100) ##just to avoid send the request all the times.. 
     if r > 80: 
      print "Comunication with ", i 
      output, input = pipes[i] 
      input.send("SEND") ## send message 
      work_list = data_out.recv() #block untill receive data 
      print "data: ", work_list 
      input.send([]) ##send an empty list just to test if it stops 
      stopped_workers.append(i) ##add to the workers that already stopped 

print "Stoping main" 

Все работает отлично на этом простом тесте, однако я хотел бы, чтобы это было как можно более эффективным, и есть некоторые вещи, которые я не люблю на мой код.

Прежде всего, я думаю, что было бы более эффективно, если бы у меня был механизм, который посылал бы сигнал рабочим процессам вместо того, чтобы время от времени проверять функцию. Я пытался использовать сигналы, но так и не смог нормально работать. В дополнение к этому я создаю столько каналов, сколько количество процессов, im не уверен, что это лучшее решение. Я проверил несколько примеров, используя multiprocessing.Pool, однако это не выглядело как хорошее решение для моей проблемы.

Чтобы закончить, было бы лучше реализовать все, используя библиотеку MPI python?

Заранее спасибо

+0

Вы знаете о GIL и многопоточности? https://wiki.python.org/moin/GlobalInterpreterLock Вместо этого вам может понадобиться нечто вроде сельдерея. –

ответ

2

Вы могли бы быть более счастливым с каркасом удобства, как Circuits. Он настоятельно подчеркивает (среди прочего) концепцию компонентов и сигнализацию событий, а также многопроцессорную обработку.

+0

У меня уже есть этот рабочий процесс, реализованный в python с двумя разными стратегиями. Однако я буду читать об этом. спасибо –

Смежные вопросы