У меня есть инструмент питона, который имеет в основном этот вид установки:питона многопроцессорный/потоковых очистки
main process (P1) -> spawns a process (P2) that starts a tcp connection
-> spawns a thread (T1) that starts a loop to receive
messages that are sent from P2 to P1 via a Queue (Q1)
server process (P2) -> spawns two threads (T2 and T3) that start loops to
receive messages that are sent from P1 to P2 via Queues (Q2 and Q3)
Проблема у меня в том, что, когда я остановил свою программу (с помощью Ctrl + C), его не уходит. Процесс сервера завершен, но основной процесс просто висит там, и я должен его убить.
Функции петли нити все выглядит одинаково:
def _loop(self):
while self.running:
res = self.Q1.get()
if res is None:
break
self._handle_msg(res)
Всех потоки запускаются как демон:
t = Thread(target=self._loop)
t.setDaemon(True)
t.start()
В моем основном процессе, я использую atexit, для выполнения задачи очистки:
atexit.register(self.on_exit)
Эти задачи очистки, по существу, следующие:
1) установить self.running
в P1, чтобы False
и послал None
к Q1, так что T1 Thread должен закончить
self.running = False
self.Q1.put(None)
2) отправить сообщение P2 через Q2 сообщить этот процесс, что он окончание
self.Q2.put("stop")
3) в P2, реагируют на сообщение "стоп" и делать то, что мы делали в P1
self.running = False
self.Q2.put(None)
self.Q3.put(None)
То есть, и в моем понимании, это должно сделать все закрытым красиво, но это не так.
Основной код P1 также содержит следующий бесконечный цикл, так как в противном случае программа закончится преждевременно:
while running:
sleep(1)
Может быть, есть что-то делать с этой проблемой, но я не могу понять, почему это должно быть.
Так что же я сделал не так? У моей установки есть серьезные недостатки дизайна? Я забыл что-то закрыть?
EDIT
Ok, я изменил свой код и сумел сделать это корректно завершить работу большую часть времени. К сожалению, время от времени он все еще застрял.
Мне удалось написать небольшой рабочий пример моего кода. Чтобы продемонстрировать, что происходит, вам нужно просто запустить скрипт, а затем использовать Ctrl + C
, чтобы остановить его. Похоже, проблема возникает сейчас, если вы нажмете Ctrl + C
как можно скорее после запуска инструмента.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import signal
import sys
import logging
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep
logger = logging.getLogger("mepy-client")
class SocketClientProtocol(object):
def __init__(self, q_in, q_out, q_binary):
self.q_in = q_in
self.q_out = q_out
self.q_binary = q_binary
self.running = True
t = Thread(target=self._loop)
#t.setDaemon(True)
t.start()
t = Thread(target=self._loop_binary)
#t.setDaemon(True)
t.start()
def _loop(self):
print "start of loop 2"
while self.running:
res = self.q_in.get()
if res is None:
break
self._handle_msg(res)
print "end of loop 2"
def _loop_binary(self):
print "start of loop 3"
while self.running:
res = self.q_binary.get()
if res is None:
break
self._handle_binary(res)
print "end of loop 3"
def _handle_msg(self, msg):
msg_type = msg[0]
if msg_type == "stop2":
print "STOP RECEIVED"
self.running = False
self.q_in.put(None)
self.q_binary.put(None)
def _put_msg(self, msg):
self.q_out.put(msg)
def _handle_binary(self, data):
pass
def handle_element(self):
self._put_msg(["something"])
def run_twisted(q_in, q_out, q_binary):
s = SocketClientProtocol(q_in, q_out, q_binary)
while s.running:
sleep(2)
s.handle_element()
class MediatorSender(object):
def __init__(self):
self.q_in = None
self.q_out = None
self.q_binary = None
self.p = None
self.running = False
def start(self):
if self.running:
return
self.running = True
self.q_in = Queue()
self.q_out = Queue()
self.q_binary = Queue()
print "!!!!START"
self.p = Process(target=run_twisted, args=(self.q_in, self.q_out, self.q_binary))
self.p.start()
t = Thread(target=self._loop)
#t.setDaemon(True)
t.start()
def stop(self):
print "!!!!STOP"
if not self.running:
return
print "STOP2"
self.running = False
self.q_out.put(None)
self.q_in.put(["stop2"])
#self.q_in.put(None)
#self.q_binary.put(None)
try:
if self.p and self.p.is_alive():
self.p.terminate()
except:
pass
def _loop(self):
print "start of loop 1"
while self.running:
res = self.q_out.get()
if res is None:
break
self._handle_msg(res)
print "end of loop 1"
def _handle_msg(self, msg):
self._put_msg(msg)
def _put_msg(self, msg):
self.q_in.put(msg)
def _put_binary(self, msg):
self.q_binary.put(msg)
def send_chunk(self, chunk):
self._put_binary(chunk)
running = True
def signal_handler(signal, frame):
global running
if running:
running = False
ms.stop()
else:
sys.exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, signal_handler)
ms = MediatorSender()
ms.start()
for i in range(100):
ms.send_chunk("some chunk of data")
while running:
sleep(1)
Было бы полезно, если бы вы могли бы собрать полную программу, демонстрирующую проблему, а не просто в том числе фрагменты. В противном случае нам трудно понять, действительно ли мы воссоздаем то, что вы делаете. – dano
Какую версию Python вы используете? – dano
@basilikum Вы на окнах или Linux? – TysonU