2016-09-02 2 views
0

У меня есть конечная точка JAX-RS, который потребляет JSON полезной нагрузки:ActiveMQ: потреблять специфический поток



    @POST 
    @Consumes(MediaType.APPLICATION_JSON) 
    public Response postEnvelope(final InputStream is) { 
    ... 

Я хочу, чтобы поток этого JSON InputStream в ActiveMQ:



    ... 
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Destination destination = session.createQueue("envelopes"); 
    MessageProducer producer = session.createProducer(destination); 
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
    StreamMessage streamMessage = session.createStreamMessage(); 
    byte[] bytes = new byte[1024 * 100]; 
    for (int count; (count = is.read(bytes)) > 0;) { 
     streamMessage.clearBody(); 
     streamMessage.writeBytes(bytes, 0, count); 
     producer.send(streamMessage); 
    } 
    ... 

Затем я хочу другой поток для потребления потока ActiveMQ JSON и записи его в выходной поток:



    ... 
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    Destination envelopesQueue = session.createQueue("envelopes"); 
    MessageConsumer consumer = session.createConsumer(envelopesQueue); 
    Message message = consumer.receive(); 
    if (message instanceof StreamMessage) { 
     do { 
      StreamMessage streamMessage = (StreamMessage) message; 
      byte[] bytes = new byte[1024 * 100]; 
      for (int count; (count = streamMessage.readBytes(bytes)) > 0;) { 
       out.write(bytes, 0, count); 
      } 
     } while ((message = consumer.receive(2500)) != null); 
    } 
    ... 

Мой вопрос: Как я могу убедиться, что мой пользователь ActiveMQ только получает сообщения, связанные с определенным потоком JSON? (т. е. если в то же самое время на конечную точку REST отправляются две полезной нагрузки JSON, то как я могу предотвратить их запись в один и тот же OutputStream).

+0

Не совсем понятно, что вы хотите. У вас есть производитель и потребитель с MQ, и у вас есть конечная точка JAX-RS. Скажем, у меня есть сообщение в JSON, и я отправляю его в конечную точку, будет ли он отправлен в очередь через производителя, а затем обработан потребителем? –

+0

Сообщения в очереди могут иметь свойства, такие как идентификатор корреляции, я думаю ... Если вы хотите отправлять сообщения с различными идентификаторами соответствия в разные выходные потоки, вы можете (хотя это, вероятно, вам не поможет?) Или вы говорите о mutex, чтобы гарантировать, что оба сообщения не будут записываться одновременно в один выходной поток? –

+0

@KoosGadellaa Чтобы ответить на ваш первый вопрос: Это именно то, что я пытаюсь сделать. – user1288617

ответ

0

Я собираюсь ответить на свой вопрос; Я больше не использую API StreamMessage. Вместо этого моя конечная точка POST сохраняет входной поток в файл, и я отправляю TextMessage, содержащий URL-адрес, где можно получить файл.

Смежные вопросы