2010-11-24 3 views
11

Мы используем ThreadPoolExecutor в нашем JMS-клиенте и вставляем его в DefaultMessageListenerContainer. Я ожидаю, что это приведет к одновременным потокам для многих сообщений, однако наши журналы показывают, что идентификатор потока не изменится. Наш журнал показывает, что для различной обработки сообщений идентификатор потока всегда одинаковый на 24.Spring ThreadPoolTaskExecutor работает только один поток

Это конфигурация пружины в этом сценарии:

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"  
     p:connectionFactory-ref="cachedConnectionFactory" 
     p:destination-ref="formsCRRDestination" 
     p:messageListener-ref="formServicePojo" 
     p:concurrentConsumers="5" 
     p:idleTaskExecutionLimit="1" 
     p:maxConcurrentConsumers="25" 
     p:taskExecutor-ref="threadPoolExecutor"   
     destroy-method="doShutdown"  
    > 


<bean id="threadPoolExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > 
     <property name="corePoolSize" value="1"/> 
     <property name="maxPoolSize" value="15"/> 
     <property name="keepAliveSeconds" value="30"/> 
    </bean> 

После не вводя боб threadPoolExectuor в DefaultMessageListenerContainer, сообщения в настоящее время выполняются в разных потоках.

Это результирующая конфигурация:

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"  
      p:connectionFactory-ref="cachedConnectionFactory" 
      p:destination-ref="formsCRRDestination" 
      p:messageListener-ref="formServicePojo" 
      p:concurrentConsumers="5" 
      p:idleTaskExecutionLimit="1" 
      p:maxConcurrentConsumers="25"  
      destroy-method="doShutdown"  
     > 

Я попытался чтения документации, и я не понимаю, почему это происходит. Любое объяснение?

+0

Я не в jms, но вы пытались отправить много сообщений одновременно? Я предполагаю, что механизм здесь состоит в том, чтобы начать новый поток только по требованию (т. Е. Нет простоя и появляется новое сообщение). – 2010-11-24 17:21:53

+0

Да, я попытался отправить много сообщений одновременно, только некоторые сообщения занимают много времени. – Jeune 2010-11-24 17:24:25

ответ

14

После прохождения ThreadPoolTaskExecutor code in Spring почитав документацию Java для ThreadPoolTaskExecutor Я думаю, что это ответ:

Неограниченные очереди. Использование неограниченной очереди (например, LinkedBlockingQueue без предопределенной емкости ) вызовет новые задачи в случаях, когда все потоки corePoolSize заняты. Таким образом, не более, чем потоки corePoolSize, будут создаваться . (И значение maximumPoolSize поэтому не имеет никакого эффекта.)

В нашей конфигурации выше, мы использовали LinkedBlockingQueue по умолчанию, и наш corePoolSize равен 1. Поэтому maximumPoolSize не будет имеют какой-либо эффект.

+1

Рассмотрите возможность редактирования ответа, чтобы добавить информацию о том, как исправить проблему. Какие коды java или весны должны быть изменены? Благодарю. – Gray 2017-02-23 16:44:10

2

изменить corePoolSize на 10, тогда вы получите 10 потоков одновременно. Прочитайте javadoc на java.util.concurrent.ThreadPoolExecutor которая является основой весеннего ThreadPoolTaskExecutor, то у вас будет лучше понять, как конфиг в corePoolSize и MaxPoolSize и queueCapacity

7

Я думаю, что выбрали ответ неверен. IIRC, путь ThreadPoolTaskExecutor (в конечном счете ThreadPoolExecutor в JDK) работает в

  1. ThreadPoolTaskExecutor создавать темы, чтобы до corePoolSize, когда он инициировал.
  2. Он принимает запрос до corePoolSize и позволяет потоку обрабатывать задачу.
  3. Если количество входящих запросов больше, а все потоки заняты, ThreadPoolTaskExecutor запускает их запрос во внутреннюю очередь. Это может быть проблематично, так как этот размер очереди будет Integer.MAX_VALUE по умолчанию, если вы не укажете queue queueCapacity.
  4. Запрос, добавленный в # 3, будет выполняться потоком, если в пуле есть какие-либо доступные потоки.
  5. Если запросы продолжаются и все потоки заняты & очередь заполнена, ThreadPoolTaskExecutor начинает создавать новые потоки до maxPoolSize для обработки запросов.
  6. Если запросы на них (количество увеличенных потоков + размер очереди), тогда задача будет отклонена или указанная вами политика.

Таким образом, проблема в том, что я думаю: 1) ваш потребитель достаточно быстр или 2) вы выполняете слишком сложные запросы, поэтому один поток, указанный вами с помощью corePoolSize, достаточен для обработки новых входящих запросов + поставленной задачи без позволяя ThreadPoolTaskExecutor создавать новые потоки. Я уверен, что если вы нажмете его сильнее или установите емкость очереди с небольшим числом (например, 5 ~ 10), вы сможете увидеть, сколько потоков увеличивается.

28

попробовать это:

<bean id="threadPoolTaskExecutor" 
     class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
     <property name="corePoolSize" value="10" /> 
     <property name="maxPoolSize" value="25" /> 
     <property name="queueCapacity" value="30" /> 
</bean> 
  • Это создаст 10 потоков во время инициализации.
  • Если все 10 потоков заняты и появляется новая задача, то она будет держать задачи в очереди.
  • Если очередь заполнена, она создаст 11-ю нить и пойдет до 25.
  • Затем выкинет TaskRejected Exception.
Смежные вопросы