2015-12-15 3 views
1

Я использую Apache Camel для использования сообщений из темы kafka, а затем обрабатываю сообщение, обрабатывая, если возникает исключение, я перенаправляю это сообщение на другую тему и процесс kafka, что в отдельный маршрут. поэтому у меня есть маршрут, как показано ниже.Apache camel route from kafka to another kafka topic on error

из ("kafka1"). Process ("someProcessor"). End(); onException (Throwable.class) .process (exchange -> {exchange.getIn(). SetBody ("Сообщение с информацией об ошибке")}). To ("kafka2");

Выше кода фактически отправляет сообщение об ошибке в ту же самую кафку (kafka1).

Я решил это, установив exchange.getIn(). SetHeader (KafkaConstants.TOPIC, "kafka2")) в процессе onException. Это ожидаемое поведение? почему он игнорирует kafka2 и вместо этого использует kafka1?

1) Версия верблюд используется - 2.14.0

2) Кафка конечная точка URL-адреса -

потребительскую

from("kafka:" + ("kafka.broker") + "?topic=" 
      + ("offer.kafka.topic") 
      + "&zookeeperHost=" + ("kafka.zookeeper.host") 
      + "&zookeeperPort=" + ("kafka.zookeeper.port") 
      + "&groupId=" + ("offer.kafka.group.id") 
      + "&consumerStreams=" + ("kafka.streams") 
      + "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals") 
      + "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout") 
      + "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries") 
      + "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms") 
      + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
      + "&autoOffsetReset=" + ("kafka.auto.offset.reset") 
      + "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes") 
      + "&socketReceiveBufferBytes=" + ("receive.buffer.bytes")) 
      .routeId("offerEventRoute").to("direct:offerEventRoute"); 

Производитель -

to("kafka:" + ("error.kafka.broker") + "?topic=" 
         + ("error.kafka.topic") 
         + "&zookeeperHost=" + ("error.kafka.zookeeper.host") 
         + "&zookeeperPort=" + ("error.kafka.zookeeper.port") 
         + "&groupId=" + ("error.kafka.group.id") 
         + "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout") 
         + "&rebalanceMaxRetries=" + ("rebalance.max.retries") 
         + "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms") 
         + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout") 
         + "&autoOffsetReset=" + ("auto.offset.reset") 
         + "&messageSendMaxRetries=" + ("error.max.retries") 
         + "&serializerClass=kafka.serializer.StringEncoder" 
     ); 

ответ

0

Можете ли вы предоставить несколько подробностей о коде, например,

1) Верблюд, используемый

2) URL-адрес вашей конечной точки Kafka.

любого шанса вы используете атрибут «bridgeEndpoint» в конечной точке URL ..

+0

Спасибо Himanshu, я обновил все детали в описании – Amit

1

Вы должны установить bridgeEndPoint к истинному в вашем производителе конечной точки Кафка. В противном случае он ищет имя темы в заголовках обмена и использует это как название темы для производителя.

По умолчанию это неверно.

Смежные вопросы