2013-07-15 4 views
11

Я создал простой издатель и потребитель, который подписывается на очередь, используя basic.consume.Потребление не подтверждается сообщениями от RabbitMq

Мой потребитель подтверждает сообщения, когда работа выполняется без исключения. Всякий раз, когда я сталкиваюсь с исключением, я не понимаю сообщение и возвращаюсь раньше. Из подтвержденных сообщений исчезают только сообщения с подтвержденными сообщениями, поэтому они работают правильно.
Теперь я хочу, чтобы потребитель снова поднимал неудавшиеся сообщения, но единственный способ пересмотреть эти сообщения - это перезапустить потребитель.

Как мне подойти к этому прецеденту?

код установки

$channel = new AMQPChannel($connection); 

$exchange = new AMQPExchange($channel); 

$exchange->setName('my-exchange'); 
$exchange->setType('fanout'); 
$exchange->declare(); 

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declare(); 
$queue->bind('my-exchange'); 

Потребительский код

$queue->consume(array($this, 'callback')); 

public function callback(AMQPEnvelope $msg) 
{ 
    try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return; 
    } 
    return $queue->ack($msg->getDeliveryTag()); 
} 

код Производитель

$exchange->publish('message'); 
+0

На каком языке вы владеете и можете ли вы предоставить код? – pinepain

+0

@ zaq178miami, см. Мое отредактированное сообщение –

+0

@Bram_Gerritsen, см. Мой ответ update – pinepain

ответ

15

Если сообщение ш как не подтверждено, и приложение не будет выполнено, оно будет автоматически отправлено автоматически, и на конверте будет установлено значение : true (если вы не используете их с флагом no-ack = true).

UPD:

Вы должны nack сообщений с флагом Redelivery в своем улове блоке

try { 
     //Do some business logic 
    } catch (Exception $ex) { 
     //Log exception 
     return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); 
    } 

Берегитесь бесконечно сообщениями голых во время подсчета обратной передачи не осуществляется в RabbitMQ и в AMQP протокола вообще.

Если вы не хотите возиться с такими сообщениями и просто хотите добавить некоторую задержку вы можете захотеть добавить sleep() или usleep() до того nack вызова метода, но это не очень хорошая идея.

Есть несколько методов, чтобы иметь дело с проблемой цикла: возвращать

1. Положитесь на Dead Letter Exchanges

  • плюсов: надежные, стандартных, четких
  • минусов: требует дополнительной логики

2.Используйте per message or per queue TTL

  • плюсы: легко реализовать, а также стандартные, четкие
  • минусы: длинные очереди вы можете потерять некоторое сообщение

Примеры (обратите внимание, что для очереди ТТЛ мы проходим только номер и для сообщения ТТЛ - все, что будет числовой строкой):

2,1 за сообщение ТТЛ:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'expiration' => '1000' 
    ) 
); 

2.2. За очереди ТТЛ:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->setArgument('x-message-ttl', 1000); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish('message at ' . microtime(true)); 

3. Проведение подсчета или повторно доставляет левый ряд повторно доставляет (ака хмель предел или ТТЛ в стеке IP) в теле сообщения или заголовки

  • профи: дать вам дополнительный контроль на срок службы сообщений на уровень приложения
  • минус: существенные накладные расходы, пока вы должны изменить сообщение и опубликовать его снова, не зависит от конкретного приложения

Код:

$queue = new AMQPQueue($channel); 
$queue->setName('my-queue'); 
$queue->declareQueue(); 
$queue->bind('my-exchange'); 

$exchange->publish(
    'message at ' . microtime(true), 
    null, 
    AMQP_NOPARAM, 
    array(
     'headers' => array(
      'ttl' => 100 
     ) 
    ) 
); 

$queue->consume(
    function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) { 
     $headers = $msg->getHeaders(); 
     echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' '; 
     echo $msg->getDeliveryTag(), ' '; 
     echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' '; 
     echo $msg->getBody(), PHP_EOL; 

     try { 
      //Do some business logic 
      throw new Exception('business logic failed'); 
     } catch (Exception $ex) { 
      //Log exception 
      if (isset($headers['ttl'])) { 
       // with ttl logic 

       if ($headers['ttl'] > 0) { 
        $headers['ttl']--; 

        $exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers)); 
       } 

       return $queue->ack($msg->getDeliveryTag()); 
      } else { 
       // without ttl logic 
       return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue 
      } 

     } 

     return $queue->ack($msg->getDeliveryTag()); 
    } 
); 

Есть могут быть некоторые другие способы улучшения сообщения управления потоком повторно доставляет.

Заключение: нет решения для серебряной пули. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)

+0

Спасибо за ваш ответ. 'redelivered' действительно установлен в' true', но мне нужно перезагрузить мой блокирующий потребитель, чтобы повторно использовать сообщение. –

+0

Спасибо, это именно то, что мне нужно. Не могли бы вы дать мне несколько советов/советов, как предотвратить бесконечно перенаправленные сообщения? Было бы неплохо, если бы я мог отложить требование к очереди на заданное количество секунд, поэтому я не перегружаю свой потребительский сервер. –

+0

вот вы, обновленный ответ снова – pinepain

0

Если вы не хотите перезагружать пользователя, то basic.recover Команда AMQP может быть тем, что вы хотеть. Согласно AMQP protocol:

basic.recover(bit requeue) 

Redeliver unacknowledged messages. 

This method asks the server to redeliver all unacknowledged messages on a specified channel. 
Zero or more messages may be redelivered. This method replaces the asynchronous Recover. 
+0

Этот метод не является частью API-интерфейса клиента, который я использую. http://www.php.net/manual/en/book.amqp.php –

+1

RabbitMQ имеет частичную поддержку этого метода, см. [официальный документ на нем] (https://www.rabbitmq.com/specification.html# метод-статус-basic.recover) – pinepain

Смежные вопросы