2015-04-16 5 views
1

У меня возникает несколько проблем при использовании процессов и очередей.Проблемы с многопроцессорной обработкой python

Когда я запускаю следующий код, целевая функция просто получает элемент из главной очереди и добавляет его в другую очередь, специфичную для этого процесса.

import sys 
import multiprocessing 
from Queue import Empty 

# This is just taking a number from the queue 
# and adding it to another queue 
def my_callable(from_queue, to_queue): 
    while True: 
     try: 
      tmp = from_queue.get(0) 
      to_queue.put(tmp) 
      print to_queue 
     except Empty: 
      break 

# Create a master queue and fill it with numbers 
main_queue = multiprocessing.Queue() 
for i in xrange(100): 
    main_queue.put(i) 

all_queues = [] 
processes = [] 
# Create processes 
for i in xrange(5): 
    # Each process gets a queue that it will put numbers into 
    queue = multiprocessing.Queue() 
    # Keep up with the queue we are creating so we can get it later 
    all_queues.append(queue) 
    # Pass in our master queue and the queue we are transferring data to 
    process = multiprocessing.Process(target=my_callable, 
             args=(main_queue, queue)) 
    # Keep up with the processes 
    processes.append(process) 

for thread in processes: 
    thread.start() 

for thread in processes: 
    thread.join() 

Когда целевая функция печатает используемую очередь, вы заметите, что одна очередь используется почти исключительно.

Если вы затем берете вывод и печатаете его, вы увидите, что большая часть номеров попадает под одну очередь.

def queue_get_all(q): 
    items = [] 
    maxItemsToRetreive = 100 
    for numOfItemsRetrieved in range(0, maxItemsToRetreive): 
     try: 
      if numOfItemsRetrieved == maxItemsToRetreive: 
       break 
      items.append(q.get_nowait()) 
     except Empty, e: 
      break 
    return items 

for tmp in all_queues: 
    print queue_get_all(tmp) 

Что вызывает это? Есть ли что-то в моем коде, который я должен делать, что даже будет работать над этими процессами?

ВЫВОД

[0, 2, 3, 4, 5, 6, 7, 8] 
[1, 9, 10] 
[11, 14, 15, 16] 
[12, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99] 
[13] 

ответ

2

Я думаю, что у вас есть две проблемы здесь:

def my_callable(from_queue, to_queue): 
    while True: 
     try: 
      tmp = from_queue.get(0) 
      to_queue.put(tmp) 
      print to_queue 
     except Empty: 
      break 

Из документов для get:

Извлеките и возвращает элемент из очереди. Если дополнительный блок args имеет значение True (по умолчанию), а таймаут - None (по умолчанию), блокируйте, если необходимо, до тех пор, пока элемент не будет доступен. Если тайм-аут - это положительное число, он блокирует не более времени ожидания и вызывает исключение Queue.Empty, если в течение этого времени не было доступно ни одного элемента. В противном случае (блок является False), возвратите элемент, если он доступен немедленно, иначе создайте исключение Queue.Empty (тогда в этом случае игнорируется время ожидания).

Поскольку вы передаете 0 в качестве первого параметра, это эквивалентно get(False). Это делает его неблокирующим, а это означает, что если он не может сразу получить значение, он поднимет пустое исключение, которое закончит ваш рабочий процесс. Поскольку все ваши «рабочие» функции идентичны и одновременно пытаются вытащить из основной очереди, некоторые могут не сразу получить значение и умереть.

Предоставление .get() небольшого таймаута должно решить эту проблему.

Вторая проблема заключается в том, что ваша функция «work» занимает в основном нулевое время для завершения. Дайте ему небольшую паузу с sleep(.2), чтобы имитировать некоторую не-тривиальную работу, и она будет распространять по рабочим:

def my_callable(from_queue, to_queue): 
    while True: 
     try: 
      tmp = from_queue.get(True, .1) 
      sleep(0.2) 
      to_queue.put(tmp) 
     except Empty: 
      break 

EDIT:

Я забыл сказать, вообще-то лучше для этого типа проблема не полагаться на таймаут .get(), чтобы сигнализировать о завершении очереди. Вы получаете больше контроля, если вы используете какой-либо тип объекта «конец очереди», который вы передаете в очередь, которая сообщает рабочим, что пришло время бросить курить. Таким образом, вы можете заблокировать их все, ожидая ввода нового или команды «exit».

+0

Хорошо. Таким образом, это действительно подтверждает работу. Таким образом, установка тайм-аута 1 и сон 0,01. Итак, любая идея, какой объект должен быть в конце? Может быть, конкретная строка? – user3123576