2013-02-27 3 views
0

Я взаимодействую с ActiveMQ через STOMP. У меня есть один процесс, который публикует сообщения и несколько процессов, которые подписываются и обрабатывают сообщения (около 10 параллельных экземпляров).Неблокирующие транзакции при чтении из очереди ActiveMQ с помощью STOMP

После прочтения сообщения я хочу быть уверенным, что если по какой-то причине мое приложение завершилось с ошибкой/сбой, сообщение не будет потеряно. Поэтому, естественно, я обратился к транзакциям. К сожалению, я обнаружил, что, как только потребитель читает сообщение как часть транзакции, все следующие сообщения не отправляются другим потребителям до завершения транзакции.

Тестовый кейс: abc queue имеет 100 сообщений. Если я активирую следующий код в двух разных вкладках браузера, первый вернется через 10 секунд, а второй вернется через 20 секунд.

<?php 
// Reader.php 
$con = new Stomp("tcp://localhost:61613"); 
$con->connect(); 

$con->subscribe(
    "/queue/abc", 
    array() 
); 

$tx = "tx3".microtime(); 
echo "TX:$tx<BR>"; 
$con->begin($tx); 
$messages = array(); 
for ($i = 0; $i < 10; $i++) { 
    $t = microtime(true); 
    $msg = $con->readFrame(); 
    if (!$msg) { 
     die("FAILED!"); 
    } 
    $t = microtime(true)-$t; echo "readFrame() took $t MS to complete<BR>"; 
    array_push($messages, $msg); 
    $con->ack($msg, $tx); 
    sleep(1); 
} 
$con->abort($tx); 

Есть ли что-то, что мне не хватает по коду? Есть ли способ настроить ActiveMQ (или отправить заголовок), который заставит транзакцию удалить элемент из очереди, разрешить другим процессам использовать другие сообщения, а если транзакция завершится с ошибкой или будет выполнена тайм-аут, она вернет элемент обратно ?

PS: Я думал о создании другой очереди - DetentionQueue для каждого процесса чтения, но я действительно не делаю этого, если у меня есть выбор.

ответ

1

Возможно, вы захотите изменить размер предварительной выборки подписки, чтобы ActiveMQ не отправлял сообщения в очереди на клиент 1, прежде чем клиент 2 получит шанс получить какой-либо. По умолчанию его значение равно 1000, поэтому лучше настроить его для вашего варианта использования.

Вы можете установить размер предварительной выборки с помощью заголовка «activemq.prefetchSize = 1» на фрейме подписки. Обратитесь к странице ActiveMQ Stomp для всех параметров фрейма.

+0

В моей конкретной версии ActiveMQ размер предварительной выборки по умолчанию был 1 (проверено на консоли AvctiveMQ). Чтобы быть уверенным, я проверил ваше предложение (используя activemq.prefetchSize = 1), но он не изменил результат. Но спасибо, во всяком случае :) – ygilad