2016-03-04 4 views
1

Я новичок в rabbitmq и пытаюсь выяснить, как я могу сделать запрос клиента сервером с информацией об использовании памяти и процессора с помощью этого урока (https://www.rabbitmq.com/tutorials/tutorial-six-python.html).Удаленный вызов Rabbitmq с Pika

Таким образом, клиент запрашивает процессор и память (я думаю, мне понадобятся две очереди), и сервер отвечает со значениями.

В любом случае, просто создайте client.py и server.py с этим футляром, используя библиотеку Pika в Python.

ответ

1

Я бы порекомендовал вам следить за первым RabbitMQ tutorials, если вы еще этого не сделали. Пример RPC основывается на концепциях, рассмотренных в предыдущих примерах (прямые очереди, эксклюзивные очереди, подтверждения и т. Д.).

RPC-решение, предложенное на уроке требует, по меньшей мере, две очереди, в зависимости от того, сколько клиентов вы хотите использовать:

  • Прямой очередь (rpc_queue), используемая для отправки запросов от клиента к серверу ,
  • Одна эксклюзивная очередь для каждого клиента, используемая для получения ответов.

Цикл запроса/ответа:

  • Клиент посылает сообщение к rpc_queue. Каждое сообщение включает в себя свойство reply_to с именем клиентской исключительной очереди, на которую должен отвечать сервер, и свойство correlation_id, которое является уникальным id, используемым для отслеживания запроса.
  • Сервер ожидает сообщений на rpc_queue. Когда приходит сообщение, он подготавливает ответ, добавляет correlation_id к новому сообщению и отправляет его в очередь, определенную в свойстве сообщения reply_to.
  • Клиент ждет своей очереди, пока не найдет сообщение с первоначально созданным correlation_id.

Прыгая прямо к вашей проблеме, первое, что нужно сделать, это определить формат сообщения, который вы хотите использовать в своих ответах. Вы можете использовать JSON, msgpack или любую другую библиотеку сериализации. Например, если с помощью JSON, одно сообщение может выглядеть примерно так:

{ 
    "cpu": 1.2, 
    "memory": 0.3 
} 

Затем на вашем server.py:

def on_request(channel, method, props, body): 
    response = {'cpu': current_cpu_usage(), 
       'memory': current_memory_usage()} 
    properties = pika.BasicProperties(correlation_id=props.correlation_id) 

    channel.basic_publish(exchange='', 
          routing_key=props.reply_to, 
          properties=properties, 
          body=json.dumps(response)) 
    channel.basic_ack(delivery_tag=method.delivery_tag) 

# ... 

И на вашем client.py:

class ResponseTimeout(Exception): pass 

class Client: 
    # similar constructor as `FibonacciRpcClient` from tutorial... 

    def on_response(self, channel, method, props, body): 
     if self.correlation_id == props.correlation_id: 
      self.response = json.loads(body.decode()) 

    def call(self, timeout=2): 
     self.response = None 
     self.correlation_id = str(uuid.uuid4()) 
     self.channel.basic_publish(exchange='', 
            routing_key='rpc_queue', 
            properties=pika.BasicProperties(
             reply_to=self.callback_queue, 
             correlation_id=self.correlation_id), 
            body='') 

     start_time = time.time() 
     while self.response is None: 
      if (start_time + timeout) < time.time(): 
       raise ResponseTimeout() 
      self.connection.process_data_events() 
     return self.response 

Как вы видите, , код в значительной степени совпадает с кодом FibonacciRpcClient. Основные отличия:

  • Мы используем JSON в качестве формата данных для наших сообщений.
  • Нашего метод клиент call() не требует body аргумента (нет ничего, чтобы отправить на сервер)
  • Мы заботимся о время ожидания ответа (если сервер выключен, или если он не отвечает на наши сообщения)

Тем не менее, там уже много вещей, чтобы улучшить здесь:

  • Нет обработка ошибок: Например, если клиент «забывает», чтобы отправить reply_to очередь, наш сервер собирается аварии, и снова перезагрузится при перезагрузке (сломанное сообщение будет быть requeued бесконечно, пока он не признается нашим сервером)
  • Мы не обрабатываем нарушенные соединения (без механизма переподключение)
  • ...

Вы можете также рассмотреть вопрос о замене подход RPC с опубликовать/подписаться модель; таким образом, сервер просто транслирует свое состояние CPU/памяти каждый X-интервал времени, и один или несколько клиентов получают обновления.

Смежные вопросы