2016-03-13 2 views
0

У меня есть веб-приложение, которое отправляет сообщения kafka, считывая некоторые значения из запроса. Как отправить статус ошибки, если мой сервер kafka не работает. Прямо сейчас на сервере вниз дела prodcuer постоянно пытается подключиться и регистрирует ошибку ниже бесконечно.Kafka spring integation как hanlde, если сервер Kafka не работает

Ошибка:

14:56:10.181 [kafka-producer-network-thread | producer-1] WARN o.a.kafka.common.network.Selector - Error in I/O with localhost/127.0.0.1 
java.net.ConnectException: Connection refused 

конфигурации Производитель является:

<int:channel id="inputToKafka"> 
    <int:queue/> 
</int:channel> 

<int-kafka:outbound-channel-adapter 
     id="kafkaOutboundChannelAdapter" 
     auto-startup="true" 
     kafka-producer-context-ref="kafkaProducerContext" 
     channel="inputToKafka" > 
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="10" task-executor="taskExecutor"/> 
</int-kafka:outbound-channel-adapter> 

<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/> 

<int-kafka:producer-context id="kafkaProducerContext"> 
    <int-kafka:producer-configurations> 
     <int-kafka:producer-configuration broker-list="localhost:9092" 
              key-class-type="java.lang.String" 
              value-class-type="java.lang.String" 
              sync="true" 
              send-timeout="10" 
              topic="test" 
              key-encoder="kafkaEncoder" 
              value-encoder="kafkaEncoder" 
              compression-type="none"/> 
    </int-kafka:producer-configurations> 
</int-kafka:producer-context> 
<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"> 
</bean> 
<bean id="valueEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder"> 
    <constructor-arg value="com.fastretailing.catalogPlatformSCMProducer.model.ProducerMessage" /> 
</bean> 

Отправка код:

ProducerMessage k = list.get(0); 
    boolean status = false; 
    try { 
     status = inputToKafka.send(
       MessageBuilder.withPayload(k.getJsonString(k)) 
         .setHeader(KafkaHeaders.MESSAGE_KEY, k.getFeedName()) // Note: the header was `messageKey` in earlier versions 
         .setHeader(KafkaHeaders.TOPIC, "test")  // Note: the header was `topic` in earlier versions 
         .build() 
     ); 
    } catch (Exception e) { 
     System.out.println("eeeeeeeeeeeeeeeeeeeeeeeeeeeeee"); 
     System.out.println(e); 
    } 
    System.out.println(status); 
    if (!status){ 
     System.out.println("errrrrrrrrrrrrrrrrrrrrrrrr"); 
    } 

Я всегда получаю статус как истинный. Даже когда сервер kafka не работает. И вышеприведенное соединение отказало, что ошибка постоянно отображается.

ответ

0

Удалите подольщика и элемент <queue/> от inputToKafka.

Все, что говорится в статусе, заключается в том, что сообщение было помещено в канал ok.

Выполнение этой задачи DirectChannel (путем удаления очереди) означает, что переходная передача будет выполняться в вызывающем потоке.

+0

Спасибо большое @Gary .. Я попробую это и скоро обновится. –

+0

Я пробовал, как вы сказали. Но когда я положил сервер kafka на тестирование обработки исключений, мой сервер постоянно регистрирует приведенное ниже исключение. Как я могу поймать это и дать ответ об ошибке? \t \t java.net.ConnectException: Соединение отклонено \t \t \t на sun.nio.ch.SocketChannelImpl.checkConnect (Native Method) ~ [на: 1.8.0_45] –

+0

Это идет от основного клиента пытается подключиться - по умолчанию он делает это каждые 10 мс - вы можете увеличить его или изменить свойство 'reconnect.backoff.ms', или вы можете увеличить уровень журнала для этого класса до ERROR. Кроме того, по умолчанию исключение не будет выдано до 60 секунд; поэтому уменьшите эту задержку, я изменил [пример приложения] (https://github.com/garyrussell/spring-integration-samples/commit/52c8890e7abb7ac55d70114f4499e0ce9c24b6fa) и протестировал его, и он работал, как ожидалось. Дополнительную информацию об этих свойствах можно найти в [kafka docs] (http://kafka.apache.org/082/documentation.html). –

Смежные вопросы