2010-09-21 3 views
12

У меня есть что-то вроде очереди на работу над RabbitMQ, и после запроса на отмену задания я хотел бы отменить задачи, которые еще не начали обрабатывать (их сообщения не были ack'd), что соответствует чтобы убрать эти сообщения из очередей, к которым они были направлены.Как убрать сообщение в RabbitMQ?

Я не нашел эту функцию в AMQP или в RabbitMQ API; возможно, я не искал достаточно хорошо? Или мне придется использовать обходной путь (это не сложно, но все же)?

ответ

2

По крайней мере, два пути для достижения своей цели:

  • basic.reject будет снова поставить в сообщение, если requeue=true установлен (в противном случае он будет отвергать сообщение).
    (поддерживается с RabbitMQ 2.0.0, см. http://www.rabbitmq.com/blog/2010/08/03/well-ill-let-you-go-basicreject-in-rabbitmq/).

  • basic.recover попросит брокера переадресовать неуправляемые сообщения на канал.

+2

Нет, это решает другую проблему. Мне не нужно отклонять сообщение на стороне потребителя, я хочу отменить его доставку со стороны производителя, чтобы сообщение не дошло до потребителя, как если бы оно вообще не существовало. В моей проблеме потребитель не может решить, следует ли отклонять сообщение. – jkff

+4

Если сообщения успешно опубликованы, и вам нужно удалить их из известной очереди, затем подписаться на нее и использовать их в качестве производителя. –

4

RabbitMQ не позволяет изменять или удалять сообщения после их размещения в очереди. Для этого вы хотите, чтобы какая-то база данных содержала состояние каждой работы и использовать RabbitMQ для уведомления заинтересованных сторон об изменениях в этом состоянии.

Для слабых томов вы можете клонировать его вместе с очередью на каждое задание. Создайте очередь, опубликуйте описание задания в очереди, объявите имя очереди рабочим. Если задание нужно отменить до его обработки, удалите очередь задания; когда рабочие приходят за описанием работы, они заметят, что очередь исчезла.

Легкий вес и, как правило, лучше использовать redis или другое хранилище ключей/значений для хранения состояния задания (с удаленной или отсутствующей записью, означающей отмененную или несуществующую работу) и использовать rabbitmq для уведомления о новых/удаленных/измененных записи в хранилище ключей/значений.

1

Вам необходимо подписаться на все очереди, на которые были отправлены сообщения, и использовать их с помощью ack.

Например, если вы публикуете на тему обмена с «тестом» в качестве ключа маршрутизации, и есть 3 постоянных очереди, которые подписываются на «тест», вам нужно будет использовать эти три очереди. Возможно, было бы лучше добавить еще одну очередь, которую также будут слушать ваши потребительские процессы, и попросите их игнорировать эти сообщения.

Альтернативой, поскольку вы используете RabbitMQ, является создание настраиваемого обменного плагина, который будет принимать некоторую командную команду для очистки всех очередей. Например, у вас может быть этот обмен, который читает специальный заголовок сообщения, который сообщает ему очистить все очереди, которым предназначено это сообщение. Это требует написания кода Erlang, но есть 4 разных типа обмена, поэтому вам нужно будет скопировать наиболее похожий код и написать код для новых bahaviours. Если для этого используются только пользовательские заголовки, то тело сообщения может быть нормальным сообщением для потребителей.

Подводя итог:

1) издатель должен потреблять сообщения сам 2) издатель может отправить специальное сообщение в специальную очередь, чтобы сказать потребителям игнорировать сообщение 3) издатель может отправить специальное сообщение для пользовательского обмена, которое очистит все существующие сообщения от очередей до отправки этого специального сообщения потребителям.

7

Я бы решил этот сценарий, попросив работника проверить какой-то авторитетный источник данных, чтобы определить, должно ли оно выполняться или нет. Например, рабочий проверяет статус задания в базе данных, чтобы узнать, было ли задание уже отменено.

Для сценариев, в которых скорость обработки заданий может быть выше скорости, с которой можно обновлять и считывать авторитетный магазин, может оказаться полезным менее гарантированное хранилище данных, которое торгует скоростью для других характеристик.

Примером этого может быть использование Redis в качестве хранилища для отмены обработки сообщения вместо реляционной базы данных, такой как MySQL. Redis работает очень быстро, но дает меньше гарантий относительно данных, которые он хранит, тогда как MySQL намного медленнее, но предлагает больше гарантий относительно данных, которые он хранит.

В конце концов, концепция проверки с другим источником для того, обрабатывать сообщение или нет, такая же, но способ реализации, который зависит от вашего конкретного сценария.

Смежные вопросы