2015-05-06 4 views
0

Я использую ActiveMQServer в качестве брокера.Потребление одновременного сообщения в ActiveMQ

Server.java

import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Map; 

import org.apache.activemq.api.core.TransportConfiguration; 
import org.apache.activemq.core.config.Configuration; 
import org.apache.activemq.core.config.impl.ConfigurationImpl; 
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory; 
import org.apache.activemq.core.server.ActiveMQServer; 
import org.apache.activemq.core.server.ActiveMQServers; 

public class Server { 
    public static void main(final String arg[]) throws Exception 
    { 
     try 
     { 
      // Step 1. Create the Configuration, and set the properties accordingly 
      Configuration configuration = new ConfigurationImpl(); 
      //we only need this for the server lock file 
      configuration.setJournalDirectory("target/data/journal"); 
      configuration.setPersistenceEnabled(false); // http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html 
      configuration.setSecurityEnabled(false); // http://activemq.apache.org/security.html 
      /** 
      * this map with configuration values is not necessary (it configures the default values). 
      * If you want to modify it to run the example in two different hosts, remember to also 
      * modify the client's Connector at {@link EmbeddedRemoteExample}. 
      */ 
      Map<String, Object> map = new HashMap<String, Object>(); 
      map.put("host", "localhost"); 
      map.put("port", 61616); 

      // https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch14s04.html 
      TransportConfiguration transpConf = new TransportConfiguration(NettyAcceptorFactory.class.getName(),map); 

      HashSet<TransportConfiguration> setTransp = new HashSet<TransportConfiguration>(); 
      setTransp.add(transpConf); 

      configuration.setAcceptorConfigurations(setTransp); // https://github.com/apache/activemq-6/blob/master/activemq-server/src/main/java/org/apache/activemq/spi/core/remoting/Acceptor.java 

      // Step 2. Create and start the server 
      ActiveMQServer server = ActiveMQServers.newActiveMQServer(configuration); 
      server.start(); 
     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
      throw e; 
     } 
    } 
} 

и у меня есть Client.java класс, где я создаю очередь, продюсер сообщения и сообщения потребителя.

Client.java

import java.text.SimpleDateFormat; 
import java.util.Date; 
import java.util.HashMap; 
import java.util.Map; 

import org.apache.activemq.api.core.ActiveMQException; 
import org.apache.activemq.api.core.SimpleString; 
import org.apache.activemq.api.core.TransportConfiguration; 
import org.apache.activemq.api.core.client.ActiveMQClient; 
import org.apache.activemq.api.core.client.ClientConsumer; 
import org.apache.activemq.api.core.client.ClientMessage; 
import org.apache.activemq.api.core.client.ClientProducer; 
import org.apache.activemq.api.core.client.ClientSession; 
import org.apache.activemq.api.core.client.ClientSessionFactory; 
import org.apache.activemq.api.core.client.MessageHandler; 
import org.apache.activemq.api.core.client.ServerLocator; 
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory; 

public class Client 
{ 

    private static final String queueName = "queue.exampleQueue"; 
    private static final String propName = "myprop"; 

    ClientSessionFactory sf = null; 
    ClientSession session = null; 
    ClientProducer producer = null; 
    ClientMessage message = null; 
    ClientConsumer consumer = null; 
    String name; 

    public Client(String name){ 
     this.name = name; 
    } 

    public void initializeComponents(){ 

     try 
      {   
      // Step 3. As we are not using a JNDI environment we instantiate the objects directly 

      /** 
       * this map with configuration values is not necessary (it configures the default values). 
       * If you modify it to run the example in two different hosts, remember to also modify the 
       * server's Acceptor at {@link EmbeddedServer} 
       */ 
      Map<String,Object> map = new HashMap<String,Object>(); 
      map.put("host", "localhost"); 
      map.put("port", 61616); 
      // ------------------------------------------------------- 

      ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map)); 
      sf = serverLocator.createSessionFactory(); 

      // Step 4. Create a core queue 
      ClientSession coreSession = sf.createSession(false, false, false); 

      if (!coreSession.queueQuery(new SimpleString(queueName)).isExists()) 
       coreSession.createTemporaryQueue(queueName, queueName); 

      coreSession.close(); 

      // Step 5. Create the session, and producer 
      session = sf.createSession(); 

      producer = session.createProducer(queueName); // 

      // Step 7. Create the message consumer and start the connection 
      consumer = session.createConsumer(queueName); 
      session.start(); 

      // Step 8. Receive the message. 
      consumer.setMessageHandler(new MessageHandler(){ 

       public void onMessage(ClientMessage message) 
       { 
        System.out.println("client " + name + " received message " + message.getStringProperty(propName)); 
       } 

      }); 

      } 
      catch (Exception e) 
      { 
      e.printStackTrace(); 
      } 

    } 

    public void sendMessage(String messageText){ 

     // Step 6. Create and send a message 
     message = session.createMessage(false); 
     message.putStringProperty(propName, messageText); 

     try { 
      System.out.println("Producer is going to send a message"); 
      producer.send(message); 
     } catch (ActiveMQException e) { 
      e.printStackTrace(); 
     } 
    } 

    public void cleanUpConnection(){ 
     if (sf != null) 
     { 
      sf.close(); 
     } 
    } 

} 

В этом главная создать одну очередь, двух производителей и двух потребителей.

 public static void main(final String[] args) 
    { 
     Client cl1 = new Client("cl1"); 
     cl1.initializeComponents(); 

     Client cl2 = new Client("cl2"); 
     cl2.initializeComponents(); 

     for (int i = 0; i < 10; i++){ 
      try { 

       Date date = new Date(); 
       String formattedDate = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss").format(date); 

       cl1.sendMessage(formattedDate + " number of iteration " + i); 
       Thread.sleep(2000); 

      } catch (InterruptedException e) { 
       // TODO Auto-generated catch block 
       e.printStackTrace(); 
      } 

    } 

     cl1.cleanUpConnection(); 
     cl2.cleanUpConnection(); 

    } 

и это выход основные:

Producer is going to send a message 
client cl1 received message 05/06/2015 16:56:22 number of iteration 0 
Producer is going to send a message 
client cl2 received message 05/06/2015 16:56:24 number of iteration 1 
Producer is going to send a message 
client cl1 received message 05/06/2015 16:56:26 number of iteration 2 
Producer is going to send a message 
client cl2 received message 05/06/2015 16:56:28 number of iteration 3 
Producer is going to send a message 
client cl1 received message 05/06/2015 16:56:30 number of iteration 4 
Producer is going to send a message 
client cl2 received message 05/06/2015 16:56:32 number of iteration 5 
Producer is going to send a message 
client cl1 received message 05/06/2015 16:56:34 number of iteration 6 
Producer is going to send a message 
client cl2 received message 05/06/2015 16:56:36 number of iteration 7 
Producer is going to send a message 
client cl1 received message 05/06/2015 16:56:38 number of iteration 8 
Producer is going to send a message 
client cl2 received message 05/06/2015 16:56:40 number of iteration 9 
client cl2 received message 05/06/2015 16:56:22 number of iteration 0 
client cl2 received message 05/06/2015 16:56:26 number of iteration 2 
client cl2 received message 05/06/2015 16:56:30 number of iteration 4 
client cl2 received message 05/06/2015 16:56:34 number of iteration 6 
client cl2 received message 05/06/2015 16:56:38 number of iteration 8 

Я хотел бы спросить, как я могу сделать одновременное потребление сообщения для всех потребителей.

Я имею в виду:

client cl1 received ... message of iteration 0 
client cl2 received ... message of iteration 0 
client cl1 received ... message of iteration 1 
client cl2 received ... message of iteration 1 

prefetch limit Я нашел настройки, но не знаю, как использовать ActiveMQConnectionFactory и ActiveMQConnection классов без рефакторинга Client.java класса. Существуют ли другие варианты для одновременного использования сообщений для всех потребителей?

+4

Вы отправляете только одно сообщение на итерацию, и только один клиент получит сообщение. Если вы хотите, чтобы каждый клиент получал одни и те же сообщения, используйте тему вместо очереди. –

+0

@krautmeyer Вы правы, можете опубликовать его как ответ. – Matt

ответ

1

Вы отправляете только одно сообщение на итерацию, и только один клиент получит сообщение. Если вы хотите, чтобы каждый клиент получал одни и те же сообщения, используйте тему вместо очереди.

Смежные вопросы