2016-06-16 3 views
3

Для автоматизации сервера мы пытаемся разработать инструмент, который может обрабатывать и выполнять множество задач на разных серверах. Мы отправляем задачу и имя хоста сервера в очередь. Затем очередь прерывается от реквестера, который передает информацию в api. Для этого мы можем выполнить более одной задачи сразу, мы используем потоки.Pika: используйте следующее сообщение, даже последнее сообщение не было подтверждено

Теперь мы застряли с квитирования сообщения ...

То, что мы делали до сих пор:
requester.py потребляет очереди и начинает затем поток, в котором анзибль задача выполняется. Затем результат переводится в другую очередь. Поэтому каждое новое сообщение создает новый поток. Завершена ли задача, нить умирает.
Но теперь идет сложная часть. Мы должны сделать сообщения постоянными, если наш сервер умрет. Таким образом, каждое сообщение должно быть подтверждено после результат от пользователя был отклонён назад.

Наша проблема заключается в том, что когда мы пытаемся подтвердить сообщение в самой теме, больше нет «одновременной» работы, потому что consume pika ждет подтверждения. Итак, как мы можем добиться, что consume потребляет сообщения и не ждет подтверждения? Или как мы можем работать или улучшать нашу маленькую программу?

requester.py

#!/bin/python 

    from worker import * 
    import ansible.inventory 
    import ansible.runner 
    import threading 

    class Requester(Worker): 
     def __init__(self): 
      Worker.__init__(self) 
      self.connection(self.selfhost, self.from_db) 
      self.receive(self.from_db) 

     def send(self, result, ch, method): 
      self.channel.basic_publish(exchange='', 
            routing_key=self.to_db, 
            body=result, 
            properties=pika.BasicProperties(
                delivery_mode=2, 
            )) 

      print "[x] Sent \n" + result 
      ch.basic_ack(delivery_tag = method.delivery_tag) 

     def callAnsible(self, cmd, ch, method): 
      #call ansible api pre 2.0 

      result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': ')) 
      self.send(result, ch, method) 

     def callback(self, ch, method, properties, body): 
      print(" [x] Received by requester %r" % body) 
      t = threading.Thread(target=self.callAnsible, args=(body,ch,method,)) 
      t.start() 

worker.py

import pika 
    import ConfigParser 
    import json 
    import os 

    class Worker(object): 
     def __init__(self): 
      #read some config files 

     def callback(self, ch, method, properties, body): 
      raise Exception("Call method in subclass") 

     def receive(self, queue): 
      self.channel.basic_qos(prefetch_count=1) 
      self.channel.basic_consume(self.callback,queue=queue) 
      self.channel.start_consuming() 

     def connection(self,server,queue): 
      self.connection = pika.BlockingConnection(pika.ConnectionParameters(
       host=server, 
       credentials=self.credentials)) 
      self.channel = self.connection.channel() 
      self.channel.queue_declare(queue=queue, durable=True) 

Мы работаем с Python 2.7 и Pika 0.10.0.

И да, мы заметили в pika FAQ: http://pika.readthedocs.io/en/0.10.0/faq.html
что pika не является потокобезопасным.

ответ

3

Отключить автоподтверждение и установить счетчик предварительной выборки на что-то большее, чем 1, в зависимости от того, сколько сообщений вы хотели бы получить от своего потребителя.

Как установить предварительную выборку channel.basic_qos(prefetch_count=1), found here.

+0

Удивительный! Спасибо! Как я могу обернуть это количество предварительной выборки. Это делает все волшебство. – Rumpli

+0

@Rumpli Я добавил это к ответу. Теперь я собираюсь нанести себе собственный урон, но так как вы новичок здесь, я коротко объясню вопрос о выкупе и принятии ответов: если ответ вам поможет, дайте ему побольше голосов. Если он решает вашу проблему, дайте ей переверните и примите. Здесь вы принимали только без перевыбора, но вы не пробовали это, если это работает. Возможно, сейчас просто повыситесь, и как только вы подтвердите согласие. Кто-то, пожалуйста, поправьте меня, если я не объясню это голосование/принятие правильно. – cantSleepNow

+0

Спасибо за ваше объяснение. Я попробовал это и установил 'channel.basic_qos (prefetch_count = 1)' более чем на 1, в то время он выполняет более одной задачи. И я попытался поддержать ваш ответ, но пока у меня нет репутации 15, он не отобразит его ... :( – Rumpli

Смежные вопросы