2013-06-25 2 views
0

У меня TCP сервер на питоне с asyncore:asyncore закрыть старые розетки

class AsyncClientHandler(asyncore.dispatcher_with_send): 
    def __init__(self,sock): 
     asyncore.dispatcher_with_send.__init__(self,sock) 

     self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 
     self.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 

     self.message="" 
     self.protocol=Protocol(DBSession, logger) 

    def handle_read(self): 
     data = self.recv(8192) 
     if data: 
      self.message+=data 
      while TERMINATOR in self.message: 
       index=self.message.index(TERMINATOR) 
       msg=self.message[:index] 
       self.message=self.message[index+len(TERMINATOR):] 

       answer=self.protocol.process_msg(msg, DBSession, tarif_dict) 
       if answer: 
        msg = HEADER+answer+TERMINATOR 
        self.send(msg) 

    def handle_close(self): 
     self.close() 

class AsyncServer(asyncore.dispatcher): 
    def __init__(self, host, port): 
     asyncore.dispatcher.__init__(self) 

     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 

     self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) 
     self.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) 

     self.set_reuse_addr() 
     self.bind((host, port)) 
     self.listen(5) 

    def handle_accept(self): 
     pair = self.accept() 
     if pair is None: 
      pass 
     else: 
      sock, addr = pair 
      logging.info("Incoming connection from %s",repr(addr)) 
      AsyncClientHandler(sock) 

Некоторые клиенты не закрыть соединение, так что в какой-то момент аварии сервера из-за большого количества розеток.

Как я могу закрыть неактивный сокет через некоторое время? урегулирование не работает.

ответ

0

Для этого вы можете использовать TCP-хранилище Keepalive (как и вы уже сделали) и установить его задержку, пинг ... Но этот apporach должен использоваться только для длительных соединений и доступен только в Unix. Попросите некоторых читателей here.

Вы также можете выполнить некоторое планирование сокетов, закрыв их, когда пройдет некоторое время, или задержите их, когда они активны. Я привел пример работы с вашим кодом:

import sched, time 

class SocketSched(threading.Thread): 
    def __init__(self): 
     threading.Thread.__init__(self) 
     self.daemon = True 
     self.to_run = [] 
     self.scheds = {} 
     self.start() 

    def add(self, what): 
     self.to_run.append(what.values()[0]) 
     self.scheds.update(what) 

    def run(self): 
     while True: 
      if self.to_run: 
       run = self.to_run.pop() 
       if not run.empty(): run.run() 
       else: self.to_run.append(run) 

Здесь мы определяем новый класс планировщика в другом потоке - это важно, sched модуль будет постоянно блокировать, как asyncore.loop(). Это необходимо модифицирования код немного:

class AsyncClientHandler(asyncore.dispatcher_with_send): 
    def __init__(self,sock, sch_class): 
     ... 
     self.delay = 10 
     self.sch_class = sch_class 
     self.sch = sched.scheduler(time.time, time.sleep) 
     self.sch_class.add({self.fileno(): self.sch}) 
     self.event = self.sch_class.scheds[self.fileno()].enter(self.delay, 1, self.handle_close,()) 

    ... 

    def delay_close(self): 
     self.sch_class.scheds[self.fileno()].cancel(self.event) 
     self.event = self.sch_class.scheds[self.fileno()].enter(self.delay, 1, self.handle_close,()) 

    ... 

    def handle_close(self): 
     try: 
      self.sch_class.scheds[self.fileno()].cancel(self.event) 
     except: 
      pass 
     ... 

self.delay является тайм-аут в секундах. По истечении этого времени, и никакие действия не задерживают его, сокет будет закрыт. Линия в handle_close() гарантирует, что она не будет вызвана дважды из-за задачи в планировщике.

Теперь вы должны добавить self.delay_close() в начало каждого метода, обеспечивающего активность сокета, например. handle_read().

класс сервера (получает экземпляр SocketSched и передает его на новые каналы):

class AsyncServer(asyncore.dispatcher): 
    def __init__(self, host, port, sch_class): 
     ... 
     self.sch_class = sch_class 

    ... 

    def handle_accept(self): 
    ... 
      AsyncClientHandler(sock, self.sch_class) 

Ready. Используя это:

server = AsyncServer('', 1337, SocketSched()) 
asyncore.loop() 

Это решение работает, но может быть подвержено ошибкам при некоторых близких событиях. В любом случае, сокеты будут считываться, задерживаться и закрываться при заданном тайм-ауте. К сожалению, для запуска такого цикла планирования используется некоторый процессор.