У меня есть конечная точка JAX-RS, который потребляет JSON полезной нагрузки:ActiveMQ: потреблять специфический поток
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response postEnvelope(final InputStream is) {
...
Я хочу, чтобы поток этого JSON InputStream в ActiveMQ:
...
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("envelopes");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
StreamMessage streamMessage = session.createStreamMessage();
byte[] bytes = new byte[1024 * 100];
for (int count; (count = is.read(bytes)) > 0;) {
streamMessage.clearBody();
streamMessage.writeBytes(bytes, 0, count);
producer.send(streamMessage);
}
...
Затем я хочу другой поток для потребления потока ActiveMQ JSON и записи его в выходной поток:
...
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination envelopesQueue = session.createQueue("envelopes");
MessageConsumer consumer = session.createConsumer(envelopesQueue);
Message message = consumer.receive();
if (message instanceof StreamMessage) {
do {
StreamMessage streamMessage = (StreamMessage) message;
byte[] bytes = new byte[1024 * 100];
for (int count; (count = streamMessage.readBytes(bytes)) > 0;) {
out.write(bytes, 0, count);
}
} while ((message = consumer.receive(2500)) != null);
}
...
Мой вопрос: Как я могу убедиться, что мой пользователь ActiveMQ только получает сообщения, связанные с определенным потоком JSON? (т. е. если в то же самое время на конечную точку REST отправляются две полезной нагрузки JSON, то как я могу предотвратить их запись в один и тот же OutputStream).
Не совсем понятно, что вы хотите. У вас есть производитель и потребитель с MQ, и у вас есть конечная точка JAX-RS. Скажем, у меня есть сообщение в JSON, и я отправляю его в конечную точку, будет ли он отправлен в очередь через производителя, а затем обработан потребителем? –
Сообщения в очереди могут иметь свойства, такие как идентификатор корреляции, я думаю ... Если вы хотите отправлять сообщения с различными идентификаторами соответствия в разные выходные потоки, вы можете (хотя это, вероятно, вам не поможет?) Или вы говорите о mutex, чтобы гарантировать, что оба сообщения не будут записываться одновременно в один выходной поток? –
@KoosGadellaa Чтобы ответить на ваш первый вопрос: Это именно то, что я пытаюсь сделать. – user1288617