2013-07-25 2 views
1

Это вещь.Как удалить сообщения из очереди AMQP (RabbitMQ)?

Я читаю очередь результатов от Rabbitmq, используя PHP AMQP, чтобы обрабатывать важную информацию по каждому отправленному электронной почте. После этого мне нужно удалить или пометить это сообщение как написанное, поэтому в следующий раз, когда я прочитаю очередь, я не получу обработанные сообщения.

Поскольку сервер Rabbitmq отправляет более 10.000 писем в час, каждый раз, когда я читаю очередь для обработки отправлений результатов, сценарий может работать в течение 5 минут, чтобы обрабатывать все сообщения в очереди, поэтому после этого было сделано несколько сотен новых сообщений в течение этих 5 минут. Это делает невозможным очистку очереди после завершения скрипта, поскольку она удаляет места сообщений во время запуска скрипта, где не обрабатывается.

Это оставляет мне только один выбор. Отметьте или удалите сообщение сразу после обработки или чтения моего сценария AMQP.

Есть ли способ сделать это? (Здесь сценарий)

<?php 
/** 
* result.php 
* Script that connects to RabbitMQ, and takes the result message from 
* the result message queue. 
*/ 

// include the settings 
require_once('settings.php'); 

// try to set up a connection to the RabbitMQ server 
try 
{ 
    // construct the connection to the RabbitMQ server 
    $connection = new AMQPConnection(array(
     'host'  => $hostname, 
     'login'  => $username, 
     'password' => $password, 
     'vhost'  => $vhost 
    )); 

    // connect to the RabbitMQ server 
    $connection->connect(); 
} 
catch (AMQPException $exception) 
{ 
    echo "Could not establish a connection to the RabbitMQ server.\n"; 
} 

// try to create the channel 
try 
{ 
    // open the channel 
    $channel = new AMQPChannel($connection); 
} 
catch (AMQPConnectionException $exception) 
{ 
    echo "Connection to the broker was lost (creating channel).\n"; 
} 

// try to create the queue 
try 
{ 
    // create the queue and bind the exchange 
    $queue = new AMQPQueue($channel); 
    $queue->setName($resultbox); 
    $queue->setFlags(AMQP_DURABLE); 
    $queue->bind('exchange1', 'key1'); 
    $queue->declare(); 
} 
catch (AMQPQueueException $exception) 
{ 
    echo "Channel is not connected to a broker (creating queue).\n"; 
} 
catch (AMQPConnectionException $exception) 
{ 
    echo "Connection to the broker was lost. (creating queue)/\n"; 
} 

// Get the message from the queue. 
while ($envelope = $queue->get()) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
} 
    $queue->purge(); 

// done, close the connection to RabbitMQ 
$connection->disconnect(); 
?> 

ответ

3

сообщение подтверждения (ы) $queue->ack() после успешной обработки или даже потреблять/получить их с AMQP_AUTOACK флагом.

UPD:

на основе кода:

1. Ack'ing сообщение
while ($envelope = $queue->get()) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
    $queue->ack($envelope->getDeliveryTag()); 
} 
2. Получение его с AMQP_AUTOACK флагом:
while ($envelope = $queue->get(AMQP_AUTOACK)) { 
    //Function that processes the message 
    process_message($envelope->getBody()); 
} 

PS:

Проверьте документацию AMQPQueue::consume, похоже, что здесь больше подходит.

3. Вы можете потреблять и извед сообщение после того, как оно было обработано:
$queue->consume(function ($envelope, $queue) { 
     process_message($envelope->getBody()); 
     $queue->ack($envelope->getDeliveryTag()); 
}); 
4. или потреблять с AMQP_AUTOACK флагом, но когда ОБРАБОТКА терпит неудачу, вы не будете в состоянии обработать сообщение снова:
$queue->consume(function ($envelope, $queue) { 
     process_message($envelope->getBody()); 
     $queue->ack($envelope->getDeliveryTag()); 
}, AMQP_AUTOACK); 

Вывод: я бы рекомендовал использовать решение №3, но это зависит от вас.

+0

Как я могу реализовать его в PHP-коде? Спасибо – rodvela

+0

Обновлен мой ответ с кодом. Я уверен, что вы уже прочитали его, но на всякий случай, вот официальный документ http://php.net/manual/en/book.amqp.php, он немного устарел в некоторых частях, но все же хорош, чтобы понять, как используйте php-amqp и некоторые примеры, которые могут оказаться полезными – pinepain

+0

ps посмотрите, например, здесь https://github.com/pinepain/amqpy/tree/master/demo/canonical – pinepain

Смежные вопросы