Для автоматизации сервера мы пытаемся разработать инструмент, который может обрабатывать и выполнять множество задач на разных серверах. Мы отправляем задачу и имя хоста сервера в очередь. Затем очередь прерывается от реквестера, который передает информацию в api. Для этого мы можем выполнить более одной задачи сразу, мы используем потоки.Pika: используйте следующее сообщение, даже последнее сообщение не было подтверждено
Теперь мы застряли с квитирования сообщения ...
То, что мы делали до сих пор:
requester.py
потребляет очереди и начинает затем поток, в котором анзибль задача выполняется. Затем результат переводится в другую очередь. Поэтому каждое новое сообщение создает новый поток. Завершена ли задача, нить умирает.
Но теперь идет сложная часть. Мы должны сделать сообщения постоянными, если наш сервер умрет. Таким образом, каждое сообщение должно быть подтверждено после результат от пользователя был отклонён назад.
Наша проблема заключается в том, что когда мы пытаемся подтвердить сообщение в самой теме, больше нет «одновременной» работы, потому что consume
pika ждет подтверждения. Итак, как мы можем добиться, что consume
потребляет сообщения и не ждет подтверждения? Или как мы можем работать или улучшать нашу маленькую программу?
requester.py
#!/bin/python
from worker import *
import ansible.inventory
import ansible.runner
import threading
class Requester(Worker):
def __init__(self):
Worker.__init__(self)
self.connection(self.selfhost, self.from_db)
self.receive(self.from_db)
def send(self, result, ch, method):
self.channel.basic_publish(exchange='',
routing_key=self.to_db,
body=result,
properties=pika.BasicProperties(
delivery_mode=2,
))
print "[x] Sent \n" + result
ch.basic_ack(delivery_tag = method.delivery_tag)
def callAnsible(self, cmd, ch, method):
#call ansible api pre 2.0
result = json.dumps(result, sort_keys=True, indent=4, separators=(',', ': '))
self.send(result, ch, method)
def callback(self, ch, method, properties, body):
print(" [x] Received by requester %r" % body)
t = threading.Thread(target=self.callAnsible, args=(body,ch,method,))
t.start()
worker.py
import pika
import ConfigParser
import json
import os
class Worker(object):
def __init__(self):
#read some config files
def callback(self, ch, method, properties, body):
raise Exception("Call method in subclass")
def receive(self, queue):
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(self.callback,queue=queue)
self.channel.start_consuming()
def connection(self,server,queue):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host=server,
credentials=self.credentials))
self.channel = self.connection.channel()
self.channel.queue_declare(queue=queue, durable=True)
Мы работаем с Python 2.7 и Pika 0.10.0.
И да, мы заметили в pika FAQ: http://pika.readthedocs.io/en/0.10.0/faq.html
что pika не является потокобезопасным.
Удивительный! Спасибо! Как я могу обернуть это количество предварительной выборки. Это делает все волшебство. – Rumpli
@Rumpli Я добавил это к ответу. Теперь я собираюсь нанести себе собственный урон, но так как вы новичок здесь, я коротко объясню вопрос о выкупе и принятии ответов: если ответ вам поможет, дайте ему побольше голосов. Если он решает вашу проблему, дайте ей переверните и примите. Здесь вы принимали только без перевыбора, но вы не пробовали это, если это работает. Возможно, сейчас просто повыситесь, и как только вы подтвердите согласие. Кто-то, пожалуйста, поправьте меня, если я не объясню это голосование/принятие правильно. – cantSleepNow
Спасибо за ваше объяснение. Я попробовал это и установил 'channel.basic_qos (prefetch_count = 1)' более чем на 1, в то время он выполняет более одной задачи. И я попытался поддержать ваш ответ, но пока у меня нет репутации 15, он не отобразит его ... :( – Rumpli