2016-02-18 3 views
1

В настоящее время я работаю с интеграцией с пружиной для нового приложения и начал работу с poc, чтобы узнать, как обрабатывать случаи сбоя. В моем приложении интеграция с весной будет получать сообщение от IBM mq и проверять информацию заголовка, а маршрут в другую очередь зависит от типа сообщения. входящее сообщение может быть массовым сообщением, поэтому я использовал сплиттер и агрегатор из весенней интеграции, и у меня есть хороший прогресс и контроль над техническим документооборотом. В настоящее время сталкиваются с несколькими проблемами, у нас есть IBM mq, а также webservice в качестве нашего шлюза. Оба шлюза принимают сообщение и отправляют на канал сплиттера, где разделитель разбивает сообщение и отправляет на исходящий канал (канал исполнителя). поэтому сообщение будет отправляться в пункт назначения параллельно, а активатор службы обновления состояния получит сообщение с тем же каналом с порядком = 2 и отправит в агрегатор. поэтому для его хорошего с реализацией.пружинная интеграция - разделитель и агрегатор

Проблема: Проблема: если jms исходящий шлюз выдает исключение, я добавил совет в качестве обработчика исключений, который отправит другому активатору службы, чтобы обновить статус отказа до объекта DTO и будет иметь тот же канал агрегатора, что и вывод, но я не получаю сообщение в канале агрегатора в этом случае и агрегатор получают только в счастливом потоке.

Я хочу агрегировать исходящее сообщение об успешном сообщении и сбое (другой активатор службы обновляет статус), а затем полный статус должен быть отправлен в очередь ответа в качестве другого исходящего или в качестве ответа в webservice.

Я попытался заказать успешный активатор службы и активатор службы обработчика сбоя, чтобы иметь тот же канал, который является входным каналом для агрегатора и его не работает.

Оценил для вашего руководства, чтобы продолжить рабочий процесс

с использованием Spring Integration 2.2.2

<channel id="inbound"/> 
<channel id="splitterInChannel"/> 

<channel id="splitterOutChannel"> 
<dispatcher task-executor="splitterExecutor"/> 
</channel>   
<beans:bean id="splitterExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <beans:property name="corePoolSize" value="5" /> 
    <beans:property name="maxPoolSize" value="10" /> 
    <beans:property name="queueCapacity" value="25" /> 
</beans:bean> 
<channel id="ValidatorChannel"/> 

<channel id="outBoundjmsChannel"/> 
<channel id="outBoundErrorChannel"/> 
<channel id="finalOutputChannel"></channel> 
<channel id="aggregatorChannel"/> 

<jms:inbound-channel-adapter connection-factory="AMQConnectionFactory" 
destination="AMQueue" channel="inbound" auto-startup="true" 
extract-payload="false" acknowledge="transacted"></jms:inbound-channel-adapter> 


<service-activator ref="InBoundProcessor" input-channel="inbound" output-channel="splitterInChannel"></service-activator> 

<!-- splitter --> 
<splitter ref="Splitter" method="splitInput" input-channel="splitterInChannel" output-channel="splitterOutChannel"/> 
<!-- validator --> 
<service-activator ref="Validator" method="validate" input-channel="splitterOutChannel" output-channel="ValidatorChannel"/>   

<!-- need to add enricher --> 
<service-activator ref="Enricher" method="enrich" input-channel="ValidatorChannel" output-channel="outBoundjmsChannel"/>    

<!-- outbound gateway --> 

<jms:outbound-channel-adapter channel="outBoundjmsChannel" connection-factory="AMQConnectionFactory" destination-name="outputQueue" 
message-converter="customMessageConvertor" order="1" > 

     <jms:request-handler-advice-chain> 
      <beans:bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice"> 
       <beans:property name="retryTemplate" > 
        <beans:bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
         <beans:property name="retryPolicy"> 
          <beans:bean class="org.springframework.retry.policy.SimpleRetryPolicy"> 
          <beans:property name="maxAttempts" value="2" /> 
          </beans:bean> 
         </beans:property> 
        <beans:property name="backOffPolicy"> 
         <beans:bean class="org.springframework.retry.backoff.FixedBackOffPolicy"> 
         <beans:property name="backOffPeriod" value="1000" /> 
         </beans:bean> 
         </beans:property> 
        </beans:bean>    
       </beans:property> 
       <beans:property name="recoveryCallback"> 
        <beans:bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer"> 
         <beans:constructor-arg ref="outBoundErrorChannel" /> 
        </beans:bean> 
       </beans:property> 

      </beans:bean> 
     </jms:request-handler-advice-chain> 
    </jms:outbound-channel-adapter> 



<!-- outBound error processor -->   
<service-activator ref="ErrorProcessor" method="errorHandling" input-channel="outBoundErrorChannel" output-channel="aggregatorChannel" />   
<!-- Post send processor --> 
<service-activator ref="PostProcessor" method="Postprocessing" input-channel="outBoundjmsChannel" output-channel="aggregatorChannel" order="2"/>   
<!-- aggregator --> 
<aggregator ref="Aggregator" correlation-strategy-method="aggregateStrategy" input-channel="aggregatorChannel" output-channel="finalOutputChannel" 
release-strategy-method="isRelease" method="aggregate" expire-groups-upon-completion="true"/> 


<!-- final processor or responder --> 
<service-activator ref="FinalProcessor" method="endProcessing" input-channel="finalOutputChannel"/>   

</beans:beans> 

В приведенной выше конфигурации, как сейчас я уже дал стратегию выхода как ложные и корреляционный метод, как пустая строка, если это работает, я буду генерировать UUID для партии и присоединяет UUID в сплиттере к corrlate.

при отладке вышеуказанной конфигурации я заметил, что исходящий канал ошибок принимается всякий раз, когда он пытается отправить исходящий адаптер (в моем случае его отправка дважды). Я не хочу повторять попытку в одном из приложений, а в другом приложении ему нужно попытаться отправить сообщение. В обоих случаях я хочу отправить сообщение в канал исходящей ошибки после последней попытки агрегирования, если это не удается, я обновлю статус в ErrorProcessor, как не удалось отправить.

Два вопроса. 1. Получаю дублирующее сообщение на канал и трудно идентифицировать последний сбой или успех. 2. Невозможно сделать логику для стратегии выпуска и трудно определить, что является дубликатом, и является ли его успешным или нет.

В этом случае я не смог найти общий способ сравнения объектов, потому что метод equals не имеет соответствующих атрибутов для сравнения, и это не будет правильный способ сравнения с логическим полем.

, пожалуйста, помогите мне решить эту проблему, чтобы продолжить разработку и выполнение рабочего процесса.

Большое спасибо за руководство меня продолжить. Спасибо, Криш S


В настоящее время

public Object errorHandling(Object object){ 
     OutBoundMessage outBoundMessage = null; 
     if(object instanceof MessagingException){ 

      outBoundMessage =((MessagingException) object).getFailedMessage(); 
     }else{ 
      //TODO: log the message 
     } 

     return outBoundMessage; 
    } 




    public String aggregateStrategy(OutBoundMessage outBoundMessage){ 

     //TODO: get the UUID from outbound message and return 
     return ""; 
    } 




public List<OutBoundMessage> splitter(InBoundMessage inBoundMessage){ 

     String[] message = inBoundMessage.getRawMessage().split(","); 
     long uuid = java.util.UUID.randomUUID().getLeastSignificantBits(); 
     List<OutBoundMessage> outBoundMessagelist = new ArrayList<OutBoundMessage>(); 
     for (String string : message) { 
      OutBoundMessage outBoundMessage = new outBoundMessage(); 
      outBoundMessage.setCorrelationID(uuid); 
      outBoundMessagelist.add(outBoundMessage); 
     } 

    } 

Добавлены по умолчанию ложно в следующем методе проверка

public boolean isRelease(List<OutBoundMessage> outBoundMessage){ 
     //TODO: need to define condition for closing the list aggregation 
     return false; 
    } 
+0

Вам следует поделиться конфигурацией обработки ошибок. И объясните больше, что означает «я не получаю сообщение в канале агрегатора». Это странно. Я могу предположить, что вы не можете агрегировать из-за потери 'correKey'. Или ... ваша служба обновления возвращает «null». –

+1

Спасибо, Билан, я настроил совет для обработки сообщения об ошибке, агрегатор даже не группирует успешное сообщение, и я сконфигурировал агрегатор в качестве группы истечения при завершении. поэтому агрегатор SI ожидает все расщепленное сообщение, в котором одно из сообщений генерирует исключение и берет канал ошибки. агрегатор все еще ожидает сообщения, которое не достигает агрегатора. сообщите мне, какая конфигурация должна быть выполнена для стратегии корреляции или выпуска. –

+1

Я попросил вас о конфиге. Такое «словосочетание» может быть бесконечным. Почему вы уверены, что сообщения об ошибках не доходят до агрегатора? Наверное, у тебя нет соответствующей «корреляции». Вы должны предоставить его из 'requestMessage' при создании' errorMessage'. Вот почему config был бы полезен, потому что я даже не понимаю, что такое «Совет», который вы используете для обработки ошибок. –

ответ

2

Пожалуйста, поделитесь ErrorProcessor исходным кодом. И correlation-strategy-method="aggregateStrategy" также.

Я хотел бы знать, как вы имеете дело с ErrorMessage там и как вы восстанавливаете correlationKey из сообщения после вашего ErrorProcessor.

Не знаете, как вы создаете свои собственные correlationKey, но <splitter> предоставляют applySequence = true по умолчанию. Таким образом, Correlation Details доступны в каждом расщепленном сообщении, чтобы иметь возможность агрегировать впоследствии.

для вашего ErrorMessage из ErrorMessageSendingRecoverer я могу порекомендовать обратить внимание на Exceptionpayload там. Похоже (от ErrorMessageSendingRecoverer исходного кода):

else if (!(lastThrowable instanceof MessagingException)) { 
     lastThrowable = new MessagingException((Message<?>) context.getAttribute("message"), 
       lastThrowable.getMessage(), lastThrowable); 
    } 
.... 
messagingTemplate.send(new ErrorMessage(lastThrowable)); 

Итак, что MessagingException, имеет «виновное» сообщение для Exception и точно, что сообщение имеет соответствующий Correlation Details заголовки для агрегатора. Поэтому вам следует полагаться на них, если вы хотите объединить ошибки в одну группу сообщений.

+0

@Artemisia Bilan ...Пожалуйста, предложите –

+0

. Я понял, что мой метод обработчика ошибок вызывает дублирующуюся проблему, если я возвращаю значение null, не заканчивая дублированием, но в моем коде как определить успешную и неудачную разницу в сообщениях в агрегаторе –

0

Наконец-то я понял, как это работает,

Я логическое значение ИСТИНА в сообщении конвертора и в Errorhandle я поставил его в ложное и вернуть нуль, так что восстановление является то, что сообщение получено, как неуспешного сообщения для агрегатора и понял, что происходит, когда я возвращаю объект Спасибо @ArtemBilan, ваш блок кода дал мне представление о том, что происходит, и что мне делать

+0

Рад слышать! Не уверен в вашем решении: с одной стороны это полностью не понятно, потому что мы не видим код. С другой стороны это звучит нестандартно, намекая мне, что вы должны прочитать больше документов по этому вопросу и не пытаться изобретать велосипед. Я действительно уверен, что стандартной «Корреляционной информации» должно быть достаточно для вас. И неудавшееся сообщение может быть совсем другим типом, хотя заголовок 'boolean' тоже хорошо работает. –

+0

Я использовал Java.util.UUID.toString() как идентификатор корреляции. Который я установил во всех объектах в списке для этой партии, которые я использовал в методе statergy. –

+0

Также для выпуска я установил размер партии во всех объектах в списке, где я сравниваю размер, чтобы освободить стабилизатор релиза statergy. –

Смежные вопросы