2010-09-21 4 views
8

Я бы хотел отправить сообщение на сервер RabbitMQ, а затем дождаться ответного сообщения (в очереди «ответ»). Конечно, я не хочу ждать вечно, если приложение, обрабатывающее эти сообщения, не работает - должен быть тайм-аут. Это звучит как очень простая задача, но я не могу найти способ сделать это. Я столкнулся с этой проблемой с Java API.RabbitMQ Ждать сообщения с таймаутом

ответ

0

com.rabbitmq.client.QueueingConsumer имеет nextDelivery(long timeout) метод, который будет делать то, что вы хотите. Однако это устарело. Написание собственного тайм-аута не так сложно, хотя может быть лучше иметь текущий поток и список идентификаторов времени, а не добавлять и удалять потребителей и связанные потоки времени.

Редактировать добавить: Заметили дату после ответа!

0

Я применил эту проблему с помощью C#, создав объект для отслеживания ответа на конкретное сообщение. Он устанавливает уникальную очередь сообщений для сообщения и подписывается на него. Если ответ не получен в указанный таймфрейм, таймер обратного отсчета отменяет подписку, которая удаляет очередь. Отдельно у меня есть методы, которые могут быть синхронны из моего основного потока (использует семафор) или асинхронного (использует обратный вызов) для использования этой функции.

В принципе, реализация выглядит следующим образом:

//Synchronous case: 
//Throws TimeoutException if timeout happens 
var msg = messageClient.SendAndWait(theMessage); 

//Asynchronous case 
//myCallback receives an exception message if there is a timeout 
messageClient.SendAndCallback(theMessage, myCallback); 
2

Клиентская библиотека RabbitMQ Java Теперь supports a timeout argument to its QueueConsumer.nextDelivery() method.

Например, RPC учебник используется следующий код:

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
    if (delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 

Теперь вы можете использовать consumer.nextDelivery(1000) ждать максимальной одной секунды. Если таймаут достигнут, метод возвращает null.

channel.basicPublish("", requestQueueName, props, message.getBytes()); 

while (true) { 
    // Use a timeout of 1000 milliseconds 
    QueueingConsumer.Delivery delivery = consumer.nextDelivery(1000); 

    // Test if delivery is null, meaning the timeout was reached. 
    if (delivery != null && 
     delivery.getProperties().getCorrelationId().equals(corrId)) { 
     response = new String(delivery.getBody()); 
     break; 
    } 
} 
Смежные вопросы