В настоящее время я работаю с интеграцией с пружиной для нового приложения и начал работу с poc, чтобы узнать, как обрабатывать случаи сбоя. В моем приложении интеграция с весной будет получать сообщение от IBM mq и проверять информацию заголовка, а маршрут в другую очередь зависит от типа сообщения. входящее сообщение может быть массовым сообщением, поэтому я использовал сплиттер и агрегатор из весенней интеграции, и у меня есть хороший прогресс и контроль над техническим документооборотом. В настоящее время сталкиваются с несколькими проблемами, у нас есть IBM mq, а также webservice в качестве нашего шлюза. Оба шлюза принимают сообщение и отправляют на канал сплиттера, где разделитель разбивает сообщение и отправляет на исходящий канал (канал исполнителя). поэтому сообщение будет отправляться в пункт назначения параллельно, а активатор службы обновления состояния получит сообщение с тем же каналом с порядком = 2 и отправит в агрегатор. поэтому для его хорошего с реализацией.пружинная интеграция - разделитель и агрегатор
Проблема: Проблема: если jms исходящий шлюз выдает исключение, я добавил совет в качестве обработчика исключений, который отправит другому активатору службы, чтобы обновить статус отказа до объекта DTO и будет иметь тот же канал агрегатора, что и вывод, но я не получаю сообщение в канале агрегатора в этом случае и агрегатор получают только в счастливом потоке.
Я хочу агрегировать исходящее сообщение об успешном сообщении и сбое (другой активатор службы обновляет статус), а затем полный статус должен быть отправлен в очередь ответа в качестве другого исходящего или в качестве ответа в webservice.
Я попытался заказать успешный активатор службы и активатор службы обработчика сбоя, чтобы иметь тот же канал, который является входным каналом для агрегатора и его не работает.
Оценил для вашего руководства, чтобы продолжить рабочий процесс
с использованием Spring Integration 2.2.2
<channel id="inbound"/>
<channel id="splitterInChannel"/>
<channel id="splitterOutChannel">
<dispatcher task-executor="splitterExecutor"/>
</channel>
<beans:bean id="splitterExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<beans:property name="corePoolSize" value="5" />
<beans:property name="maxPoolSize" value="10" />
<beans:property name="queueCapacity" value="25" />
</beans:bean>
<channel id="ValidatorChannel"/>
<channel id="outBoundjmsChannel"/>
<channel id="outBoundErrorChannel"/>
<channel id="finalOutputChannel"></channel>
<channel id="aggregatorChannel"/>
<jms:inbound-channel-adapter connection-factory="AMQConnectionFactory"
destination="AMQueue" channel="inbound" auto-startup="true"
extract-payload="false" acknowledge="transacted"></jms:inbound-channel-adapter>
<service-activator ref="InBoundProcessor" input-channel="inbound" output-channel="splitterInChannel"></service-activator>
<!-- splitter -->
<splitter ref="Splitter" method="splitInput" input-channel="splitterInChannel" output-channel="splitterOutChannel"/>
<!-- validator -->
<service-activator ref="Validator" method="validate" input-channel="splitterOutChannel" output-channel="ValidatorChannel"/>
<!-- need to add enricher -->
<service-activator ref="Enricher" method="enrich" input-channel="ValidatorChannel" output-channel="outBoundjmsChannel"/>
<!-- outbound gateway -->
<jms:outbound-channel-adapter channel="outBoundjmsChannel" connection-factory="AMQConnectionFactory" destination-name="outputQueue"
message-converter="customMessageConvertor" order="1" >
<jms:request-handler-advice-chain>
<beans:bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
<beans:property name="retryTemplate" >
<beans:bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<beans:property name="retryPolicy">
<beans:bean class="org.springframework.retry.policy.SimpleRetryPolicy">
<beans:property name="maxAttempts" value="2" />
</beans:bean>
</beans:property>
<beans:property name="backOffPolicy">
<beans:bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
<beans:property name="backOffPeriod" value="1000" />
</beans:bean>
</beans:property>
</beans:bean>
</beans:property>
<beans:property name="recoveryCallback">
<beans:bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
<beans:constructor-arg ref="outBoundErrorChannel" />
</beans:bean>
</beans:property>
</beans:bean>
</jms:request-handler-advice-chain>
</jms:outbound-channel-adapter>
<!-- outBound error processor -->
<service-activator ref="ErrorProcessor" method="errorHandling" input-channel="outBoundErrorChannel" output-channel="aggregatorChannel" />
<!-- Post send processor -->
<service-activator ref="PostProcessor" method="Postprocessing" input-channel="outBoundjmsChannel" output-channel="aggregatorChannel" order="2"/>
<!-- aggregator -->
<aggregator ref="Aggregator" correlation-strategy-method="aggregateStrategy" input-channel="aggregatorChannel" output-channel="finalOutputChannel"
release-strategy-method="isRelease" method="aggregate" expire-groups-upon-completion="true"/>
<!-- final processor or responder -->
<service-activator ref="FinalProcessor" method="endProcessing" input-channel="finalOutputChannel"/>
</beans:beans>
В приведенной выше конфигурации, как сейчас я уже дал стратегию выхода как ложные и корреляционный метод, как пустая строка, если это работает, я буду генерировать UUID для партии и присоединяет UUID в сплиттере к corrlate.
при отладке вышеуказанной конфигурации я заметил, что исходящий канал ошибок принимается всякий раз, когда он пытается отправить исходящий адаптер (в моем случае его отправка дважды). Я не хочу повторять попытку в одном из приложений, а в другом приложении ему нужно попытаться отправить сообщение. В обоих случаях я хочу отправить сообщение в канал исходящей ошибки после последней попытки агрегирования, если это не удается, я обновлю статус в ErrorProcessor, как не удалось отправить.
Два вопроса. 1. Получаю дублирующее сообщение на канал и трудно идентифицировать последний сбой или успех. 2. Невозможно сделать логику для стратегии выпуска и трудно определить, что является дубликатом, и является ли его успешным или нет.
В этом случае я не смог найти общий способ сравнения объектов, потому что метод equals не имеет соответствующих атрибутов для сравнения, и это не будет правильный способ сравнения с логическим полем.
, пожалуйста, помогите мне решить эту проблему, чтобы продолжить разработку и выполнение рабочего процесса.
Большое спасибо за руководство меня продолжить. Спасибо, Криш S
В настоящее время
public Object errorHandling(Object object){
OutBoundMessage outBoundMessage = null;
if(object instanceof MessagingException){
outBoundMessage =((MessagingException) object).getFailedMessage();
}else{
//TODO: log the message
}
return outBoundMessage;
}
public String aggregateStrategy(OutBoundMessage outBoundMessage){
//TODO: get the UUID from outbound message and return
return "";
}
public List<OutBoundMessage> splitter(InBoundMessage inBoundMessage){
String[] message = inBoundMessage.getRawMessage().split(",");
long uuid = java.util.UUID.randomUUID().getLeastSignificantBits();
List<OutBoundMessage> outBoundMessagelist = new ArrayList<OutBoundMessage>();
for (String string : message) {
OutBoundMessage outBoundMessage = new outBoundMessage();
outBoundMessage.setCorrelationID(uuid);
outBoundMessagelist.add(outBoundMessage);
}
}
Добавлены по умолчанию ложно в следующем методе проверка
public boolean isRelease(List<OutBoundMessage> outBoundMessage){
//TODO: need to define condition for closing the list aggregation
return false;
}
Вам следует поделиться конфигурацией обработки ошибок. И объясните больше, что означает «я не получаю сообщение в канале агрегатора». Это странно. Я могу предположить, что вы не можете агрегировать из-за потери 'correKey'. Или ... ваша служба обновления возвращает «null». –
Спасибо, Билан, я настроил совет для обработки сообщения об ошибке, агрегатор даже не группирует успешное сообщение, и я сконфигурировал агрегатор в качестве группы истечения при завершении. поэтому агрегатор SI ожидает все расщепленное сообщение, в котором одно из сообщений генерирует исключение и берет канал ошибки. агрегатор все еще ожидает сообщения, которое не достигает агрегатора. сообщите мне, какая конфигурация должна быть выполнена для стратегии корреляции или выпуска. –
Я попросил вас о конфиге. Такое «словосочетание» может быть бесконечным. Почему вы уверены, что сообщения об ошибках не доходят до агрегатора? Наверное, у тебя нет соответствующей «корреляции». Вы должны предоставить его из 'requestMessage' при создании' errorMessage'. Вот почему config был бы полезен, потому что я даже не понимаю, что такое «Совет», который вы используете для обработки ошибок. –