2016-02-27 4 views
0

Я написал образец кода для добавления элементов в activemq, а затем их извлечение. Я успешно смог добавить около 1000 элементов, но при извлечении элементов код некогда застревает после извлечения около 50 - 200 элементов, даже если в очереди много элементов.activemq потребитель не возвращает данные, даже если очередь не пуста

Ниже приводится код, который я использовал для добавления элементов в очереди

@POST 
@Path("/addelementtoqueue") 
@Consumes(MediaType.APPLICATION_FORM_URLENCODED) 
public void addElementToQeueue(@FormParam("count") int count) throws Exception { 
    IntStream.range(0, count) 
     .forEach(e -> { 
      try { 
       addElement(e); 
      }catch(Exception e1) { 
       throw new RuntimeException(e1); 
      } 
     }); 
} 

private void addElement(int i) throws Exception { 
    Connection conn = GlobalConfiguration.getJMSConnectionFactory().createConnection(); 
    conn.start(); 
    Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
    MessageProducer prod = session.createProducer(queue); 
    prod.send(queue, session.createTextMessage("message "+ i), DeliveryMode.PERSISTENT, 4, 0); 
    prod.close(); 
    session.close(); 
    conn.close(); 
} 

и это фрагмент кода, я использую для извлечения элементов из очереди

@POST 
@Path("/removeelementfromqueue") 
@Consumes(MediaType.APPLICATION_FORM_URLENCODED) 
public void removeElementToQeueue(@FormParam("count") int count) throws Exception { 
    IntStream.range(0, count) 
     .forEach(e -> { 
      try { 
       extractElement(); 
      }catch(Exception e1) { 
       throw new RuntimeException(e1); 
      } 
     }); 
} 

private void extractElement() throws Exception { 
    Connection conn = GlobalConfiguration.getJMSConnectionFactory().createConnection(); 
    conn.start(); 
    Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
    queue = session.createQueue("walkin.testing"); 
    MessageConsumer consumer = session.createConsumer(queue); 
    TextMessage msg = (TextMessage)consumer.receive(); 
    System.out.println(msg.getText()); 
    msg.acknowledge(); 
    consumer.close(); 
    session.close(); 
    conn.close(); 
} 

Я получаю фабрику соединений через resource.xml, фрагмент для него:

<resources>  
<Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter"> 
    BrokerXmlConfig = jdbcBroker:(tcp://0.0.0.0:61616) 
    ServerUrl  = tcp://0.0.0.0:61616?jms.prefetchPolicy.queuePrefetch=0 
</Resource> 

<Resource id="MyJmsConnectionFactory" type="javax.jms.ConnectionFactory"> 
    ResourceAdapter = MyJmsResourceAdapter 
</Resource></resources> 

Я использую activeMQ 5.13.1, с apache-tomee-plus-1.7.2 и Java 8, jdbc хранится как mysql. Я настроил activemq-jdbc-performance.xml в качестве файла конфигурации для apache activemq.

Я пробовал много исследований по этому вопросу, но я не могу определить основную причину этой проблемы. Было бы очень полезно, если кто-нибудь может предложить мне, что я делаю неправильно

ответ

0

Я рекомендую против открытия/закрытия соединений/сеансов/очередей для каждой операции, но вместо этого можно использовать пул, чтобы свести к минимуму, сколько из каждого ресурса необходимо. Довольно уверен, что соединения являются потокобезопасными, но сеансов нет, и вам нужно создать/использовать/назначить сеанс для каждого активного потока. Путем объединения вы можете свести к минимуму количество сеансов, созданных для того, чтобы в настоящее время выполняются активные потоки и повторно использовать их позже.

Так что я предполагаю, что у вас проблема с ресурсами, хотя, кажется, что все закрывается/освобождается правильно, что-то не так (возможно, вне кода, который я вижу здесь). Вы проверили журналы activemq? Вы отлаживали это и следили за тем, чтобы он не висел при попытке создания n-го соединения или сеанса?

Смежные вопросы