2014-09-16 5 views
1

Я не могу правильно добавить атрибуты. Я использую AWS 1.7 Когда я добавляю их, они отображаются в теле сообщения, а не в атрибутах. Я вижу это, когда я вхожу в консоль AWS.Амазонки SQS Атрибуты сообщений

добавить сообщение атрибутов с этим кодом:

   Message awsMessage = new Message();   

    Map<String, MessageAttributeValue> messageAttributes = 
new HashMap<String, MessageAttributeValue>(); 

       messageAttributes.put("email", new MessageAttributeValue() 
       .withDataType("String") 
       .withStringValue(email)); 
       messageAttributes.put("data", new MessageAttributeValue() 
       .withDataType("String") 
       .withStringValue(newFileName)); 
       messageAttributes.put("template", new MessageAttributeValue() 
       .withDataType("String") 
       .withStringValue(filename)); 

       awsMessage.setMessageAttributes(messageAttributes); 

Я попытался использовать это, чтобы вытащить атрибуты:

List<Message> messages = SQSUtilityClass.getMessagesFromQueue(QUEUE_URL); 
    int size = messages.size(); 
    Map<String, String> attributes = new HashMap<String, String>(); 
    System.out.println("Size: "+size); 
    for(int x =0;x<size;x++){ 
     Message message = messages.get(x); 
     //System.out.println(message.getBody()); 
     attributes = message.getAttributes(); 
     for(String key: attributes.keySet()){ 
      System.out.println(key + " - "+ attributes.get(key)); 
     } 
    } 

Тем не менее, мои атрибуты находятся в теле сообщения при просмотре через AWS-консоли.

ответ

1

Вы можете использовать следующую библиотеку для производства в SQS.

<dependency> 
     <groupId>com.amazonaws</groupId> 
     <artifactId>amazon-sqs-java-messaging-lib</artifactId> 
     <version>${aws-messaging-lib-version}</version> 
     <type>jar</type> 
    </dependency> 

Вот пример кода для производства.

SendMessageRequest request = new SendMessageRequest(); 
    request.withMessageAttributes(mapMessageAttributes(messageAttributes)); 
    request.setMessageBody(message); 
    request.setQueueUrl(queueUrl); 
    sqsClient.sendMessage(request); 

где mapMessageAttributes (messageAttributes) возвращает карту и sqsClient является экземпляром AmazonSQSClient.

Как только атрибуты доставляются в SQS, я использую библиотеку JMS для использования, устанавливая их как Spring DefaultMessageListenerContainer.

 SQSConnectionFactory connectionFactory = 
       SQSConnectionFactory.builder() 
         .withRegion(AWSSQSUtils.getAWSRegion(region)) 
         .withAWSCredentialsProvider(new DefaultAWSCredentialsProviderChain()) 
         .withNumberOfMessagesToPrefetch(prefetchCount) 
         .withClientConfiguration((new ClientConfiguration()) 
           .withMaxConnections(connectionsCount) 
           .withProxyHost(proxyHost) 
           .withProxyPort(proxyPort)) 
         .build(); 

     LOG.info("Connection factory initialized.."); 

     // Create the connection. 
     SQSConnection connection = connectionFactory.createConnection(); 

     // Create the transacted session with UNORDERED_ACKNOWLEDGE mode. 
     Session session = connection.createSession(false, SQSSession.UNORDERED_ACKNOWLEDGE); 
     Queue immediateQueue = session.createQueue(safInputQueueImmediate); 
     Queue normalQueue = session.createQueue(safInputQueueNormal); 

     LOG.info("Connection established and session created.."); 

     //Create listeners 
     immediateListnerContainer = new 
       DefaultMessageListenerContainer(); 
     immediateListnerContainer.setConnectionFactory(connectionFactory); 
     immediateListnerContainer.setMaxConcurrentConsumers(consumerConcurrency); 
     immediateListnerContainer.setSessionAcknowledgeMode(SQSSession.UNORDERED_ACKNOWLEDGE); 
     immediateListnerContainer.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER); 
     immediateListnerContainer.setDestination(immediateQueue); 
     immediateListnerContainer.setMessageListener(
       new StoreAndForwardListener(jobService, persistentMessagingDao)); 
     immediateListnerContainer.setMaxMessagesPerTask(maxMessagesPerTask); 
     immediateListnerContainer.afterPropertiesSet(); 
     immediateListnerContainer.start(); 

Я использую программный подход против декларативной, поскольку SQS поддерживает неупорядоченный aknowledge, который не поддерживается Spring JMS еще.

В методе onMessage MessageListener вам будет передан объект javax.jms.Message, который может использоваться для получения всех атрибутов сообщения, как показано ниже.

Enumeration<String> headers = message.getPropertyNames(); 
while(headers.hasMoreElements()){ 
    attributeName = headers.nextElement(); 
    print(message.getStringProperty(attributeName)) 
} 

Надеюсь, что это поможет.

У меня есть билет, открытый с AWS, где некоторые атрибуты падают в пути. i.e установить M количество атрибутов, и только M-1 или M-2 атрибуты сделать его в SQS. Как только у меня будет обновление, я добавлю его здесь.

Смежные вопросы