2011-12-22 2 views
2

Я пытаюсь помещать постоянные сообщения в очередь WebSphere MQ, однако они должны быть асинхронными. Единственный способ, с помощью которого я могу работать async, - это если сообщения не являются постоянными (я имею в виду, что putSuccessCount равно числу сообщений, помещенных на MQAsyncStatus, а все остальные времена равны нулю). Приведенный ниже код описывает то, что я пытаюсь сделать:Поместить сообщение async в очередь WebSphere MQ

import com.ibm.mq.MQAsyncStatus; 
import com.ibm.mq.MQDestination; 
import com.ibm.mq.MQMessage; 
import com.ibm.mq.MQPutMessageOptions; 
import com.ibm.mq.MQQueueManager; 
import com.ibm.mq.constants.MQConstants; 

public class MQPutTest extends TestCase{ 

    private static Logger log = Logger.getLogger(MQPutTest.class); 

    public void testPut() throws Exception{ 

     Hashtable<String, Object> props = new Hashtable<String, Object>(); 
     props.put(MQConstants.CHANNEL_PROPERTY, "my_channel"); 
     props.put(MQConstants.PORT_PROPERTY, 1414); 
     props.put(MQConstants.HOST_NAME_PROPERTY, "localhost"); 

     String qManager = "my_queue_manager"; 
     MQQueueManager qMgr = new MQQueueManager(qManager, props); 

     int openOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INPUT_AS_Q_DEF; 

     MQDestination queue = qMgr.accessQueue("my_queue", openOptions); 

     MQPutMessageOptions pmo = new MQPutMessageOptions(); 
     pmo.options = MQConstants.MQPMO_ASYNC_RESPONSE; 

     MQMessage message = new MQMessage(); 
     message.format = MQConstants.MQFMT_STRING; 
     message.writeString("test message"); 
     queue.put(message, pmo); 

     queue.close(); 
     MQAsyncStatus asyncStatus = qMgr.getAsyncStatus(); 
     qMgr.disconnect(); 
    } 
} 

Я приписывая увеличение производительности, я вижу с большим количеством сообщений на тот факт, что очередь установлена ​​на не настойчиво, а не сообщения помещать его асинхра , Я установил тип ответа по умолчанию для Asynchronous в расширенных свойствах очереди в MQ explorer bu, это не имеет никакого эффекта.

Любая помощь будет оценена по достоинству.

+0

Сообщения, отправленные в фоновом режиме, не сохраняются, потому что они могут быть потеряны, если ваше приложение останавливается до отправки сообщения. Единственный способ узнать сообщение остался в том, чтобы дождаться его принятия восстановимым способом (обычно записывается на диск на сервере) –

+0

Спасибо, можно ли использовать огонь и забыть о постоянных очередях? Я хочу, чтобы отправлять сообщения в постоянную очередь, чтобы они выдержали перезагрузку, но не дождались ответа диспетчера очереди на каждый PUT, я просто хочу выкидывать сообщения в диспетчере очередей, но не нужно знать, если они попадают в очередь или нет для каждого сообщения. Я могу использовать MQAsyncStatus для согласования чисел после каждой партии сообщений. Это чисто по соображениям производительности. –

+0

Проблема в том, что либо вам нужны гарантии, либо нет. Похоже, что вы в порядке, если потеряно одно или два сообщения, но не все из них? –

ответ

0

Вслед за комментариями в вопросе WMQ не имеет понятия «постоянная очередь» (отсюда и причина, по которой я изменил заголовок сообщения). Атрибут queue для persistence никоим образом не изменяет, как QMgr обрабатывает очередь или любые сообщения в ней. Все, что он делает, - это указать программу, для которой параметр persistence используется, если программисту не удается явно установить это значение. Независимо от того, существует ли какое-либо данное сообщение, зависит от того, было ли задано постоянство MQMD, когда сообщение было сначала помещено в очередь. Любая очередь может содержать постоянные и непостоянные сообщения в любой комбинации.

В частности, фрагмент кода int he post не указывает сохранение, используя message.setPersistence(), чтобы он наследовал все значения по умолчанию для очереди. Это, в свою очередь, зависит от значения, унаследованного от атрибута очереди. В любом случае параметр атрибута очереди переопределяет явное значение из приложения.

Таким образом, разница в производительности, которую вы видите, скорее всего будет отражать настройку атрибута DEFPSIST очереди, но это не значит, что асинхронные установки не работают. Помните, что async puts никоим образом не уменьшает количество времени, которое требуется для размещения сообщения в очереди. QMgr имеет тот же путь к коду, что и для сохранения сообщения, является ли он синхронным или нет. Разница в том, что ваше приложение больше не ждет, пока QMgr завершит свою работу. Вполне возможно, что когда вы вызываете обновление MQAsyncStatus, что WMQ еще не написал любые сообщения. Это даже более вероятно, если все они находятся в одной единице работы, потому что WMQ не вернет статус для всех сообщений до тех пор, пока обработка COMMIT не будет завершена. Если вы не вызываете COMMIT явно, это происходит, когда вы закрываете очередь.

Вы можете проверить это, повторив вызов qMgr.getAsyncStatus() каждую секунду в течение нескольких секунд после COMMIT или CLOSE. Вы должны увидеть, что первый из них не возвращает сообщений, которые были успешно удалены, но в конечном итоге вы можете учитывать их все.

Кстати, вопрос о постоянстве сообщений должен быть всегда обработан в вашем коде. Постоянство сообщений обычно выводится как бизнес-требование, которое фиксируется в дизайне приложения.Поэтому руководитель проекта и разработчик обязаны гарантировать, что это требование выполнено. Единственный способ надежно удостовериться, что это выполнено, - это то, что приложение вызывает явно setPErsistence(). В противном случае приложение будет использовать значение из атрибута DEFPSIST очереди, чтобы неявно вызвать setPersistence(). Итак, зачем беспокоиться о разрешении такого типа по умолчанию в API? Потому что есть несколько законных случаев, когда настойчивость должна быть в состоянии изменить во время выполнения. Написание приложения для намеренного использования значения по умолчанию и повторного открытия очереди после выполнения каждой единицы работы. Во всех остальных случаях персистентность должна быть задана в программе или в управляемых объектах.

И, наконец, если вы помещаете 10 000 сообщений под единую единицу работы, еще одна причина, по которой вы можете получить ответ, в котором говорится, что сообщения не были успешно отправлены, - это нехватка места в журналах или перегородок, которые не установлены для размещения нагрузки. Например, если MAXUMSGS настроено на то, что меньше 10 000, вся ваша транзакция будет отклонена. С другой стороны, если MAXUMSGS настроен правильно, но размер и количество первичных и вторичных журналов недостаточны для хранения объема данных в транзакции, тогда снова вся транзакция откатится. Вы должны немного поиграть с интервалом COMMIT, потому что оптимальное значение зависит от размера сообщений в зависимости от размера очереди и буферов журналов. Как только вы превысите объем данных, которые можно оптимизировать в одну операцию записи, дополнительные сообщения фактически снижают производительность. Когда люди помещают 10 000 сообщений в единую единицу работы, это вряд ли когда-либо для производительности, а скорее потому, что это их соответствующая единица восстановления, и соответствующий удар производительности является вторичным по отношению к требованию восстановления.

0

Если его можно потерять одно или два сообщения, вы можете добавить сообщения во внутреннюю очередь отправки, например.

ExecutorService service = Executors.newSingleThreadedExecutor(); 

// build you message here 

// add the message to be sent asynchronously. 
service.execute(new Runnable() { 
    public void run() { 
     queue.put(message, pmo); 
    } 
}); 

Любые сообщения в очереди, когда процесс умирает, теряются.

+0

Вы думаете WMQ v6.0 здесь, верно? Поскольку в версии 7.0 собственный асинхронный PUT делегирует это в QMgr и не создает условие гонки между PUT и CLOSE, как и пример выше. Я бы подумал, что в v7.0 и выше разработчик будет * намного лучше использовать встроенную функцию, но это будет хорошим решением для v6.0. –

+0

Я предполагаю, что вы закроете соединение только при выключении приложения. Открытие и закрытие соединений могут быть более дорогими, чем создание/отправка сообщений. –

+0

Правильно, но если вы отправляете несколько сообщений в дочерний поток, а затем выдаете 'CLOSE', он создает условие гонки, в котором дескриптор соединения может уйти до того, как все сообщения будут« PUT »дочерним потоком. Хотя 'CLOSE' выдается только один раз за всю жизнь приложения, для предотвращения этого состояния должно быть некоторое управление дочерними потоками. В противном случае метод в значительной степени гарантирует потерю хотя бы одного или двух сообщений. –

Смежные вопросы