2009-10-29 2 views
7

Мне нужно динамически создавать очереди асинхронных сообщений в Java. Мой прецедент отправляет электронную почту через несколько SMTP-серверов: мне нужно обеспечить, чтобы электронные письма на том же SMTP-сервере выполнялись последовательно, но электронные письма на разные SMTP-серверы могут обрабатываться одновременно. Я использовал JMS в прошлом, но, насколько я вижу, он разрешает создание очереди времени компиляции, тогда как мне нужно создавать очереди во время выполнения (одна очередь для каждого SMTP-сервера).Динамическое создание асинхронных очередей сообщений в Java

Я пропустил что-то относительно JMS или есть какой-то другой инструмент/предложение, на которое я должен был бы взглянуть?

+0

Вы используете JMS специально или это то, что вы можете использовать java.util.concurrent и его ExecutorServices? –

+0

Я не использую JMS специально, поэтому я посмотрю на ExecutorServices, спасибо. – Zecrates

ответ

6

Я согласен с Адамом, прецедент звучит, как JMS, над головой. Java встроенная функциональность достаточна:

package de.mhaller; 

import java.util.ArrayDeque; 
import java.util.ArrayList; 
import java.util.Deque; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingDeque; 

import org.junit.Assert; 
import org.junit.Test; 

public class Mailer { 

    @Test 
    public void testMailer() throws Exception { 
     ExecutorService executor = Executors.newCachedThreadPool(); 
     ArrayList<Mail> log = new ArrayList<Mail>(); 
     LinkedBlockingDeque<Mail> incoming = new LinkedBlockingDeque<Mail>(); 

     // TODO: Put mails to be sent into the incoming queue 
     incoming.offer(new Mail("[email protected]", "localhost")); 
     incoming.offer(new Mail("[email protected]", "otherhost")); 
     incoming.offer(new Mail("[email protected]", "otherhost")); 
     incoming.offer(new Mail("[email protected]", "localhost")); 

     Map<Mailserver, Queue<Mail>> queues = new HashMap<Mailserver, Queue<Mail>>(); 
     while (!incoming.isEmpty()) { 
      Mail mail = incoming.pollFirst(); 
      Mailserver mailserver = findMailserver(mail); 
      if (!queues.containsKey(mailserver)) { 
       ArrayDeque<Mail> serverQueue = new ArrayDeque<Mail>(); 
       queues.put(mailserver, serverQueue); 
       executor.execute(new SendMail(mailserver, serverQueue)); 
      } 
      Queue<Mail> slot = queues.get(mailserver); 
      slot.offer(mail); 
     } 

     assertMailSentWithCorrectServer(log); 
    } 

    private void assertMailSentWithCorrectServer(ArrayList<Mail> log) { 
     for (Mail mail : log) { 
      if (!mail.server.equals(mail.sentBy.mailserver)) { 
       Assert.fail("Mail sent by wrong server: " + mail); 
      } 
     } 
    } 

    private Mailserver findMailserver(Mail mail) { 
     // TODO: Your lookup logic which server to use 
     return new Mailserver(mail.server); 
    } 

    private static class Mail { 
     String recipient; 
     String server; 
     SendMail sentBy; 

     public Mail(String recipient, String server) { 
      this.recipient = recipient; 
      this.server = server; 
     } 

     @Override 
     public String toString() { 
      return "mail for " + recipient; 
     } 
    } 

    public static class SendMail implements Runnable { 

     private final Deque<Mail> queue; 
     private final Mailserver mailserver; 

     public SendMail(Mailserver mailserver, Deque<Mail> queue) { 
      this.mailserver = mailserver; 
      this.queue = queue; 
     } 

     @Override 
     public void run() { 
      while (!queue.isEmpty()) { 
       Mail mail = queue.pollFirst(); 
       // TODO: Use SMTP to send the mail via mailserver 
       System.out.println(this + " sent " + mail + " via " + mailserver); 
       mail.sentBy = this; 
      } 
     } 

    } 

    public static class Mailserver { 
     String hostname; 

     public Mailserver(String hostname) { 
      this.hostname = hostname; 
     } 

     @Override 
     public String toString() { 
      return hostname; 
     } 

     @Override 
     public int hashCode() { 
      return hostname.hashCode(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      return hostname.equals(((Mailserver) obj).hostname); 
     } 

    } 

} 
1

JMS сама по себе как спецификация довольно молчала по этому вопросу. Большинство реализаций позволяют вам это делать, просто не через JMS, а используя собственный API. Но вы не сможете подключить что-то формальное, как MDB, к динамической очереди. Скорее вам нужно будет управлять своими собственными подключениями и слушателями.

1

В прошлый раз, когда мы смотрели на это в среде WebSphere, было неожиданно сложно/невозможно создать очереди динамически (временные очереди слишком преходящи для вас, я думаю). Хотя API для создания очередей существовали, они требовали перезапуска сервера после этого, чтобы стать активными. Затем есть проблема с MDB.

Как насчет грязной обходности, основанной на пословице, что все проблемы могут быть решены с помощью дополнительного уровня косвенности, предполагающего, что набор доступных принтеров сравнительно мал.

Создайте очереди Printer01 на Printer99 (или некоторое меньшее число). Имейте «базу данных», которая отображает очереди на реальные принтеры. По мере поступления запросов на принтеры вы можете добавить в таблицу сопоставления. Возможно, у вас есть некоторые накладные расходы на MDB, которые смотрят на очереди, которые никогда не будут использоваться, но если ваше количество операций с большим количеством принтеров велико, возможно, вы можете себе это позволить?

0

Создание очереди для каждого из ваших SMTP разорвать и предельную очередь потребителя (MDB или слушающее сообщение) на 1

0

Я сделал это с ActiveMQ - я на самом деле отправил вопрос об этом в то время, как У меня были схожие проблемы (в документации JMS в то время указывалось, что это не было поддержано) и был уверен, что она была поддержана.

+0

У вас есть ссылка на ваш вопрос или документацию, описывающую, как этого достичь? – Zecrates

Смежные вопросы