2016-07-08 2 views
4

Мое приложение читает сообщения через класс Jms MessageListener и в некоторый момент времени бросает TaskRejectedException. Я знаю, что большинство из вас скажут, что количество потоков превышено на maxPoolSize и очередь также заполнена.ThreadPoolExecutor:: TaskRejectedException from Executor

Но я что-то заметил. Количество сообщений, отправленных в очередь, из которой класс MessageListener является извлечение сообщений 10353 и моей весенней недвижимости для ThreadPoolExecutor ниже:

<bean id="ticketReaderThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy"> 
    <property name="corePoolSize" value="10" /> 
    <property name="maxPoolSize" value="150" /> 
    <property name="queueCapacity" value="11000" /> 
</bean> 

Теперь по мне, то MaxPoolSize более чем достаточно, чтобы справиться с этим много запросов , Поэтому, если кто-либо из вас может объяснить причину, кроме нарушения maxPoolSize, пожалуйста, сделайте это.

Мы сталкиваемся с этой проблемой во второй раз, ранее мы уже пытались увеличить maxPoolSize, но снова через 15 дней мы испытываем это исключение примерно от 5000 до 8000 раз в день.

Update:

Это полный стек след за исключением:

Общее исключение при чтении из очереди/обработки сообщения org.springframework.core.task.TaskRejectedException: Исполнитель [[email protected]] не принял задачу: [email protected] at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:244) на com.batman.rapid.rapidserver.sla.JmsTicketReceiver.onMessage (JmsTicketReceiver.java:58) в org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener (AbstractMessageListenerContainer.java:560) на org.springframework.jms. listener.AbstractMessageListenerContainer.invokeListener (AbstractMessageListenerContainer.java:498) на org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener (AbstractMessageListenerContainer.java:467) на org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute (AbstractPollingMessageListenerContainer.java: 325) at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute (AbstractPollingMessageListenerContainer.java:263) at org.springfram ework.jms.listener.DefaultMessageListenerContainer $ AsyncMessageListenerInvoker.invokeListener (DefaultMessageListenerContainer.java:1058) в org.springframework.jms.listener.DefaultMessageListenerContainer $ AsyncMessageListenerInvoker.executeOngoingLoop (DefaultMessageListenerContainer.java:1050) в org.springframework.jms.listener. DefaultMessageListenerContainer $ AsyncMessageListenerInvoker.run (DefaultMessageListenerContainer.java:947) в java.lang.Thread.run (Thread.java:662) Вызванный: java.util.concurrent.RejectedExecutionException в java.util.concurrent.ThreadPoolExecutor $ AbortPolicy .rejectedExecution (ThreadPoolExecutor.java:1774) по адресу java.util.concurrent.ThreadPoolExecutor.reject (ThreadPoolExecutor.java:768) по адресу java.util.concurrent.ThreadPoolExecutor.ex ecute (ThreadPoolExecutor.java:656) at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute (ThreadPoolTaskExecutor.java:241) ...10 более

Это соответствующий код:

if (message instanceof TextMessage) 
{ 
    textMessage = (TextMessage) message; 
    ticketReaderThreadPool.execute(new TicketHandler(textMessage.getText())); 
} 

Ниже конфигурация запрошенный:

<!-- End of JMS Queue Support --> 

    <bean id="ticketReaderThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy"> 
    <property name="corePoolSize" value="10" /> 
    <property name="maxPoolSize" value="150" /> 
    <property name="queueCapacity" value="11000" /> 
</bean> 

    <bean id="notificationThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy"> 
      <property name="corePoolSize" value="10" /> 
      <property name="maxPoolSize" value="100" /> 
      <property name="queueCapacity" value="10000" /> 
    </bean> 

    <bean id="notificationManager" class="com.batman.rapid.rapidserver.sla.scheduler.NotificationManager" scope="singleton"> 
      <property name="defaultPercent" value="80"></property> 
    </bean> 

    <bean id="dbUpdateThreads" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" scope="singleton" destroy-method="destroy"> 
      <property name="corePoolSize" value="1" /> 
      <property name="maxPoolSize" value="100" /> 
      <property name="queueCapacity" value="10000" /> 
    </bean> 
+0

Могут ли мы увидеть конфигурации полного JMS ? –

+0

Я тебя не понял. Пожалуйста, предоставьте подробную информацию о своих требованиях. –

+0

Мы хотели бы видеть соответствующий файл конфигурации весны jms. –

ответ

1

Не уверен, что точная причина. Однако я твердо чувствую, что максимальное количество сообщений может превышать установленный maxqueueCapacity. Однако используемый вами подход отличается от общего использования JMS. Обычно в DefaultMessageListener мы настраиваем количество Max-потребителей для параллельной обработки для асинхронной обработки. на основе ресурсов сервера мы настраиваем максимальный потребитель, который может обрабатывать сервер

Однако в вашем случае от DefaultMessageListener сообщение читается, и вы создаете новый поток и выполняете свою бизнес-логику в новом потоке. поскольку чтение слушателя сообщения быстрее, чем бизнес-логика, задача накапливается в очереди задач потока.

Я бы предложил повторно подойти к вашей текущей реализации. поскольку он не поддерживает транзакции. Я имею в виду, что при сбое сервера все ожидающие обработки сообщений в задачах будут прекращены/потеряны, и поскольку вы установили auto-ack, сообщение было бы удалено из очереди. Другой вопрос хотел задать вручную ограничить очереди потоков и т.д.

Наконец, возвращаясь к вашему вопросу, вы можете сделать быструю проверку, если есть какая-либо блокировка в 150 нитях вы установили

Смежные вопросы