Я наблюдаю странное поведение при этом в РОС RabbitMQ 3.3.5 с помощью пружинного кролика 1.3.9.RELEASE библиотекаОдновременные Производители блок на неопределенный срок в RabbitMQ
Когда я начинаю один продуцирующие нить, вещи работать бесперебойно. Но если запускать более 1 потока одновременно, только один из них когда-либо заканчивается, все остальные блокируются неограниченно, даже после того, как очередь становится пустой.
Состояние соединений заблокированных потоков остается работоспособным при мониторинге с rabbitmqctl list_connections
. Следует отметить, что при блокировании производителей или в любое другое время во время полного запуска нет сигналов тревоги.
Я также заметил, что проблема исчезает, если я спал 1 миллисекунду после каждой отправки.
Итак, у меня есть эти вопросы
- ли RabbitMQ не поддерживает одновременные производителей, публикации на высоких скоростях?
- Даже если соединения действительно заблокированы, почему он не отображается в rabbitmqctl list_connections?
- Почему они блокируются бесконечно и не восстанавливают очередь на сыворотку, становится пустой?
Код
public static void main(String[] argv) throws java.io.IOException, InterruptedException {
init();
PocConfig config = new PocConfig();
int threadCount = config.getThreadCount();
final int eventsPerThread = config.getEvents()/threadCount;
final long sleep = config.getSleep();
System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
+ sleep + "]");
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executorService.submit(new Runnable() {
public void run() {
produce(eventsPerThread, sleep, threadId);
}
});
}
waitAndTearDown(executorService);
}
private static void produce(int events, long sleep, int threadId) {
long start = System.currentTimeMillis();
for (int index = 1; index <= events; index++) {
try {
byte[] message = messageFactory.createTestMessage(index);
amqpTemplate.convertAndSend(QUEUE_NAME, message);
if (sleep > 0) {
Thread.sleep(sleep);
}
} catch (Exception e) {
LOG.error("Error", e);
}
}
long time = System.currentTimeMillis() - start;
System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time/1000 + ", tps: " + (events * 1000)/time);
}
конфигурации Spring
<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${addresses}" />
<property name="username" value="${user}" />
<property name="password" value="${passwd}" />
<property name="cacheMode" value="CONNECTION" />
<property name="connectionCacheSize" value="${threads}" />
<property name="channelCacheSize" value="10" />
</bean>
<rabbit:template id="template" connection-factory="connectionFactory"
exchange="testExchange" routing-key="testQueue"/>
Я тестирую с миллионами событий, так или иначе, вот основные части дампа потока http://pastebin.com/0B035D0j Я знаю, что они застряли, потому что они не печатают сообщение окончания, а также количество сообщений на кролике не увеличивается после того, как одна нить заканчивается. Вот пример журнала http://pastebin.com/37rk8bYp –
Гэри, вы тестируете с кроликом на локальном хосте? Я вижу это только при ударе rabbitmq на другой машине в кластере, а не на localhost. Что касается режима потока, да, я вижу, что одно соединение происходит в режиме потока, а затем восстанавливается, но все остальные соединения всегда работают. –
Gary, проблема, похоже, связана с ConnectionFactory, поскольку все работает нормально, если я запускаю 5 отдельных процессов с 1 потоком каждый или если у меня есть отдельная фабрика соединений для каждого потока в одном процессе. –