2015-06-30 3 views
0

В настоящее время я работаю с RabbitMQ на Python, используя клиент Pika для создания сервера, который обрабатывает различные типы сообщений. У базовой настройки есть одна очередь, получающая все входящие сообщения, процесс маршрутизации, который направляет их в правильные адресаты, и несколько процессов для обработки запросов и приема входящих данных. Эта настройка работает нормально, за исключением одного конкретного случая. Когда у меня есть сервер RabbitMQ, запущенный до запуска процессов сервера, и он получает сообщение, он правильно сохраняет их в очереди входящих сообщений. Однако, когда я пытаюсь запустить эти процессы и настроить пользователя на эту непустую входящую очередь с помощью функции pika.basic_consume, программа зависает. Итак, на данный момент, если я хочу запустить свои серверные процессы, я должен очистить все сообщения из очередей, прежде чем он будет работать правильно. Как исправить это, чтобы работать с непустыми очередями?Как настроить потребитель RabbitMQ на потребление из непустой очереди?


Вот пример одного из процессов, все они настроены по существу так же, как и этот.

class Router(Process): 

    def __init__(self,routing_table): 
     super(Router,self).__init__() 
     self.routing_table = routing_table 

     self.routeQueues = { 
      'r' : 'registration', 
      't' : 'util', 
      'p' : 'util', 
      's' : 'data' 
     } 

     # Create a connection to the RabbitMQ server. 
     self.rabbitConn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) 
     self.channel = self.rabbitConn.channel() 

     # Load all of the existing registered node queues 
     with open('registrations/nodes.txt','r') as nodes: 
      for line in nodes: 
       info = line.strip().split(":") 
       self.channel.queue_declare(info[1]) 

     # Declare the default queues 
     queue_list = ["incoming","registration","util"] 
     for queueName in queue_list: 
      self.channel.queue_declare(queueName) 

     # Start consuming things from the incoming queue 
     self.channel.basic_consume(self.gotPacket,queue='incoming') 

    def gotPacket(self,ch,method,params,body): 
     # Does stuff. Not relevant here. 
     pass 

    def run(self): 
     self.channel.start_consuming() 
+0

Это не должно быть проблемой. Я бы рекомендовал вам попробовать обновиться до последней версии pika. Попробуйте альтернативную библиотеку, например, кроличью или мою собственную амф-шторм. – eandersson

+0

@eandersson Я использую pika 0.9.13, последнюю версию. Если это вообще возможно, я предпочел бы продолжать использовать pika, так как это было бы огромным начинанием переключиться на другую библиотеку в этот момент. – Firebarrage

+0

0.9.14 был выпущен около года назад, в котором исправлено много проблем. – eandersson

ответ

0

Эта проблема была вызвана библиотекой pika 0.9.13. Переход на pika 0.9.14 решает эту проблему. @eandersson

Смежные вопросы