2015-05-30 2 views
0

Я наблюдаю странное поведение при этом в РОС RabbitMQ 3.3.5 с помощью пружинного кролика 1.3.9.RELEASE библиотекаОдновременные Производители блок на неопределенный срок в RabbitMQ

Когда я начинаю один продуцирующие нить, вещи работать бесперебойно. Но если запускать более 1 потока одновременно, только один из них когда-либо заканчивается, все остальные блокируются неограниченно, даже после того, как очередь становится пустой.

Состояние соединений заблокированных потоков остается работоспособным при мониторинге с rabbitmqctl list_connections. Следует отметить, что при блокировании производителей или в любое другое время во время полного запуска нет сигналов тревоги.

Я также заметил, что проблема исчезает, если я спал 1 миллисекунду после каждой отправки.

Итак, у меня есть эти вопросы

  1. ли RabbitMQ не поддерживает одновременные производителей, публикации на высоких скоростях?
  2. Даже если соединения действительно заблокированы, почему он не отображается в rabbitmqctl list_connections?
  3. Почему они блокируются бесконечно и не восстанавливают очередь на сыворотку, становится пустой?

Код

public static void main(String[] argv) throws java.io.IOException, InterruptedException { 
     init(); 
     PocConfig config = new PocConfig(); 
     int threadCount = config.getThreadCount(); 
     final int eventsPerThread = config.getEvents()/threadCount; 
     final long sleep = config.getSleep(); 

     System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep=" 
      + sleep + "]"); 

     ExecutorService executorService = Executors.newFixedThreadPool(threadCount); 
     for (int i = 0; i < threadCount; i++) { 
      final int threadId = i; 
      executorService.submit(new Runnable() { 
       public void run() { 
        produce(eventsPerThread, sleep, threadId); 
       } 
      }); 
     } 
     waitAndTearDown(executorService); 
    } 

    private static void produce(int events, long sleep, int threadId)  { 
     long start = System.currentTimeMillis(); 
     for (int index = 1; index <= events; index++) { 
      try { 
       byte[] message = messageFactory.createTestMessage(index); 
       amqpTemplate.convertAndSend(QUEUE_NAME, message); 
       if (sleep > 0) { 
        Thread.sleep(sleep); 
       } 
      } catch (Exception e) { 
       LOG.error("Error", e); 
      } 
     } 
     long time = System.currentTimeMillis() - start; 
     System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time/1000 + ", tps: " + (events * 1000)/time); 
    } 

конфигурации Spring

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 
    <property name="addresses" value="${addresses}" /> 
    <property name="username" value="${user}" /> 
    <property name="password" value="${passwd}" /> 
    <property name="cacheMode" value="CONNECTION" /> 
    <property name="connectionCacheSize" value="${threads}" /> 
    <property name="channelCacheSize" value="10" /> 
</bean> 

<rabbit:template id="template" connection-factory="connectionFactory" 
    exchange="testExchange" routing-key="testQueue"/> 

ответ

1

Там нет ничего, что я могу думать о том, что будет блокировать, так что я просто побежал тест; и не было никаких проблем:

Start producer with configuration [threadCount=5, events=10, sleep=0] 
Producer:2 finished, events: 1000, Time(s): 0, tps: 4405 
Producer:3 finished, events: 1000, Time(s): 0, tps: 4132 
Producer:1 finished, events: 1000, Time(s): 0, tps: 4048 
Producer:0 finished, events: 1000, Time(s): 0, tps: 3968 
Producer:4 finished, events: 1000, Time(s): 0, tps: 3952 

Что заставляет вас думать, что они заблокированы?

Возьмите дамп потока (например, с jstack), чтобы посмотреть, что делают потоки.

EDIT:

Я до сих пор не может его воспроизвести, даже с сообщениями 1M и CacheMode CONNECTION ...

Start producer with configuration [threadCount=5, events=200000, sleep=0] 
Producer:0 finished, events: 200000, Time(s): 50, tps: 3959 
Producer:3 finished, events: 200000, Time(s): 53, tps: 3746 
Producer:1 finished, events: 200000, Time(s): 55, tps: 3635 
Producer:2 finished, events: 200000, Time(s): 55, tps: 3634 
Producer:4 finished, events: 200000, Time(s): 55, tps: 3629 

Я сделать см очередь переходит в режим flow (через admin UI), но все восстанавливается просто отлично.

Я вижу ваш работник находится под контролем потока ...

"pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000] 
java.lang.Thread.State: RUNNABLE 
    at java.net.SocketOutputStream.socketWrite0(Native Method) 
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) 

вы видите что-нибудь в кролике войти ли? Что вы видите в пользовательском интерфейсе администратора в отношении скорости обмена сообщениями, статуса и т. Д.?

Независимо от того, что это не имеет никакого отношения к Spring AMQP; вам нужно будет связаться с ребятами из rabbitmq в группе google rabbitmq-users.

(Я тестировал кролик 3.4.2).

EDIT2:

С совершенно чистой установкой 3.5.2 ...

Start producer with configuration [threadCount=5, events=200000, sleep=0] 
Producer:0 finished, events: 200000, Time(s): 39, tps: 5091 
Producer:1 finished, events: 200000, Time(s): 39, tps: 5002 
Producer:2 finished, events: 200000, Time(s): 40, tps: 4954 
Producer:3 finished, events: 200000, Time(s): 40, tps: 4951 
Producer:4 finished, events: 200000, Time(s): 40, tps: 4939 

и я не видел flow состояния в интерфейсе администратора (по очереди, но каналы/соединения показывают, что они были в потоке, но снова восстановились).

+0

Я тестирую с миллионами событий, так или иначе, вот основные части дампа потока http://pastebin.com/0B035D0j Я знаю, что они застряли, потому что они не печатают сообщение окончания, а также количество сообщений на кролике не увеличивается после того, как одна нить заканчивается. Вот пример журнала http://pastebin.com/37rk8bYp –

+0

Гэри, вы тестируете с кроликом на локальном хосте? Я вижу это только при ударе rabbitmq на другой машине в кластере, а не на localhost. Что касается режима потока, да, я вижу, что одно соединение происходит в режиме потока, а затем восстанавливается, но все остальные соединения всегда работают. –

+0

Gary, проблема, похоже, связана с ConnectionFactory, поскольку все работает нормально, если я запускаю 5 отдельных процессов с 1 потоком каждый или если у меня есть отдельная фабрика соединений для каждого потока в одном процессе. –

Смежные вопросы