Я использую ActiveMQServer в качестве брокера.Потребление одновременного сообщения в ActiveMQ
Server.java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServers;
public class Server {
public static void main(final String arg[]) throws Exception
{
try
{
// Step 1. Create the Configuration, and set the properties accordingly
Configuration configuration = new ConfigurationImpl();
//we only need this for the server lock file
configuration.setJournalDirectory("target/data/journal");
configuration.setPersistenceEnabled(false); // http://activemq.apache.org/what-is-the-difference-between-persistent-and-non-persistent-delivery.html
configuration.setSecurityEnabled(false); // http://activemq.apache.org/security.html
/**
* this map with configuration values is not necessary (it configures the default values).
* If you want to modify it to run the example in two different hosts, remember to also
* modify the client's Connector at {@link EmbeddedRemoteExample}.
*/
Map<String, Object> map = new HashMap<String, Object>();
map.put("host", "localhost");
map.put("port", 61616);
// https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Application_Platform/5/html/HornetQ_User_Guide/ch14s04.html
TransportConfiguration transpConf = new TransportConfiguration(NettyAcceptorFactory.class.getName(),map);
HashSet<TransportConfiguration> setTransp = new HashSet<TransportConfiguration>();
setTransp.add(transpConf);
configuration.setAcceptorConfigurations(setTransp); // https://github.com/apache/activemq-6/blob/master/activemq-server/src/main/java/org/apache/activemq/spi/core/remoting/Acceptor.java
// Step 2. Create and start the server
ActiveMQServer server = ActiveMQServers.newActiveMQServer(configuration);
server.start();
}
catch (Exception e)
{
e.printStackTrace();
throw e;
}
}
}
и у меня есть Client.java класс, где я создаю очередь, продюсер сообщения и сообщения потребителя.
Client.java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.MessageHandler;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
public class Client
{
private static final String queueName = "queue.exampleQueue";
private static final String propName = "myprop";
ClientSessionFactory sf = null;
ClientSession session = null;
ClientProducer producer = null;
ClientMessage message = null;
ClientConsumer consumer = null;
String name;
public Client(String name){
this.name = name;
}
public void initializeComponents(){
try
{
// Step 3. As we are not using a JNDI environment we instantiate the objects directly
/**
* this map with configuration values is not necessary (it configures the default values).
* If you modify it to run the example in two different hosts, remember to also modify the
* server's Acceptor at {@link EmbeddedServer}
*/
Map<String,Object> map = new HashMap<String,Object>();
map.put("host", "localhost");
map.put("port", 61616);
// -------------------------------------------------------
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
sf = serverLocator.createSessionFactory();
// Step 4. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);
if (!coreSession.queueQuery(new SimpleString(queueName)).isExists())
coreSession.createTemporaryQueue(queueName, queueName);
coreSession.close();
// Step 5. Create the session, and producer
session = sf.createSession();
producer = session.createProducer(queueName); //
// Step 7. Create the message consumer and start the connection
consumer = session.createConsumer(queueName);
session.start();
// Step 8. Receive the message.
consumer.setMessageHandler(new MessageHandler(){
public void onMessage(ClientMessage message)
{
System.out.println("client " + name + " received message " + message.getStringProperty(propName));
}
});
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void sendMessage(String messageText){
// Step 6. Create and send a message
message = session.createMessage(false);
message.putStringProperty(propName, messageText);
try {
System.out.println("Producer is going to send a message");
producer.send(message);
} catch (ActiveMQException e) {
e.printStackTrace();
}
}
public void cleanUpConnection(){
if (sf != null)
{
sf.close();
}
}
}
В этом главная создать одну очередь, двух производителей и двух потребителей.
public static void main(final String[] args)
{
Client cl1 = new Client("cl1");
cl1.initializeComponents();
Client cl2 = new Client("cl2");
cl2.initializeComponents();
for (int i = 0; i < 10; i++){
try {
Date date = new Date();
String formattedDate = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss").format(date);
cl1.sendMessage(formattedDate + " number of iteration " + i);
Thread.sleep(2000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
cl1.cleanUpConnection();
cl2.cleanUpConnection();
}
и это выход основные:
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:22 number of iteration 0
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:24 number of iteration 1
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:26 number of iteration 2
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:28 number of iteration 3
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:30 number of iteration 4
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:32 number of iteration 5
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:34 number of iteration 6
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:36 number of iteration 7
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:38 number of iteration 8
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:40 number of iteration 9
client cl2 received message 05/06/2015 16:56:22 number of iteration 0
client cl2 received message 05/06/2015 16:56:26 number of iteration 2
client cl2 received message 05/06/2015 16:56:30 number of iteration 4
client cl2 received message 05/06/2015 16:56:34 number of iteration 6
client cl2 received message 05/06/2015 16:56:38 number of iteration 8
Я хотел бы спросить, как я могу сделать одновременное потребление сообщения для всех потребителей.
Я имею в виду:
client cl1 received ... message of iteration 0
client cl2 received ... message of iteration 0
client cl1 received ... message of iteration 1
client cl2 received ... message of iteration 1
prefetch limit Я нашел настройки, но не знаю, как использовать ActiveMQConnectionFactory и ActiveMQConnection классов без рефакторинга Client.java класса. Существуют ли другие варианты для одновременного использования сообщений для всех потребителей?
Вы отправляете только одно сообщение на итерацию, и только один клиент получит сообщение. Если вы хотите, чтобы каждый клиент получал одни и те же сообщения, используйте тему вместо очереди. –
@krautmeyer Вы правы, можете опубликовать его как ответ. – Matt