2015-05-08 6 views
2

У меня есть инструмент питона, который имеет в основном этот вид установки:питона многопроцессорный/потоковых очистки

main process (P1) -> spawns a process (P2) that starts a tcp connection 
        -> spawns a thread (T1) that starts a loop to receive 
        messages that are sent from P2 to P1 via a Queue (Q1) 

server process (P2) -> spawns two threads (T2 and T3) that start loops to 
         receive messages that are sent from P1 to P2 via Queues (Q2 and Q3) 

Проблема у меня в том, что, когда я остановил свою программу (с помощью Ctrl + C), его не уходит. Процесс сервера завершен, но основной процесс просто висит там, и я должен его убить.

Функции петли нити все выглядит одинаково:

def _loop(self): 
    while self.running: 
     res = self.Q1.get() 
     if res is None: 
      break 
     self._handle_msg(res) 

Всех потоки запускаются как демон:

t = Thread(target=self._loop) 
t.setDaemon(True) 
t.start() 

В моем основном процессе, я использую atexit, для выполнения задачи очистки:

atexit.register(self.on_exit) 

Эти задачи очистки, по существу, следующие:

1) установить self.running в P1, чтобы False и послал None к Q1, так что T1 Thread должен закончить

self.running = False 
self.Q1.put(None) 

2) отправить сообщение P2 через Q2 сообщить этот процесс, что он окончание

self.Q2.put("stop") 

3) в P2, реагируют на сообщение "стоп" и делать то, что мы делали в P1

self.running = False 
self.Q2.put(None) 
self.Q3.put(None) 

То есть, и в моем понимании, это должно сделать все закрытым красиво, но это не так.

Основной код P1 также содержит следующий бесконечный цикл, так как в противном случае программа закончится преждевременно:

while running: 
    sleep(1) 

Может быть, есть что-то делать с этой проблемой, но я не могу понять, почему это должно быть.

Так что же я сделал не так? У моей установки есть серьезные недостатки дизайна? Я забыл что-то закрыть?

EDIT

Ok, я изменил свой код и сумел сделать это корректно завершить работу большую часть времени. К сожалению, время от времени он все еще застрял.

Мне удалось написать небольшой рабочий пример моего кода. Чтобы продемонстрировать, что происходит, вам нужно просто запустить скрипт, а затем использовать Ctrl + C, чтобы остановить его. Похоже, проблема возникает сейчас, если вы нажмете Ctrl + C как можно скорее после запуска инструмента.

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

import signal 
import sys 
import logging 
from multiprocessing import Process, Queue 
from threading import Thread 
from time import sleep 


logger = logging.getLogger("mepy-client") 


class SocketClientProtocol(object): 

    def __init__(self, q_in, q_out, q_binary): 
     self.q_in = q_in 
     self.q_out = q_out 
     self.q_binary = q_binary 
     self.running = True 
     t = Thread(target=self._loop) 
     #t.setDaemon(True) 
     t.start() 
     t = Thread(target=self._loop_binary) 
     #t.setDaemon(True) 
     t.start() 

    def _loop(self): 
     print "start of loop 2" 
     while self.running: 
      res = self.q_in.get() 
      if res is None: 
       break 
      self._handle_msg(res) 
     print "end of loop 2" 

    def _loop_binary(self): 
     print "start of loop 3" 
     while self.running: 
      res = self.q_binary.get() 
      if res is None: 
       break 
      self._handle_binary(res) 
     print "end of loop 3" 

    def _handle_msg(self, msg): 
     msg_type = msg[0] 
     if msg_type == "stop2": 
      print "STOP RECEIVED" 
      self.running = False 
      self.q_in.put(None) 
      self.q_binary.put(None) 

    def _put_msg(self, msg): 
     self.q_out.put(msg) 

    def _handle_binary(self, data): 
     pass 

    def handle_element(self): 
     self._put_msg(["something"]) 

def run_twisted(q_in, q_out, q_binary): 
    s = SocketClientProtocol(q_in, q_out, q_binary) 
    while s.running: 
     sleep(2) 
     s.handle_element() 


class MediatorSender(object): 

    def __init__(self): 
     self.q_in = None 
     self.q_out = None 
     self.q_binary = None 
     self.p = None 
     self.running = False 

    def start(self): 
     if self.running: 
      return 
     self.running = True 
     self.q_in = Queue() 
     self.q_out = Queue() 
     self.q_binary = Queue() 
     print "!!!!START" 
     self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary)) 
     self.p.start() 
     t = Thread(target=self._loop) 
     #t.setDaemon(True) 
     t.start() 

    def stop(self): 
     print "!!!!STOP" 
     if not self.running: 
      return 
     print "STOP2" 
     self.running = False 
     self.q_out.put(None) 
     self.q_in.put(["stop2"]) 
     #self.q_in.put(None) 
     #self.q_binary.put(None) 

     try: 
      if self.p and self.p.is_alive(): 
       self.p.terminate() 
     except: 
      pass 

    def _loop(self): 
     print "start of loop 1" 
     while self.running: 
      res = self.q_out.get() 
      if res is None: 
       break 
      self._handle_msg(res) 
     print "end of loop 1" 

    def _handle_msg(self, msg): 
     self._put_msg(msg) 

    def _put_msg(self, msg): 
     self.q_in.put(msg) 

    def _put_binary(self, msg): 
     self.q_binary.put(msg) 

    def send_chunk(self, chunk): 
     self._put_binary(chunk) 

running = True 
def signal_handler(signal, frame): 
    global running 
    if running: 
     running = False 
     ms.stop() 
    else: 
     sys.exit(0) 

if __name__ == "__main__": 
    signal.signal(signal.SIGINT, signal_handler) 
    ms = MediatorSender() 
    ms.start() 
    for i in range(100): 
     ms.send_chunk("some chunk of data") 
    while running: 
     sleep(1) 
+0

Было бы полезно, если бы вы могли бы собрать полную программу, демонстрирующую проблему, а не просто в том числе фрагменты. В противном случае нам трудно понять, действительно ли мы воссоздаем то, что вы делаете. – dano

+0

Какую версию Python вы используете? – dano

+0

@basilikum Вы на окнах или Linux? – TysonU

ответ

1

Я думаю, что вы развращаете свой multiprocessing.Queue, вызывая p.terminate() о дочернем процессе. Документов есть предупреждение об этом:

Предупреждения: Если этот метод используется, когда связанный процесс использует трубу или в очереди, то трубы или очередь подлежит повредиться и может стать непригодным для использования по другой процесс. Аналогично, если процесс имеет , он приобрел замок или семафор и т. Д., Тогда его завершение может быть заставляет другие процессы затормозить.

В некоторых случаях, это выглядит как p прекращает до вашего метода MediatorSender._loop может потребить часовую загруженные в нее, чтобы сообщить ему, что он должен выйти.

Кроме того, вы устанавливаете обработчик сигнала, который ожидает, чтобы работать только в основном процессе, но SIGINT фактически получен как родитель и дочерних процессов, что означает signal_handler вызывается в обоих процессах, может результат в ms.stop вызывался дважды, из-за гонки в том, как вы ручки настройки ms.running к False

Я бы рекомендовал просто эксплуатируют, что оба процесса получить SIGINT, и иметь как родитель и ребенок обращаться KeyboardInterrupt непосредственно. Таким образом, каждый из них каждый закрывает себя чисто, вместо того, чтобы родительский закончить ребенка. Следующий код демонстрирует, что и в моем тестировании никогда не висел.Я упростил свой код в нескольких местах, но функционально это точно так же:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

import logging 
from multiprocessing import Process, Queue 
from threading import Thread 
from time import sleep 

logger = logging.getLogger("mepy-client") 

class SocketClientProtocol(object): 

    def __init__(self, q_in, q_out, q_binary): 
     self.q_in = q_in 
     self.q_out = q_out 
     self.q_binary = q_binary 
     t = Thread(target=self._loop) 
     t.start() 
     t = Thread(target=self._loop_binary) 
     t.start() 

    def _loop(self): 
     print("start of loop 2") 
     for res in iter(self.q_in.get, None): 
      self._handle_msg(res) 
     print("end of loop 2") 

    def _loop_binary(self): 
     print("start of loop 3") 
     for res in iter(self.q_binary.get, None): 
      self._handle_binary(res) 
     print("end of loop 3") 

    def _handle_msg(self, msg): 
     msg_type = msg[0] 
     if msg_type == "stop2": 
      self.q_in.put(None) 
      self.q_binary.put(None) 

    def _put_msg(self, msg): 
     self.q_out.put(msg) 

    def stop(self): 
     print("STOP RECEIVED") 
     self.q_in.put(None) 
     self.q_binary.put(None) 

    def _handle_binary(self, data): 
     pass 

    def handle_element(self): 
     self._put_msg(["something"]) 

def run_twisted(q_in, q_out, q_binary): 
    s = SocketClientProtocol(q_in, q_out, q_binary) 
    try: 
     while True: 
      sleep(2) 
      s.handle_element() 
    except KeyboardInterrupt: 
     s.stop() 

class MediatorSender(object): 

    def __init__(self): 
     self.q_in = None 
     self.q_out = None 
     self.q_binary = None 
     self.p = None 
     self.running = False 

    def start(self): 
     if self.running: 
      return 
     self.running = True 
     self.q_in = Queue() 
     self.q_out = Queue() 
     self.q_binary = Queue() 
     print("!!!!START") 
     self.p = Process(target=run_twisted, 
         args=(self.q_in, self.q_out, self.q_binary)) 
     self.p.start() 
     self.loop = Thread(target=self._loop) 
     self.loop.start() 

    def stop(self): 
     print("!!!!STOP") 
     if not self.running: 
      return 
     print("STOP2") 
     self.running = False 
     self.q_out.put(None) 

    def _loop(self): 
     print("start of loop 1") 
     for res in iter(self.q_out.get, None): 
      self._handle_msg(res) 
     print("end of loop 1") 

    def _handle_msg(self, msg): 
     self._put_msg(msg) 

    def _put_msg(self, msg): 
     self.q_in.put(msg) 

    def _put_binary(self, msg): 
     self.q_binary.put(msg) 

    def send_chunk(self, chunk): 
     self._put_binary(chunk) 

if __name__ == "__main__": 
    ms = MediatorSender() 
    try: 
     ms.start() 
     for i in range(100): 
      ms.send_chunk("some chunk of data") 
     # You actually have to join w/ a timeout in a loop on 
     # Python 2.7. If you just call join(), SIGINT won't be 
     # received by the main process, and the program will 
     # hang. This is a bug, and is fixed in Python 3.x. 
     while True: 
      ms.loop.join() 
    except KeyboardInterrupt: 
     ms.stop() 

Edit:

Если вы предпочитаете использовать обработчик сигнала, а не ловить KeyboardInterrupt, вам просто нужно убедитесь, что дочерний процесс использует свой собственный обработчик сигнала, а не наследует родителя:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 

import signal 
import logging 
from functools import partial 
from multiprocessing import Process, Queue 
from threading import Thread 
from time import sleep 

logger = logging.getLogger("mepy-client") 

class SocketClientProtocol(object): 

    def __init__(self, q_in, q_out, q_binary): 
     self.q_in = q_in 
     self.q_out = q_out 
     self.q_binary = q_binary 
     self.running = True 
     t = Thread(target=self._loop) 
     t.start() 
     t = Thread(target=self._loop_binary) 
     t.start() 

    def _loop(self): 
     print("start of loop 2") 
     for res in iter(self.q_in.get, None): 
      self._handle_msg(res) 
     print("end of loop 2") 

    def _loop_binary(self): 
     print("start of loop 3") 
     for res in iter(self.q_binary.get, None): 
      self._handle_binary(res) 
     print("end of loop 3") 

    def _handle_msg(self, msg): 
     msg_type = msg[0] 
     if msg_type == "stop2": 
      self.q_in.put(None) 
      self.q_binary.put(None) 

    def _put_msg(self, msg): 
     self.q_out.put(msg) 

    def stop(self): 
     print("STOP RECEIVED") 
     self.running = False 
     self.q_in.put(None) 
     self.q_binary.put(None) 

    def _handle_binary(self, data): 
     pass 

    def handle_element(self): 
     self._put_msg(["something"]) 

def run_twisted(q_in, q_out, q_binary): 
    s = SocketClientProtocol(q_in, q_out, q_binary) 
    signal.signal(signal.SIGINT, partial(signal_handler_child, s)) 
    while s.running: 
     sleep(2) 
     s.handle_element() 

class MediatorSender(object): 

    def __init__(self): 
     self.q_in = None 
     self.q_out = None 
     self.q_binary = None 
     self.p = None 
     self.running = False 

    def start(self): 
     if self.running: 
      return 
     self.running = True 
     self.q_in = Queue() 
     self.q_out = Queue() 
     self.q_binary = Queue() 
     print("!!!!START") 
     self.p = Process(target=run_twisted, 
         args=(self.q_in, self.q_out, self.q_binary)) 
     self.p.start() 
     self.loop = Thread(target=self._loop) 
     self.loop.start() 

    def stop(self): 
     print("!!!!STOP") 
     if not self.running: 
      return 
     print("STOP2") 
     self.running = False 
     self.q_out.put(None) 

    def _loop(self): 
     print("start of loop 1") 
     for res in iter(self.q_out.get, None): 
      self._handle_msg(res) 
     print("end of loop 1") 

    def _handle_msg(self, msg): 
     self._put_msg(msg) 

    def _put_msg(self, msg): 
     self.q_in.put(msg) 

    def _put_binary(self, msg): 
     self.q_binary.put(msg) 

    def send_chunk(self, chunk): 
     self._put_binary(chunk) 

def signal_handler_main(ms, *args): 
    ms.stop() 

def signal_handler_child(s, *args): 
    s.stop() 

if __name__ == "__main__": 
    ms = MediatorSender() 
    signal.signal(signal.SIGINT, partial(signal_handler_main, ms)) 
    ms.start() 
    for i in range(100): 
     ms.send_chunk("some chunk of data") 
    while ms.loop.is_alive(): 
     ms.loop.join(9999999) 
    print('done main') 
+0

Удивительный ответ! Я просто попробовал свой скрипт и, похоже, отлично справился с настройкой тайм-аута для 'join()' или используя 'while ms.running: sleep (1)'. Знаете ли вы, есть ли недостаток в использовании просто произвольно большого количества для таймаута? Единственное, что меня беспокоит, это исключение 'KeyboardInterrupt'. Я подозреваю, что это будет работать, только если я на самом деле использую 'Ctrl + C', но не если я отправлю SIGINT любым другим способом? Так есть ли способ заставить его работать с исходным сигналом? В любом случае, например, я могу различать оба процесса в обработчике сигналов? – basilikum

+0

@basilikum Использование супер большого таймаута с 'join' работает так же хорошо. Если вы хотите использовать обработчики сигналов вместо обработки 'KeyboardInterrupt', вы можете просто использовать отдельный обработчик сигнала в дочернем процессе. См. Мое редактирование. – dano

+0

Отлично, это именно то, что я искал. Благодаря! – basilikum

0

Это обычно работает для меня, если я использую модуль резьбонарезания. Это не сработает, если вы используете многопроцессорную систему. Если вы запускаете скрипт из терминала, попробуйте запустить его в фоновом режиме, например.

python scriptFoo.py & 

После запуска процесса он будет выводить PID, как этот

[1] 23107 

Всякий раз, когда вам нужно выйти из сценария просто введите убить и сценарий PID, как это.

kill 23107 

Попадите внутрь, и он должен уничтожить все подпроцессы и вывести это.

[1]+ Terminated    python scriptFoo.py 

Насколько я знаю, вы не можете убить всех подпроцессов с «Ctrl + C»

+0

Спасибо за ваш ответ, но это не совсем то, что я ищу. Я не хочу менять способ запуска или остановки инструмента. Люди должны иметь возможность просто использовать 'Ctrl + C', чтобы закрыть все. Я уверен, что для этого должно быть решение. – basilikum

1

Может быть, вы должны попытаться захватить SIGINT сигнал, который генерируется Ctrl + C использованием signal.signal так:

#!/usr/bin/env python 
import signal 
import sys 
def signal_handler(signal, frame): 
     print('You pressed Ctrl+C!') 
     sys.exit(0) 
signal.signal(signal.SIGINT, signal_handler) 
print('Press Ctrl+C') 
signal.pause() 

код украден из here

+0

Спасибо, да! На самом деле это то, что я сделал в моем улучшенном коде (см. Править). Тем не менее, по-прежнему возникают проблемы. – basilikum

+0

Вы пытались использовать 'put_nowait' вместо' put' в функции остановки 'SocketClientProtocol._handle_msg' и' MediatorSender.stop'? –