Я бы порекомендовал вам следить за первым RabbitMQ tutorials, если вы еще этого не сделали. Пример RPC основывается на концепциях, рассмотренных в предыдущих примерах (прямые очереди, эксклюзивные очереди, подтверждения и т. Д.).
RPC-решение, предложенное на уроке требует, по меньшей мере, две очереди, в зависимости от того, сколько клиентов вы хотите использовать:
- Прямой очередь (
rpc_queue
), используемая для отправки запросов от клиента к серверу ,
- Одна эксклюзивная очередь для каждого клиента, используемая для получения ответов.
Цикл запроса/ответа:
- Клиент посылает сообщение к
rpc_queue
. Каждое сообщение включает в себя свойство reply_to
с именем клиентской исключительной очереди, на которую должен отвечать сервер, и свойство correlation_id
, которое является уникальным id, используемым для отслеживания запроса.
- Сервер ожидает сообщений на
rpc_queue
. Когда приходит сообщение, он подготавливает ответ, добавляет correlation_id
к новому сообщению и отправляет его в очередь, определенную в свойстве сообщения reply_to
.
- Клиент ждет своей очереди, пока не найдет сообщение с первоначально созданным
correlation_id
.
Прыгая прямо к вашей проблеме, первое, что нужно сделать, это определить формат сообщения, который вы хотите использовать в своих ответах. Вы можете использовать JSON, msgpack или любую другую библиотеку сериализации. Например, если с помощью JSON, одно сообщение может выглядеть примерно так:
{
"cpu": 1.2,
"memory": 0.3
}
Затем на вашем server.py
:
def on_request(channel, method, props, body):
response = {'cpu': current_cpu_usage(),
'memory': current_memory_usage()}
properties = pika.BasicProperties(correlation_id=props.correlation_id)
channel.basic_publish(exchange='',
routing_key=props.reply_to,
properties=properties,
body=json.dumps(response))
channel.basic_ack(delivery_tag=method.delivery_tag)
# ...
И на вашем client.py
:
class ResponseTimeout(Exception): pass
class Client:
# similar constructor as `FibonacciRpcClient` from tutorial...
def on_response(self, channel, method, props, body):
if self.correlation_id == props.correlation_id:
self.response = json.loads(body.decode())
def call(self, timeout=2):
self.response = None
self.correlation_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.correlation_id),
body='')
start_time = time.time()
while self.response is None:
if (start_time + timeout) < time.time():
raise ResponseTimeout()
self.connection.process_data_events()
return self.response
Как вы видите, , код в значительной степени совпадает с кодом FibonacciRpcClient
. Основные отличия:
- Мы используем JSON в качестве формата данных для наших сообщений.
- Нашего метод клиент
call()
не требует body
аргумента (нет ничего, чтобы отправить на сервер)
- Мы заботимся о время ожидания ответа (если сервер выключен, или если он не отвечает на наши сообщения)
Тем не менее, там уже много вещей, чтобы улучшить здесь:
- Нет обработка ошибок: Например, если клиент «забывает», чтобы отправить
reply_to
очередь, наш сервер собирается аварии, и снова перезагрузится при перезагрузке (сломанное сообщение будет быть requeued бесконечно, пока он не признается нашим сервером)
- Мы не обрабатываем нарушенные соединения (без механизма переподключение)
- ...
Вы можете также рассмотреть вопрос о замене подход RPC с опубликовать/подписаться модель; таким образом, сервер просто транслирует свое состояние CPU/памяти каждый X-интервал времени, и один или несколько клиентов получают обновления.