2015-07-20 2 views
8

Я получаю сообщения из очереди Amazon SQS. В очереди находятся тысячи сообщений. Когда я запускаю приложение (написанное на Java с использованием Spring Framework), оно начинает опрашивать очередь и после получения 500 сообщений останавливается. Если я снова запускаю приложение, оно получает ещё 500 сообщений. Amazon SQS Java SDK останавливается после получения 500 сообщений

Мой код выглядит так...

Фабрика соединений

/**
 * Builds the listener container factory registered under the default bean name
 * {@code jmsListenerContainerFactoryActiveMQ}.
 *
 * <p>The factory is backed by {@code connectionFactory()}, uses the concurrency
 * setting {@code "3-15"} and a receive timeout of {@code 3000L}.
 *
 * @return the configured listener container factory
 */
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() {
    final DefaultJmsListenerContainerFactory activeMqFactory =
            new DefaultJmsListenerContainerFactory();
    // The setters are independent of each other, so their order does not matter.
    activeMqFactory.setReceiveTimeout(3000L);
    activeMqFactory.setConcurrency("3-15");
    activeMqFactory.setConnectionFactory(connectionFactory());
    return activeMqFactory;
}

/**
 * Builds the listener container factory registered as
 * {@code sqsJmsListenerContainerFactory}.
 *
 * <p>The factory is backed by {@code sqsConnectionFactory()}, uses the concurrency
 * setting {@code "3-15"} and a receive timeout of {@code 3000L}.
 *
 * @param resolver injected destination resolver; NOTE(review): it is not applied to
 *                 the factory here — confirm whether it should be set on the factory
 * @return the configured listener container factory
 */
@Bean(name = "sqsJmsListenerContainerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) {
    final DefaultJmsListenerContainerFactory sqsFactory =
            new DefaultJmsListenerContainerFactory();
    // The setters are independent of each other, so their order does not matter.
    sqsFactory.setReceiveTimeout(3000L);
    sqsFactory.setConcurrency("3-15");
    sqsFactory.setConnectionFactory(sqsConnectionFactory());
    return sqsFactory;
}

Слушатель

/**
 * Listener method for the {@code "sqs.queue"} destination, bound through the
 * {@code "sqsJmsListenerContainerFactory"} container factory.
 *
 * @param message the received message
 */
@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue") 
public void onMessage(Message message) { 
    // Message processing goes here (omitted in this snippet).
} 

Нужно ли мне что-то настроить в очереди Amazon или в бине фабрики соединений?
Спасибо :-)

Обновление: добавлен дамп потоков

Пока приложение потребляет сообщения,
поток DefaultMessageListenerContainer в дампе потоков выглядит так:

"[email protected]" prio=5 tid=0x18 nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x2230> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x2231> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


Поток ConsumerPrefetchThread в дампе потоков выглядит так:

"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x23a7> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x23a8> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

Когда приложение перестаёт потреблять сообщения,
поток DefaultMessageListenerContainer в дампе потоков выглядит так:

"[email protected]" prio=5 tid=0x18 nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
     at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
     at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
     at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
     at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
     at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 
     at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


Поток ConsumerPrefetchThread в дампе потоков выглядит так:

"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
+0

Попробуйте снять дамп потоков, когда ваш потребитель остановится, чтобы посмотреть, что делают потоки контейнера. Может быть, он застрял в вашем коде? –

+0

@GaryRussell Я добавил дамп потоков. В этом приложении я также использую ActiveMQ. Сообщения, которые я получаю из SQS, я отправляю в очереди ActiveMQ. – Piyush

ответ

3

Похоже на исчерпание какого-то пула в вашем коде...

at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
    at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
    at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
    at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
    at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
    at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 

Поток контейнера зависает, пытаясь получить сеанс от PooledConnection.

Возможно, вы не возвращаете сеансы в пул?

Рассмотрите возможность использования JmsTemplate вместо собственного кода для работы с JMS. Это позволяет избежать таких проблем.

+1

Я исправил эту проблему. После отправки сообщения я не закрывал сеанс и соединение. Я научился использовать дамп потока. Спасибо @GaryRussell :) – Piyush

Смежные вопросы