2015-07-01 3 views
0

Мое приложение должно контролировать несколько очередей JMS.Как контролировать несколько очередей JMS

Как это сделать? Начать 2 темы? Можно ли контролировать две очереди одновременно?

Пример кода для одной очереди:

... 
queue1 = session.createQueue("queue-1"); 
consumer = session.createConsumer(queue1); 

connection.start(); 

while (true) { 
    Message m = consumer.receive(10000); 

    if (m == null) { 
     ...nothing... 
    } else { 
     ...do something with the message... 
    } 
} 
... 

Как я должен смотреть очереди 1 и 2 очереди?

+1

ли вы исследовать JMX API? –

+0

У меня есть базовое понимание API и реализации. Несколько клиентов в одной очереди, но не найти легко информацию о том, как я могу подключить один клиент к нескольким очередям. –

+1

Невозможно написать приемник очереди, который принимает имя очереди как параметр, затем в вашем 'main' (или эквивалентном методе) создайте два экземпляра с аргументами конструктора« queue-1 »и« queue-2 »соответственно. ? –

ответ

0

Это мое решение. Оно работает. Любые дополнительные рекомендации приветствуются!

Основной класс:

public class Notifier { 
    public static void main(String[] args) throws Exception { 
     // Start a thread for each JMQ queue to monitor. 
     DestinationThread destination1 = new DestinationThread("queue1"); 
     DestinationThread destination2 = new DestinationThread("queue2"); 
     destination1.start(); 
     destination2.start(); 
    } 
} 

Нить:

public class DestinationThread extends Thread { 

    private String destinationQueue; 

    private static ActiveMQConnectionFactory connectionFactory = null; 
    private static Connection connection = null; 
    private static Session session = null; 
    private static Destination destination = null; 
    private static MessageConsumer consumer = null; 

    public DestinationThread(String destinationQueue) { 
     this.destinationQueue = destinationQueue; 
    } 

    @Override 
    public void run() { 
     try { 
       initializeThread(destinationQueue); 
       startThread(destinationQueue); 
     } catch (Exception e) { 
      //TODO 
     } 
    } 

    private void initializeThread(String destinationQueue) { 
     boolean connectionMade = false; 
     while (!connectionMade) { 
      try { 
       connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 
       connection = connectionFactory.createConnection(); 
       connection.start(); 
       session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 
       destination = session.createQueue(destinationQueue); 
       consumer = session.createConsumer(destination); 
       connectionMade = true; 
      } catch (JMSException e) { 
       //TODO 
       try { 
        Thread.sleep(30000); 
       } catch (InterruptedException ie) { 
       } 
      } 
     } 
    } 


    private void startThreadOther(String destinationQueue) throws Exception { 
     while (true) { 
      try { 
       Message message = consumer.receive(300000); 
       if (message == null) { 
        //No message received for 5 minutes - Re-initializing the connection 
        initializeThread(destinationQueue); 
       } else if (message instanceof TextMessage) { 
        if (destinationQueue.equals("queue1") { 
         //Message received from queue1 - do something with it 
        } else if (destinationQueue.equals("queue2") { 
         //Message received from queue2 - do something with it 
        } else { 
         //nothing 
        } 
       } else { 
        //nothing 
       } 
      } catch (Exception e) { 
       //TODO 
      } 
     } 
    } 
} 
0

Для этого можно использовать кварцевый планировщик Quartz Scheduler. Реализовать один (или больше) кварцевую работу (ы), как это:

public class MessageReaderJob1 implements Job { 
private QueueReader1 qr; 
@Override 
public synchronized void execute(JobExecutionContext arg0) throws JobExecutionException { 
    qr = QueueReader1.getInstance(); 

    try { 
     Message message = qr.getConsumer().receiveNoWait(); 
     .... 
    } 
} 

Затем понадобится планировщик, который вы будете запускать из приложения (основной метод или сервлета), обратите внимание, что вы можете реализовать другой триггер для вторая очередь также:

public class TestCasesSchedule { 

private Scheduler scheduler; 

public void createSchedule() { 
    JobDetail job1 = JobBuilder.newJob(MessageReaderJob1.class) 
      .withIdentity("jobname", Scheduler.DEFAULT_GROUP) 
      .build(); 

    JobDetail job2 = JobBuilder.newJob(MessageReaderJob2.class) 
      .withIdentity("jobname", Scheduler.DEFAULT_GROUP) 
      .build(); 

    Trigger trigger = TriggerBuilder.newTrigger() 
      .withIdentity("minutestrigger", "triggergroup") 
      .startNow() 
      .withSchedule(SimpleScheduleBuilder.simpleSchedule() 
        .withIntervalInMinutes(5) 
        .repeatForever()) 
      .build(); 

    try { 
     SchedulerFactory sf = new StdSchedulerFactory(); 
     scheduler = sf.getScheduler(); 
     scheduler.start(); 
     scheduler.scheduleJob(job1, trigger); 
     scheduler.scheduleJob(job2, trigger); 
    } catch (SchedulerException se) { 
     System.err.println(se.getMessage()) 
    } 
} 

QueueReader для одного из очереди была бы выглядеть следующим образом:

public class QueueReader1 { 

private MessageConsumer consumer = null; 
private Context jndiContext = null; 
private QueueConnectionFactory queueConnectionFactory = null; 
private QueueConnection queueConnection = null; 
private QueueSession queueSession = null; 
private Queue queue = null; 

private static final QueueReader instance = new QueueReader(); 

public synchronized static QueueReader getInstance() { 
    return instance; 
} 

private QueueReader() { 

    /* 
    * Create a JNDI API InitialContext object if none exists 
    * yet. 
    */ 
    try { 
     jndiContext = new InitialContext(); 


    } catch (NamingException e) { 
     System.err.println(e.getMessage()) 
     System.exit(1); 
    } 

    /* 
    * Look up connection factory and queue. If either does 
    * not exist, exit. 
    */ 
    try { 
     queueConnectionFactory = (QueueConnectionFactory) jndiContext.lookup("connection_factory_name"); 
     queue = (Queue) jndiContext.lookup("queue_name"); 
     queueConnection = 
       queueConnectionFactory.createQueueConnection(); 
     queueSession = 
       queueConnection.createQueueSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     consumer = queueSession.createConsumer(queue); 

     queueConnection.start(); 

    } catch (JMSException ex) { 
     System.err.println(ex.getMessage()); 
    } catch (NamingException e) { 
     System.err.println(e.getMessage()); 
    } 
} 

} 
Смежные вопросы