Если сообщение ш как не подтверждено, и приложение не будет выполнено, оно будет автоматически отправлено автоматически, и на конверте будет установлено значение : true
(если вы не используете их с флагом no-ack = true
).
UPD:
Вы должны nack
сообщений с флагом Redelivery в своем улове блоке
try {
//Do some business logic
} catch (Exception $ex) {
//Log exception
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE);
}
Берегитесь бесконечно сообщениями голых во время подсчета обратной передачи не осуществляется в RabbitMQ и в AMQP протокола вообще.
Если вы не хотите возиться с такими сообщениями и просто хотите добавить некоторую задержку вы можете захотеть добавить sleep()
или usleep()
до того nack
вызова метода, но это не очень хорошая идея.
Есть несколько методов, чтобы иметь дело с проблемой цикла: возвращать
1. Положитесь на Dead Letter Exchanges
- плюсов: надежные, стандартных, четких
- минусов: требует дополнительной логики
2.Используйте per message or per queue TTL
- плюсы: легко реализовать, а также стандартные, четкие
- минусы: длинные очереди вы можете потерять некоторое сообщение
Примеры (обратите внимание, что для очереди ТТЛ мы проходим только номер и для сообщения ТТЛ - все, что будет числовой строкой):
2,1 за сообщение ТТЛ:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'expiration' => '1000'
)
);
2.2. За очереди ТТЛ:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->setArgument('x-message-ttl', 1000);
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish('message at ' . microtime(true));
3. Проведение подсчета или повторно доставляет левый ряд повторно доставляет (ака хмель предел или ТТЛ в стеке IP) в теле сообщения или заголовки
- профи: дать вам дополнительный контроль на срок службы сообщений на уровень приложения
- минус: существенные накладные расходы, пока вы должны изменить сообщение и опубликовать его снова, не зависит от конкретного приложения
Код:
$queue = new AMQPQueue($channel);
$queue->setName('my-queue');
$queue->declareQueue();
$queue->bind('my-exchange');
$exchange->publish(
'message at ' . microtime(true),
null,
AMQP_NOPARAM,
array(
'headers' => array(
'ttl' => 100
)
)
);
$queue->consume(
function (AMQPEnvelope $msg, AMQPQueue $queue) use ($exchange) {
$headers = $msg->getHeaders();
echo $msg->isRedelivery() ? 'redelivered' : 'origin', ' ';
echo $msg->getDeliveryTag(), ' ';
echo isset($headers['ttl']) ? $headers['ttl'] : 'no ttl' , ' ';
echo $msg->getBody(), PHP_EOL;
try {
//Do some business logic
throw new Exception('business logic failed');
} catch (Exception $ex) {
//Log exception
if (isset($headers['ttl'])) {
// with ttl logic
if ($headers['ttl'] > 0) {
$headers['ttl']--;
$exchange->publish($msg->getBody(), $msg->getRoutingKey(), AMQP_NOPARAM, array('headers' => $headers));
}
return $queue->ack($msg->getDeliveryTag());
} else {
// without ttl logic
return $queue->nack($msg->getDeliveryTag(), AMQP_REQUEUE); // or drop it without requeue
}
}
return $queue->ack($msg->getDeliveryTag());
}
);
Есть могут быть некоторые другие способы улучшения сообщения управления потоком повторно доставляет.
Заключение: нет решения для серебряной пули. Вы должны решить, какое решение соответствует вашим потребностям наилучшим образом или узнать что-то другое, но не забудьте поделиться им здесь;)
На каком языке вы владеете и можете ли вы предоставить код? – pinepain
@ zaq178miami, см. Мое отредактированное сообщение –
@Bram_Gerritsen, см. Мой ответ update – pinepain