2015-07-28 3 views
1

Я работаю над существующим кодом интеграции Spring, который застревает.Spring Integration - Task Executor stuck

Ниже код получает удар. Около 20 000 записей извлекаются из SQL-запроса и отправляются в сплиттер.

код:

<int-jdbc:outbound-gateway query="..." /> 

<int:splitter input-channel="..." output-channel="queueChannel"/> 

<int:channel id="queueChannel"> 
    <int:queue capacity="25" /> 
</int:channel> 

<int:service-activator ref="..." 
    input-channel="queueChannel" output-channel="..." method="xxx"> 
    <int:poller max-messages-per-poll="25" fixed-delay="100" 
    receive-timeout="30000" task-executor="reqExecutor"/> 
</int:service-activator> 

<task:executor id="reqExecutor" pool-size="25" queue-capacity="5" rejection-policy="CALLER_RUNS" /> 

После выполнения некоторых поиска в Интернете, вот мое понимание кода. Пожалуйста, исправьте меня, если я ошибаюсь:

Выходной канал сплиттера представляет собой очередь с пропускной способностью 25, что означает, что он выберет партию из 25 записей из запроса.

Теперь код, записанный в служебном активаторе, будет проверяться каждые 100 мс и отображает 25 сообщений из канала очереди. Активатор службы работает в многопоточной среде с исполнителем задач.

Задание задачи имеет размер пула 25. Это максимальное количество потоков, которые могут запускаться за раз. и объем очереди - 5. Если все потоки заняты, то Исполнитель отправит их в очередь. Если емкость очереди достигнута до 5, то исполнитель отклонит задачу.

Политика отклонения - CALLER_RUNS. Исполнитель будет использовать основной процесс при отказе.

Эффективность приложения может зависеть от политики отклонения CALLER_RUNS. Но другие политики отбрасывают или прерывают поток. Таким образом, изменение политики отклонения не является решением.

Должен ли я изменить размер пула или емкость очереди исполнителей задач для решения проблемы. Есть ли предпочтительное значение. Каким будет его влияние.

Или, я могу изменить фиксированную задержку опроллера?

EDIT 1:

В журналах, ниже линии многократно идет, и процесс не завершенности:

Received no Message during the poll, returning 'false' 

EDIT 2:

Является ли моя проблема связана с проблемой упоминается в ссылка ниже:

http://docs.spring.io/autorepo/docs/spring-integration/3.0.x/reference/html/messaging-endpoints-chapter.html#async-polling

Кроме того, я не совсем понимаю атрибут get-timeout poller.

+0

Изменение этих параметров просто сдвинет узкое место где-то в другом месте. Вы даже можете уйти, извлекая все записи. – Geek

+0

Итак, есть ли какие-либо рекомендации или рекомендации по решению таких проблем? –

+0

Вы можете попробовать несколько перестановок этой конфигурации, но дело в том, что проблема может быть за пределами этих конфигураций. Если вы не исправите настоящую проблему, может произойти дроссель для другого. – Geek

ответ

1

Он разрешен после некоторой оптимизации кода.Существовали две основные проблемы в коде:

  1. и хэш-код равен метод ключа в HashMap не перекрытая. Хэшмап использовался для кэширования запросов, но при отсутствии методов переопределения он работал неправильно.

  2. Был введен запрос на вставку в коде интеграции Spring, который вставлял записи около 20000 раз. Поскольку AFAIK мы не можем выполнять пакетное обновление в Spring Integration. Итак, я извлекаю запрос вставки в класс java и выполняю пакетное обновление.

Однако, мне интересно, почему дамп и дамп памяти не дали никакого намека на это?

Спасибо, Geek и Gary, что помогли мне.

2

receive-timeout - это срок, в течение которого поток опроса (reqExecutor) ждет в очереди канала для сообщения. Если он истекает после того, как сообщение не поступило, поток возвращается в пул.

Если сообщение прибывает, оно обрабатывается по резьбе и , затем нить возвращается в бассейн.

Если вы не можете это обработать из дампа нити, отправьте его где-нибудь (не здесь - вероятно, слишком большой) - пастебин или github gist.