2015-12-23 3 views
4

Подробное описание конфигурации:выключатель Настройка с сервис-активаторов

<int:publish-subscribe-channel id="toKafka"/> 

    <int:publish-subscribe-channel id="sendMessageToKafkaChannel"/> 

    <int:service-activator input-channel="toKafka" output-channel="sendMessageToKafkaChannel" order="1" ref="conditionalProducerService" method="producerCircuitBreaker"> 
     <int:request-handler-advice-chain> 
       <ref bean="circuitBreakerAdvice" /> 
     </int:request-handler-advice-chain> 
    </int:service-activator> 

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" 
                     auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/> 

    <bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice"> 
    <property name="threshold" value="2"/>      
    <property name="halfOpenAfter" value="15000" />      
    </bean> 

public Message<?> producerCircuitBreaker(Message<?> payload) { 
     throw new RuntimeException("foo Pro"); 
    } 

for(int i=0;i<4;i++){ 
      toKafka.send(MessageBuilder 
           .withPayload(messageVO.getMessageContentVO())         
           .setHeader(KafkaHeaders.TOPIC, topic) 
           .setHeader(KafkaHeaders.PARTITION_ID,Integer.parseInt(messageVO.getPartition())). 
           build()); 

       APPLOGGER.info("sending message"); 
    } 

Ожидать, чтобы получить процесс потерпеть неудачу в 2 раза с исключением, а затем «выключателя открыто» исключение но он просто останавливается после того, как выбрал исключение в консоли.

Также как мы можем настроить канал ошибки здесь.

https://gist.github.com/anonymous/67aae50e548c78470cd0

обновленные конфигурации:

<int:service-activator input-channel="toKafka" ref="gw"> 
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/> 
         </int:request-handler-advice-chain> 
    </int:service-activator> 

    <int:channel id="failedChannel1" /> 


    <int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1" /> 

    <int:chain input-channel="failedChannel1"> 
     <int:transformer expression="'failed:'+payload.failedMessage.payload+ ' with a' +payload.cause.message" /> 
     <int-stream:stderr-channel-adapter append-newline="true"/> 
      </int:chain> 

становится ниже исключения.
не удалось: TestVo [data = sample message]] с Невозможно обработать сообщение.
https://gist.github.com/anonymous/921be7691c41d125dc84
однако он работает с тем же сообщением иначе. (содержимое сообщения изменено намеренно)

Также попытался поставить недопустимое значение для контекста производителя: например. broker-list/value-class-type как недопустимый тип класса, чем ожидалось, как показано ниже.

Ошибка ниже, но ожидается, что CB попадет в изображение, и сообщение должно перетекать в канал ошибки.

в случае класса value-class: CB не вызывается, однако сообщение, поступающее на канал ошибки, но есть много сообщений для 1 опубликованного сообщения.

не удался: TestVo [данные = {TES сообщение}}] с преобразователем Нет найден, способным преобразовывать от типа xx.xxx.vo.TestVo к типу java.lang.String

них является происходящим в консоль много раз.

в случае брокерского списка: это просто исключение для исключения в консоли.

https://gist.github.com/anonymous/6ece517fb5e82ac73492

Ожидаемое: CB, чтобы ссылаться и поток сообщений в канале ошибки во всех случаях.

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" 
                     auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/> 

    <int-kafka:producer-context id="producerContext" producer-properties="producerProperties"> 
                  <int-kafka:producer-configurations> 
                      <int-kafka:producer-configuration 
                          broker-list="1.2.3:9092" topic="headers['topic']" key-class-type="java.lang.String" 
                          value-class-type="java.lang.String" 
                          value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder" 
                          compression-type="none" /> 
                  </int-kafka:producer-configurations> 
              </int-kafka:producer-context> 

ответ

3

С этим кодом, вам нужно try {...} вокруг send().

Первые две попытки поймают ваш RuntimeException; следующий будет улавливать исключение автоматического выключателя.

Используйте Messaging Gateway с каналом ошибок вместо прямой отправки на канал.

EDIT

Этот код ...

<int:service-activator input-channel="toKafka" ref="gw"> 
    <int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/> 
        </int:request-handler-advice-chain> 
</int:service-activator> 

<int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1" /> 

При отправке сообщения в toKafka, шлюз будет вызываться, который отправит сообщение toKafka в цикле.

Это вызовет переполнение стека.

+0

спасибо за предложение, он работает с блоком try/catch. Но теперь я обновил конфигурацию, как предлагалось с помощью шлюза, и хотел бы вызвать CB для тайм-аута подключения/недействительных данных брокера и т. Д. И других проблем, связанных с окружающей средой. Обновленная конфигурация в оригинальном посте. Пожалуйста, предложите. – sam

+0

Нам нужно увидеть полную трассировку стека для второго условия - просто сообщение об ошибке недостаточно. –

+0

уже добавили выше 2 ссылки на сообщение об ошибке, добавив его снова здесь, спасибо: https://gist.github.com/anonymous/921be7691c41d125dc84 https://gist.github.com/anonymous/6ece517fb5e82ac73492 – sam

Смежные вопросы