2015-03-23 2 views
1

Я пишу брокерскую, сбалансированную службу клиент-работника, написанную на питоне с ZeroMQ.Случайное зависание/зависание в Python ZeroMQ

Клиенты приобретают адрес рабочего, устанавливают соединение (zmq.REQ/zmq.REP), отправляют один запрос, получают один ответ, а затем отключают.

Я выбрал брокерскую архитектуру, потому что объем данных, которые должны быть переданы между клиентами и рабочими, относительно велик, несмотря на то, что существует только одна пара REQ/REP для каждого подключения и использование брокера как " средний человек "создаст узкое место.

При тестировании системы я заметил, что связь между клиентами и рабочими прерывалась случайным образом, но иногда возобновлялась через пару секунд (часто несколько минут).

Я сузил выпуск до .connect()/.disconnect() клиентов для рабочих.

Я написал два небольших скрипта python, которые воспроизводят ошибку.

import zmq 

class Site: 

     def __init__(self): 
     ctx = zmq.Context() 
     self.pair_socket = ctx.socket(zmq.REQ) 
     self.num = 0 


     def __del__(self): 
     print "closed" 


     def run_site(self): 
     print "running..." 
     while True: 
      self.pair_socket.connect('tcp://127.0.0.1:5555') 
      print 'connected' 
      self.pair_socket.send_pyobj(self.num) 
      print 'sent', self.num 
      print self.pair_socket.recv_pyobj() 
      self.pair_socket.disconnect('tcp://127.0.0.1:5555') 
      print 'disconnected' 
      self.num += 1 

s = Site() 
s.run_site() 

и

import zmq 

class Server: 

     def __init__(self): 
      ctx = zmq.Context() 
      self.pair_socket = ctx.socket(zmq.REP) 
      self.pair_socket.bind('tcp://127.0.0.1:5555') 


     def __del__(self): 
      print " closed" 


     def run_server(self): 
      print "running..." 
      while True: 
       x = self.pair_socket.recv_pyobj() 
       print x 
       self.pair_socket.send_pyobj(x) 


s = Server() 
s.run_server() 

Я не думаю, что проблема связана с памятью или gc как я попытался отключить gc - без особого эффекта.

Я попытался с помощью zmq.LINGER, как описано здесь: Zeromq with python hangs if connecting to invalid socket

Что может вызвать эти Randoms замерзает?

+0

Используйте анализатор пакетов, чтобы увидеть, какая сторона пары висит ... это сообщение висит на клиенте, прежде чем он посылает, или он висит на сервере после того, как он получает. Как вы определяете замораживание, т. Е. Какое последнее сообщение вы видите перед запуском замораживания? – Jason

+0

При всем уважении создавайте ** 'while True: .connect(); ...; .disconnect() '** является довольно грубым ** способом относительно базовых ресурсов и связанных с ними системных накладных расходов. Конечно, гораздо лучше и «более экологичные»/«Более зеленые» способы выразить ваши намерения дизайна в коде, что не будет тратить ресурсы на выделение ресурсов ЦП/ресурсов – user3666197

ответ

0

Соединитель REP по определению является синхронным. Таким образом, ваш сервер может обслуживать только один запрос за раз, остальные из них просто заполняют буфер и теряются в какой-то момент.

Чтобы устранить основную причину, вместо этого необходимо использовать гнездо ROUTER.

class Server: 
    def __init__(self): 
     ctx = zmq.Context() 
     self.pair_socket = ctx.socket(zmq.ROUTER) 
     self.pair_socket.bind('tcp://127.0.0.1:5555') 
     self.poller = zmq.Poller() 
     self.poller.register(self.pair_socket, zmq.POLLIN) 

    def __del__(self): 
     print " closed" 

    def run_server(self): 
     print "running..." 
     while True: 
      try: 
       items = dict(self.poller.poll()) 
      except KeyboardInterrupt: 
       break 
      if self.pair_socket in items: 
       x = self.pair_socket.recv_multipart() 
       print x 
       self.pair_socket.send_multipart(x) 
Смежные вопросы