2016-12-15 2 views
2

Нам нужно использовать очереди в нашем приложении Java EE, и поскольку это приложение для облачных вычислений (развернуто на OpenShift Online), нам нравится использовать амазонные sqs.с использованием амазонок sqs в компоненте @MessageDriven - объединение/параллельная обработка

Если я правильно понимаю теоретику принимающей части JMS/Java EE, то bean-компонент управляется контейнером Java EE, так что множество экземпляров bean создаются параллельно (в соответствии с максимальным размером пула), если количество входящих сообщений велико. Это, конечно, большое преимущество для обработки высоких нагрузок.

Однако я не вижу, как мы можем интегрировать aws sqs таким образом в приложении Java EE. Я знаю, асинхронные примеры ресивера http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

class MyListener implements MessageListener { 

    @Override 
    public void onMessage(Message message) { 
     try { 
      // Cast the received message as TextMessage and print the text to screen. 
      if (message != null) { 
       System.out.println("Received: " + ((TextMessage) message).getText()); 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

, а затем:

// Create a consumer for the 'TestQueue'. 
MessageConsumer consumer = session.createConsumer(queue); 

// Instantiate and set the message listener for the consumer. 
consumer.setMessageListener(new MyListener()); 

// Start receiving incoming messages. 
connection.start(); 

Это официальный асинхронный пример приемника - который не является @MessageDriven боб. Очевидно, что нам нужно где-то учетные данные для аутентификации (путем создания SQSConnectionFactory, затем соединения, а затем сеанса, который также хорошо описан в примере).
Но я полагаю, что этот пример не будет обрабатывать сообщения параллельно - то есть только один экземпляр компонента обрабатывает очередь, и это не является хорошим решением для масштабируемых высоконагруженных приложений.

a) Как мы можем перейти к реальному пути Java EE с помощью Amazon SQS? Я просто нахожу плантацию весенних примеров. Но это должен быть Java EE 7.

b) Мы используем Wildfly (в настоящее время 8.2.1). Можно ли также позволить Wildfly управлять соединением с AWS и приложением внутри, мы могли бы использовать очередь, как если бы это была управляемая очередь сервера приложений (такой же подход, как источники данных для доступа к БД)?

Заключение после того, как получил ответ от stdunbar:
Это кажется не возможным в «правильном пути», что мне нравится делать. И что же мне делать? Внесите ManagedExecutorService в качестве stdunbar, описанный для «обертывания» очереди? - Однако это подразумевает наличие локальной очереди, и это не очень хорошая ситуация для приложения, которое должно быть масштабируемым ?! Что такое альтернативы? Мы запускаем приложение в OpenShift Online. Вероятно, было бы целесообразно создать экземпляр собственной передачи, например. ApacheMQ Cartridge ... есть, конечно, много недостатков, таких как затраты, и что мы несем ответственность за «инфраструктуру».

Честно говоря, я очень разочарован АМС в этом случае ...

ответ

2

Я не думаю, что мое решение является правильным JAVA EE, но в моем случае это работает.

Конфигурация:

@Singleton 
public class SqsMessageManager 
{ 
    private Integer numberOfReceivers = 3; 

    public static SQSConnection connection = null; 
    public static Queue queue = null; 

    @Inject 
    SqsMessageReceiver sqsMessageReceiver; 

    public void init() 
    { 
     try 
     { 
      SQSConnectionFactory connectionFactory = 
        SQSConnectionFactory.builder() 
          .withRegion(Region.getRegion(Regions.EU_WEST_1)) 
          .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider()) 
          .build(); 

      connection = connectionFactory.createConnection(); 

      queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue"); 

      for (int i = 0; i < numberOfReceivers; i++) 
       connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver); 

      connection.start(); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

Затем отправитель:

@Dependent 
public class SqsMessageSender 
{ 
    MessageProducer producer = null; 
    Session senderSession = null; 

    @PostConstruct 
    public void createProducer(){ 
     try 
     { 
      // open new session and message producer 
      senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
      producer = senderSession.createProducer(SqsMessageManager.queue); 
     }catch(JMSException | NullPointerException e){ 
      ; 
     } 
    } 

    @PreDestroy 
    public void destroy(){ 
     try 
     { 
      // close session 
      producer.close(); 
      senderSession.close(); 
     }catch(JMSException e){ 

     } 
    } 

    // sends a message to aws sqs queue 
    public void sendMessage(String txt) 
    { 
     try 
     { 
      TextMessage textMessage = senderSession.createTextMessage(txt); 
      producer.send(textMessage); 
     } 
     catch (JMSException e) 
     { 
      e.getStackTrace(); 
     } 
    } 
} 

И приемник:

@Dependent 
public class SqsMessageReceiver implements MessageListener 
{ 
    public void onMessage(Message inMessage) { 
     ... 
    } 
} 
+0

Я точно не вижу, как работает ваш ** numberOfReceivers **. Вы создаете несколько слушателей на одном объекте (вы вводите 'sqsMessageReceiver', который на самом деле является одним экземпляром)? – badera

3

По некоторому старшему docs I found

Контейнера позволяет много случаев, когда управляемые сообщениями класса бин быть одновременно, обеспечивая одновременную обработку потока сообщений.

С помощью интеграции Amazon JMS, в сочетании с декларативной MDB, вы должны быть хорошо идти. Я бы не использовал интерфейс setMessageListener. Вы можете использовать декларативную версию JMS, так как вы находитесь на Wildfly 8.x/EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ }) 
public class MyListener implements MessageListener { 
    @Override 
    public void onMessage(Message message) { 
    } 
} 

Это позволяет контейнеру создавать столько случаев, сколько необходимо. Обратите внимание, что, возможно, потребуется некоторая настройка, требуемая в Wildfly для параметров JMS.

В качестве примечания, пусть библиотеки Amazon позаботятся о чтении очереди SQS. Я начал кататься с моим собственным читателем, думая, что я мог бы натолкнуть его. Но я обнаружил, что вы не можете использовать библиотеки AWS Java с несколькими потоками, читающими из очереди, поскольку вы будете получать повторяющиеся чтения почти каждый раз. У меня было 4 потока, читающих очередь SQS и получивших 4 одинаковых сообщения. Я, наконец, перешел на одного читателя, положив сообщение в LinkedBlockingDeque, которое будет потребляться рядом других потоков.

Все, что я показал, это чистая Java/EE.

EDIT
После игры вокруг с интеграцией Amazon SQS/JMS некоторым я чувствую, что вы тратите свое время, если вы его используете. Это только для JMS 1.1, поэтому он использует старый синтаксис JMS с включенным JNDI. Кроме того, он работает только для очередей, а не для тем.

Я настоятельно рекомендую создать собственную реализацию. ManagedExecutorService, который запускает поток чтения в очереди с коротким тайм-аутом чтения SQS. Каждый цикл будет считываться из очереди SQS и помещать сообщения в очередь JMS или тему.

Извините, что у вас есть надежда на это - Amazon просто не поддержал его достаточно, чтобы быть полезным.

+0

Спасибо, stdunbar. Я рад слышать, что это должно сработать, однако я не знаю, как привести необходимые параметры (учетные данные) в игру. Где я могу разместить учетные данные AWS? И как мне достичь вашей заметки? Как примечание, пусть библиотеки Amazon позаботятся о чтении очереди SQS? - Я думаю, что аннотация с помощью '@ MessageDriven' заставит сервер приложений читать очередь ?! - Я ценю, если бы вы могли быть немного более конкретными в том, как интегрировать AWS SQS в ваш код. – badera

+0

Я подозреваю, что вы правы. Это не хорошая новость. Если я правильно вас понимаю, это просто SDK, который «не обновляется», а не сама услуга? - Я уточнил вопрос с выводом и еще одним вопросом. – badera

+0

Вопрос, отмеченный буквой b), остается без ответа. В надежде получить, вероятно, еще несколько идей, что делать, я начинаю щедрость ... спасибо в любом случае до сих пор [+1]! – badera

0

Payara Cloud Connectors, кажется, довольно новый, но выглядит многообещающим. Не знаю, работает ли это с другими контейнерами. Насколько я понимаю, он основан на адаптерах JCA.

Смежные вопросы