2012-06-30 3 views
6

Я пытаюсь имитировать сеть приложений, которые работают с использованием скрученных. В рамках моей симуляции я хотел бы синхронизировать определенные события и уметь кормить каждый процесс большими объемами данных. Я решил использовать многопроцессорные события и очереди. Однако мои процессы повесились.скручивается несовместимым с событиями и очередями многопроцессорности?

Для иллюстрации проблемы я написал код примера ниже. В частности, (около 95% времени на моей машине с песчаным мостом), функция «run_in_thread» заканчивается, однако обратный вызов «print_done» не вызывается до тех пор, пока я не нажму Ctrl-C.

Кроме того, я могу изменить несколько вещей в примере кода, чтобы сделать эту работу более надежной, например: уменьшение количества возникающих процессов, вызов self.ready.set from react_ready или изменение задержки deferLater.

Я предполагаю, что между скрученным реактором и блокировкой многопроцессорных вызовов есть состояние гонки, например Queue.get() или Event.wait()?

В чем проблема, с которой я столкнулся? Есть ли ошибка в моем коде, который мне не хватает? Могу ли я исправить это или скручен, несовместимый с событиями/очередями многопроцессорности?

Во-вторых, будет ли что-то вроде spawnProcess или Ampoule быть рекомендуемой альтернативой? (Как это было предложено в Mix Python Twisted with multiprocessing?)

Редактирует (по запросу):

Я столкнулся с проблемами со всеми реакторами, которые я пробовал glib2reactor selectreactor, pollreactor и epollreactor. Кажется, что epollreactor дает наилучшие результаты и, кажется, отлично работает для приведенного ниже примера, но все же дает мне такую ​​же (или аналогичную) проблему в моем приложении. Я продолжу расследование.

Я запускаю Gentoo Linux kernel 3.3 и 3.4, python 2.7, и я пробовал Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0 и 12.1.0.

В дополнение к моей машине из песчаного моста, я вижу ту же проблему на моей двухъядерной машине.

#!/usr/bin/python 
# -*- coding: utf-8 *-* 

from twisted.internet import reactor 
from twisted.internet import threads 
from twisted.internet import task 

from multiprocessing import Process 
from multiprocessing import Event 

class TestA(Process): 
    def __init__(self): 
     super(TestA, self).__init__() 
     self.ready = Event() 
     self.ready.clear() 
     self.start() 

    def run(self): 
     reactor.callWhenRunning(self.reactor_ready) 
     reactor.run() 

    def reactor_ready(self, *args): 
     task.deferLater(reactor, 1, self.node_ready) 
     return args 

    def node_ready(self, *args): 
     print 'node_ready' 
     self.ready.set() 
     return args 

def reactor_running(): 
    print 'reactor_running' 
    df = threads.deferToThread(run_in_thread) 
    df.addCallback(print_done) 

def run_in_thread(): 
    print 'run_in_thread' 
    for n in processes: 
     n.ready.wait() 

def print_done(dfResult=None): 
    print 'print_done' 
    reactor.stop() 

if __name__ == '__main__': 
    processes = [TestA() for i in range(8)] 
    reactor.callWhenRunning(reactor_running) 
    reactor.run() 
+0

Точная информация о том, какую версию Twisted вы используете, какую ОС вы используете, какой реактор вы используете (и так далее), будет полезен при ответе на этот вопрос. – Glyph

+0

Я столкнулся с проблемами со всеми реакторами, которые я пробовал glib2reactor selectreactor, pollreactor и epollreactor. Я запускаю Gentoo Linux, python 2.7, и я пробовал Twisted 11.1.0, 12.0.0 и 12.1.0. – Agrajag

ответ

10

Короткий ответ да, Twisted и многопроцессорная не совместимы друг с другом, и вы не можете надежно использовать их, как вы пытаетесь.

На всех платформах POSIX детальное управление процессом тесно связано с обработкой SIGCHLD. Обработчики сигналов POSIX являются глобальными процессами, и может быть только один тип сигнала.

Скрученные и stdlib multiprocessing не могут оба иметь установленный обработчик SIGCHLD. Только один из них может. Это означает, что только один из них может надежно управлять дочерними процессами. Ваш пример приложения не контролирует, кто из них выиграет эту способность, поэтому я ожидаю, что в его поведении возникнет какой-то детерминизм, связанный с этим фактом.

Однако более насущная проблема с вашим примером является то, что вы загружаете Twisted в родительском процессе, а затем использовать multiprocessing раскошелиться и не EXEC всех дочерних процессов. Twisted не поддерживает такое использование. Если вы используете fork, а затем exec, нет проблем. Однако отсутствие exec нового процесса (возможно, процесс Python с использованием Twisted) приводит ко всем видам дополнительного общего состояния, которое Twisted не учитывает.В вашем конкретном случае разделяемым состоянием, которое вызывает эту проблему, является внутренний «waker fd», который используется для реализации deferToThread. Когда fd делится между родительским и всеми дочерними элементами, когда родитель пытается просыпать основной поток, чтобы доставить результат вызова deferToThread, он скорее всего просыпает один из дочерних процессов вместо. У детского процесса нет ничего полезного, так что это просто пустая трата времени. Между тем основной поток в родительском доме никогда не просыпается и никогда не замечает, что ваша резьбовая задача выполнена.

Возможно, вы можете избежать этой проблемы, не загружая ни одного из Twisted, пока вы уже не создали дочерние процессы. Это превратило бы ваше использование в однопроцессорный прецедент по отношению к Twisted (в каждом процессе он был бы первоначально загружен, а затем этот процесс был бы , а не, чтобы перейти к fork вообще, так что нет вопросов о том, как fork и Twisted). Это означает, что вы даже не импортируете Twisted до тех пор, пока не создадите дочерние процессы.

Конечно, это только поможет вам дойти до «Скрученного». Любые другие библиотеки, которые вы используете, могут столкнуться с подобной проблемой (вы упомянули glib2, это отличный пример другой библиотеки, которая полностью задохнется, если вы попытаетесь использовать ее так).

Я настоятельно рекомендую не использовать модуль multiprocessing вообще. Вместо этого используйте любой многопроцессорный подход, который включает в себя fork и exec, а не fork в одиночку. В эту категорию попадает ампула.

+0

Спасибо за подробное объяснение. Теперь все проблемы кажутся очевидными! Мне удалось заставить мое симулятор работать отлично, используя subprocess.Popen и многопроцессорный. Listener/Client (используя AF_UNIX) для облегчения коммутации/синхронизации между скрученными узлами и процессом контроллера. Еще раз спасибо! – Agrajag