2013-04-01 2 views
6

Я ищу способ использовать kombu как адаптер MQ между сервером торнадо-sockjs и сервером приложений Django. Я сделал что-то вроде:Kombu в неблокирующемся способе

class BrokerClient(ConsumerMixin): 
    clients = [] 

    def __init__(self): 
     self.connection = BrokerConnection(settings.BROKER_URL) 
     self.io_loop = ioloop.IOLoop.instance() 
     self.queue = sockjs_queue 
     self._handle_loop() 

    @staticmethod 
    def instance(): 
     if not hasattr(BrokerClient, '_instance'): 
      BrokerClient._instance = BrokerClient() 
     return BrokerClient._instance 

    def add_client(self, client): 
     self.clients.append(client) 

    def remove_client(self, client): 
     self.clients.remove(client) 

    def _handle_loop(self): 
     try: 
      if self.restart_limit.can_consume(1): 
       for _ in self.consume(limit=5): 
        pass 
     except self.connection.connection_errors: 
      print ('Connection to broker lost. ' 
      'Trying to re-establish the connection...') 
     self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop) 

    def get_consumers(self, Consumer, channel): 
     return [Consumer([self.queue, ], callbacks=[self.process_task])] 

    def process_task(self, body, message): 
     for client in self.clients: 
      if hasattr(body, 'users') and client.user.pk in body.users: 
       client.send(body) 
     message.ack() 

Но торнадо заблокирован при исполнении _handle_loop (как и ожидалось).

Есть ли способ предотвратить это?

Мне известно об адаптере библиотеки Pika для Tornado, но я бы хотел использовать kombu, потому что он уже используется в проекте и имеет гибкие транспортные средства.

UPDATE:

Измененного _handle_loop к функции генератора

def drain_events(self, callback): 
    with self.Consumer() as (connection, channel, consumers): 
     with self.extra_context(connection, channel): 
      try: 
       connection.drain_events(timeout=1) 
      except: 
       pass 
    callback(None) 


@tornado.gen.engine 
def _handle_loop(self): 
    response = yield tornado.gen.Task(self.drain_events) 
    self.io_loop.add_timeout(datetime.timedelta(0.0001), self._handle_loop) 

ответ

3

Наконец я нашел правильное решение для RabbitMQ бэкэнда:

class BrokerClient(object): 
    clients = [] 

    @staticmethod 
    def instance(): 
     if not hasattr(BrokerClient, '_instance'): 
      BrokerClient._instance = BrokerClient() 
     return BrokerClient._instance 

    def __init__(self): 
     self.connection = BrokerConnection(settings.BROKER_URL) 
     self.consumer = Consumer(self.connection.channel(), [queue, ], callbacks=[self.process_task]) 
     self.consumer.consume() 
     io_loop = tornado.ioloop.IOLoop.instance() 
     for sock, handler in self.connection.eventmap.items(): 
      def drain_nowait(fd, events): 
       handler() 
      io_loop.add_handler(sock.fileno(), drain_nowait, l.READ | l.ERROR) 

    def process_task(self, body, message): 
     #something 
     message.ack() 

    def add_client(self, client): 
     self.clients.append(client) 

    def remove_client(self, client): 
     self.clients.remove(client) 

Для других движков, вы можете использовать решение размещенного в вопрос

ПРИМЕЧАНИЕ: Не работает с libra bbitmq

+0

ПРИМЕЧАНИЕ. На момент написания этой статьи последние py-amqp и kombu официально не поддерживают асинхронные чтения. [см. это] (https://github.com/celery/py-amqp/issues/25). Тем не менее, есть [панировочные сухари] (https://github.com/celery/kombu/blob/master/examples/experimental/async_consume.py) для асинхронных покупок – Realistic

1

У меня была аналогичная необходимость, чтобы не блокировать между Kombu/RabbitMQ и ZeroMQ. Решение состояло в том, чтобы использовать Gevent для обезьяны, заплатив библиотеку сокетов, чтобы Kombu также стал неблокирующим. Мой «основной» поток имел обратный вызов kill_events Kombu, и в другом потоке gevent у меня был цикл, который получал сообщения из сокета ZeroMQ. Хорошо работает.

Это также не работает с librabbitmq, поскольку он имеет свой собственный материал сокета в C, на который не влияет Gevent.

Смежные вопросы