Подробное описание конфигурации:выключатель Настройка с сервис-активаторов
<int:publish-subscribe-channel id="toKafka"/>
<int:publish-subscribe-channel id="sendMessageToKafkaChannel"/>
<int:service-activator input-channel="toKafka" output-channel="sendMessageToKafkaChannel" order="1" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
</int:service-activator>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>
<bean id="circuitBreakerAdvice" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2"/>
<property name="halfOpenAfter" value="15000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");
}
for(int i=0;i<4;i++){
toKafka.send(MessageBuilder
.withPayload(messageVO.getMessageContentVO())
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PARTITION_ID,Integer.parseInt(messageVO.getPartition())).
build());
APPLOGGER.info("sending message");
}
Ожидать, чтобы получить процесс потерпеть неудачу в 2 раза с исключением, а затем «выключателя открыто» исключение но он просто останавливается после того, как выбрал исключение в консоли.
Также как мы можем настроить канал ошибки здесь.
https://gist.github.com/anonymous/67aae50e548c78470cd0
обновленные конфигурации:
<int:service-activator input-channel="toKafka" ref="gw">
<int:request-handler-advice-chain> <ref bean="circuitBreakerAdvice"/>
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel1" />
<int:gateway id="gw" default-request-channel="toKafka" default-reply-timeout="0" error-channel="failedChannel1" />
<int:chain input-channel="failedChannel1">
<int:transformer expression="'failed:'+payload.failedMessage.payload+ ' with a' +payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
становится ниже исключения.
не удалось: TestVo [data = sample message]] с Невозможно обработать сообщение.
https://gist.github.com/anonymous/921be7691c41d125dc84
однако он работает с тем же сообщением иначе. (содержимое сообщения изменено намеренно)
Также попытался поставить недопустимое значение для контекста производителя: например. broker-list/value-class-type как недопустимый тип класса, чем ожидалось, как показано ниже.
Ошибка ниже, но ожидается, что CB попадет в изображение, и сообщение должно перетекать в канал ошибки.
в случае класса value-class: CB не вызывается, однако сообщение, поступающее на канал ошибки, но есть много сообщений для 1 опубликованного сообщения.
не удался: TestVo [данные = {TES сообщение}}] с преобразователем Нет найден, способным преобразовывать от типа xx.xxx.vo.TestVo к типу java.lang.String
них является происходящим в консоль много раз.
в случае брокерского списка: это просто исключение для исключения в консоли.
https://gist.github.com/anonymous/6ece517fb5e82ac73492
Ожидаемое: CB, чтобы ссылаться и поток сообщений в канале ошибки во всех случаях.
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext"
auto-startup="true" channel="toKafka" message-key="kafka_messageKey"/>
<int-kafka:producer-context id="producerContext" producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration
broker-list="1.2.3:9092" topic="headers['topic']" key-class-type="java.lang.String"
value-class-type="java.lang.String"
value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder"
compression-type="none" />
</int-kafka:producer-configurations>
</int-kafka:producer-context>
спасибо за предложение, он работает с блоком try/catch. Но теперь я обновил конфигурацию, как предлагалось с помощью шлюза, и хотел бы вызвать CB для тайм-аута подключения/недействительных данных брокера и т. Д. И других проблем, связанных с окружающей средой. Обновленная конфигурация в оригинальном посте. Пожалуйста, предложите. – sam
Нам нужно увидеть полную трассировку стека для второго условия - просто сообщение об ошибке недостаточно. –
уже добавили выше 2 ссылки на сообщение об ошибке, добавив его снова здесь, спасибо: https://gist.github.com/anonymous/921be7691c41d125dc84 https://gist.github.com/anonymous/6ece517fb5e82ac73492 – sam