2016-10-06 3 views
1

Мы используем контейнер для прослушивания весной amqp - около 40 потребителей, счет предварительной выборки 1. Сообщение ttl составляет около 60 секунд, прежде чем оно попадет в очередь мертвой буквы.RabbitMQ загрузка потребителя при высокой нагрузке

Операция, выполняемая каждым потребителем, - это обновление базы данных, которое медленнее, чем скорость, с которой сообщения поступают в очередь.

После некоторого количества сообщений в очереди очередь загрузки потребителя уменьшается до нуля. У меня создалось впечатление, что потребители заблокированы в базе данных. Однако, если я смотрю на дамп потока, все потребители находятся в состоянии ожидания на mb-кролике - никакие сообщения не обрабатываются.

"SimpleAsyncTaskExecutor-7" #51 prio=5 os_prio=0 tid=0x00007fcb01ad0800 nid=0x58f7 waiting on condition [0x00007fcae5af1000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000000854c30c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) 
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:390) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1097) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1086) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) 
    at java.lang.Thread.run(Thread.java:745) 

Очередь также попадает в состояние потока.

Не знаете, почему обработка сообщений в очереди остановлена. Я понимаю, что публикация ограничена.

Любые предложения помогут.

+0

Я не особо знаком с Spring AMQP, но количество предварительной выборки 1 звучит крайне низко (например, у другой библиотеки, которую я использовал в прошлом, по умолчанию используется 50). Была ли причина для этого, и вы экспериментировали с разными ценностями? Увеличение количества выборки предварительной выборки обычно означает, как вы увеличиваете использование отдельных потребителей, поскольку вы по существу предоставляете каждому потребителю «больше работы». Кроме того, какой объем сообщений (например, в секунду) мы говорим здесь? – Donut

+0

Скорость сообщения составляет> 700 в секунду. Я экспериментировал с prefech, но проблема в том, что очередь отключена. Также у меня есть только одно TCP-соединение с кроликом для всех каналов, которые у меня есть. Это лимитирующий фактор? – Srikanth

+0

В моем опыте, как правило, достаточно одного TCP-соединения. Каков средний размер этих сообщений? Вы имеете дело с одним экземпляром RMQ или кластером (и если да, то очередь зеркалируется через один или несколько узлов?). Являются ли ваши экземпляры дисков RMQ экземплярами или узлами ОЗУ? Постоянна ли очередь? Кроме того, прочитал ли вы этот пост в блоге? http://www.rabbitmq.com/blog/2014/04/14/finding-bottlenecks-with-rabbitmq-3-3/. Существует довольно много полезной информации о том, что может вызвать механизм управления потоком активировать. – Donut

ответ

0

Я никогда не знаю, как ответить на «предложения» на SO, поэтому я предлагаю :)

Вот несколько предложений:

  • увеличить число потребителей
  • увеличить префикс предварительной выборки

Теперь я не могу сказать вам, какие именно значения должны быть точно настроены. Также вы можете попробовать одну из этих вещей или и то, и другое. Возможно, this article может дать вам приблизительную идею о том, как начать (то есть какие значения).

Кроме того, вы также можете масштабировать его, чтобы зеркалировать очереди на пару узлов в кластере и потреблять сообщения оттуда.

Также проверьте this article. Кредитный поток похож на то, что вы также можете попробовать, а также на пейджинг сообщений.

Смежные вопросы