2013-06-21 2 views
12

Я с этой проблемой в Python:Заполнение очереди и управление многопроцессорной в питона

  • У меня есть очередь URL-адресов, что мне нужно, чтобы проверить время от времени
  • , если очередь заполняется , мне нужно обрабатывать каждый элемент в очереди
  • Каждый элемент в очереди должны быть обработаны одним процессом (многопроцессорной)

до сих пор мне удалось достичь этого «вручную», как это:

while 1: 
     self.updateQueue() 

     while not self.mainUrlQueue.empty(): 
      domain = self.mainUrlQueue.get() 

      # if we didn't launched any process yet, we need to do so 
      if len(self.jobs) < maxprocess: 
       self.startJob(domain) 
       #time.sleep(1) 
      else: 
       # If we already have process started we need to clear the old process in our pool and start new ones 
       jobdone = 0 

       # We circle through each of the process, until we find one free ; only then leave the loop 
       while jobdone == 0: 
        for p in self.jobs : 
         #print "entering loop" 
         # if the process finished 
         if not p.is_alive() and jobdone == 0: 
          #print str(p.pid) + " job dead, starting new one" 
          self.jobs.remove(p) 
          self.startJob(domain) 
          jobdone = 1 

Однако это приводит к множеству проблем и ошибок. Я задавался вопросом, не лучше ли я, используя пул процессов. Каким будет правильный способ сделать это?

Однако много раз моя очередь пуста, и она может быть заполнена на 300 единиц за секунду, поэтому я не слишком уверен, как это делать.

ответ

20

Вы можете использовать возможности блокировки queue, чтобы вызвать многократный процесс при запуске (используя multiprocessing.Pool) и позволить им спать, пока в очереди не будут доступны некоторые данные. Если вы не знакомы с этим, вы можете попробовать «поиграть» с этой простой программой:

import multiprocessing 
import os 
import time 

the_queue = multiprocessing.Queue() 


def worker_main(queue): 
    print os.getpid(),"working" 
    while True: 
     item = queue.get(True) 
     print os.getpid(), "got", item 
     time.sleep(1) # simulate a "long" operation 

the_pool = multiprocessing.Pool(3, worker_main,(the_queue,)) 
#       don't forget the coma here^

for i in range(5): 
    the_queue.put("hello") 
    the_queue.put("world") 


time.sleep(10) 

Испытан с Python 2.7.3 на Linux

Это породит 3 процесс (в добавлении родительский процесс). Каждый ребенок выполняет функцию worker_main. Это простой цикл, получающий новый элемент из очереди на каждой итерации. Рабочие блокируются, если ничего не готово к процессу.

При запуске все 3 процесса будут спать до тех пор, пока очередь не будет загружена некоторыми данными. Когда данные доступны, один из ожидающих работников получает этот элемент и начинает его обрабатывать. После этого он пытается получить другой элемент из очереди, снова ожидая, если ничего не доступно ...

+0

это не работает на windows в python 2.7.4, вам нужно иметь if __name__ = '__main__' part, и вы должны передать the_queue в качестве третьего параметра в функцию multiprocessing.Pool, иначе рабочий_main не получит данные – jhexp

+0

. Меня также интересует, как заставить этот кусок кода работать. Когда я запускаю его так, как он есть, он запускается, но ничего не печатает, возможно, потому, что worker_main не получает данные. Но когда я передаю the_queue в качестве третьего параметра, я получил аргумент TypeError: worker_main() после *, должен быть последовательностью, а не Queue – ziky90

+0

@ ziky90 Вероятно, вы забыли кому в '(queue,)'. Я редактировал код, чтобы добавить комментарий, указывающий на возможный источник ошибки. –