2013-09-30 4 views
2

Я новичок в многопроцессорной обработке, и я просто пытаюсь написать простую программу в Python 3.2, у которой счетчик увеличивается бесконечно в одном потоке, а второй поток проверяет, заданное значение достигнуто в первом потоке. Как только значение будет достигнуто, я хочу, чтобы поток многопроцессорности закрылся, и чтобы программа отображала инструкцию «Complete Complete».Python Multiprocessing: Завершение бесконечного счетчика

Насколько я понимаю, эта программа будет выглядеть примерно так (где данное значение 10):

import multiprocessing as mp 


def Counter(): 
    i=1 
    while i > 0: 
     print("i: ",i) 
     i+=1 


def ValueTester(i): 
    if i >= 10: 
     *End Counter Function Thread* 


if __name__ == '__main__': 

    *Begin multiprocessing, one thread for "Counter" and a second for "ValueTester"* 

    print("Process Complete") 

Извиняюсь за расплывчатость psuedocode; Я прочитал документацию Python рядом с несколькими разными примерами, и я не могу найти простого решения.

Кроме того, как только это будет работать, как бы я решил установить заданное значение остановки (т. Е. Передать переменную ValueTester, а не просто использовать 10)?

Большое спасибо за помощь.

ответ

2

Мы должны быть осторожны, чтобы сделать четкое различие между потоками и процессами.

Нити все работают в рамках одного процесса. Значения Доступ к может использоваться совместно между потоками. Значения могут быть изменены потоками в безопасном (скоординированном) порядке, только если значение защищено threading.Lock до его изменения. В CPython, наиболее распространенной реализации Python, а также PyPy, но в отличие от других реализаций, таких как Jython или Iron Python, GIL (глобальная блокировка интерпретатора) предотвращает запуск более одного потока в любой момент времени. Таким образом, под CPython несколько потоков фактически запускаются серийно, а не одновременно. Тем не менее, несколько потоков могут быть полезны для интенсификации работы ввода-вывода, например, для запросов на многие веб-сайты, поскольку большая часть времени тратится на ожидание активности сети (I/O). Таким образом, множественные потоки не должны ждать столько же, сражаясь за доступ к одному процессору, по сравнению с задачами, которые являются CPU-интенсивными, как математические вычисления.

Теперь, сказав все это, вы имеете дело с несколькими процессами , а не потоками. Процессы независимы друг от друга. Они могут работать и выполняться одновременно на нескольких процессорах, если они доступны (в том числе под CPython). Когда вы создаете процесс, глобальные значения копируются из исходного процесса в порожденный процесс. На некоторых ОС, таких как Linux, которые имеют «копирование при записи», значения фактически разделяются между процессами до, процесс пытается перезаписать значение, и в это время значение копируется, чтобы стать независимым от другого процесса. Поэтому, когда вы меняете значения, , два процесса заканчиваются двумя переменными с одинаковыми именами, но могут иметь совершенно разные значения.

Для облегчения sharing state between processes имеются специальные объекты, предоставляемые модулем многопроцессорной обработки. К ним относятся mp.Value, mp.Array, mp.Manager. Обратите внимание, что при использовании этих объектов за кулисами происходит блокировка, которая должна быть получена до того, как значения могут быть изменены. Это не позволяет процессу изменять значение, а другой процесс пытается сделать то же самое. Однако блокировка также замедляет процессы, потому что приходится ждать выхода блокировки.

Теперь, чтобы сигнализировать процесс, когда состояние было достигнуто в другом процессе, используют mp.Event:

import multiprocessing as mp 
import time 

def Counter(i, event): 
    i.value=1 
    while i.value > 0 and not event.is_set(): 
     print("i: ",i.value) 
     i.value += 1 

def ValueTester(i, stopval, event): 
    while True: 
     if i.value >= stopval: 
      event.set() 
      break 
     else: 
      time.sleep(0.1) 

if __name__ == '__main__': 
    num = mp.Value('d', 0.0) 
    event = mp.Event()  
    counter = mp.Process(target=Counter, args=(num, event)) 
    counter.start() 
    tester = mp.Process(target=ValueTester, args=(num, 10, event)) 
    tester.start() 
    tester.join() 
    counter.join() 
    print("Process Complete") 

Дополнительные примеры о том, как использовать мультипроцессирование см Doug Hellman's Python Module of the Week tutorial.

+0

Благодарим вас за объяснение, но я боюсь сказать, что ваш код, похоже, не работает для меня; он работает, но счетчик никогда не останавливается и продолжает работать бесконечно. Хотел бы я предложить улучшения/изменения, но это очень ново для меня! – FreeBixi

+1

Если ваша машина очень быстрая, тогда 'ValueTester' может работать * до того, как' 'i.value' будет больше, чем' stopval'. В этом случае функция «ValueTester» заканчивается без установки события. Тогда 'Counter' может продолжаться бесконечно. Я изменю код, чтобы предотвратить это ... – unutbu

+0

Это, кажется, работает сейчас, спасибо! Я не знал, что счетчик остановится в разных местах в зависимости от скорости вашего компьютера. – FreeBixi

1

Так что вам нужен механизм для двух потоков (процессов) для связи друг с другом. К счастью, многопроцессорный модуль Python дает вам несколько вариантов, один из которых представляет собой очередь.

Итак, первое, что нужно сделать, это запустить эти два def и пройти в общей очереди, которую они могут использовать для связи. Поскольку вы хотите, чтобы основной процесс убил дочерний процесс, первый proc должен запустить второй.

import multiprocessing as mp 
from multiprocessing import Queue 

def counter(): #use lowercase c, 'Counter' is importable 
    threshold = 10 
    output = Queue(1) #for placing numbers on the queue 
    input = Queue(1) #for looking for the signal that child is done 
    child = Process(target=valuetester, args=(threshold, output, input)) 
    child.start() 
    i=1 
    while i > 0: 
     output.put(i) 
     print("i: ",i) 
     i+=1 
     try: 
      done = input.get_nowait() 
      if done == 'Im Done!': 
       print 'Process Complete!' 
       child.join() #clean up the child proc 
       return 
     except Empty: 
      pass #input is empty, no big deal 


def valuetester(threshold, input, output): 
    thresholdNotPassed = False 
    while thresholdNotPassed: 
     i = input.get() 
     if i >= threshold: 
      output.put('Im Done!') 
      return 
     else: 
      pass #nothing to do... just wait 

#Start the counter proc. You could use the main process since its easier 
if __name__ == 'main': 
    counter() 

Несколько вещей, чтобы отметить: Я сделал очереди есть MAXSIZE одного ... что это будет сделать, это блок (удержание на этой линии), если очередь заполнена.

Вы можете видеть, что я также использовал get_nowait() для основного процесса, чтобы проверить, закончен ли ребенок, иначе использование нормального get заблокировало бы там, и мы зашли бы в тупик!

+0

Во-первых, спасибо за помощь! Я пробую это сейчас; Я получил ошибку на 'child = Process (...)', потому что я думаю, что это должно быть 'child = mp.Process (...)'. Однако у меня была аналогичная проблема с 'except Empty:'; в каком модуле я бы нашел «Empty»? – FreeBixi

+0

Ах, извините, я обычно структурирую свой импорт как 'от многопроцессорного импорта ...' Пустой должен быть в 'from Queue import Empty' – gregb212