Я использую Apache Camel для использования сообщений из темы kafka, а затем обрабатываю сообщение, обрабатывая, если возникает исключение, я перенаправляю это сообщение на другую тему и процесс kafka, что в отдельный маршрут. поэтому у меня есть маршрут, как показано ниже.Apache camel route from kafka to another kafka topic on error
из ("kafka1"). Process ("someProcessor"). End(); onException (Throwable.class) .process (exchange -> {exchange.getIn(). SetBody ("Сообщение с информацией об ошибке")}). To ("kafka2");
Выше кода фактически отправляет сообщение об ошибке в ту же самую кафку (kafka1).
Я решил это, установив exchange.getIn(). SetHeader (KafkaConstants.TOPIC, "kafka2")) в процессе onException. Это ожидаемое поведение? почему он игнорирует kafka2 и вместо этого использует kafka1?
1) Версия верблюд используется - 2.14.0
2) Кафка конечная точка URL-адреса -
потребительскую
from("kafka:" + ("kafka.broker") + "?topic="
+ ("offer.kafka.topic")
+ "&zookeeperHost=" + ("kafka.zookeeper.host")
+ "&zookeeperPort=" + ("kafka.zookeeper.port")
+ "&groupId=" + ("offer.kafka.group.id")
+ "&consumerStreams=" + ("kafka.streams")
+ "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
+ "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("kafka.auto.offset.reset")
+ "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
+ "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
.routeId("offerEventRoute").to("direct:offerEventRoute");
Производитель -
to("kafka:" + ("error.kafka.broker") + "?topic="
+ ("error.kafka.topic")
+ "&zookeeperHost=" + ("error.kafka.zookeeper.host")
+ "&zookeeperPort=" + ("error.kafka.zookeeper.port")
+ "&groupId=" + ("error.kafka.group.id")
+ "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
+ "&rebalanceMaxRetries=" + ("rebalance.max.retries")
+ "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
+ "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
+ "&autoOffsetReset=" + ("auto.offset.reset")
+ "&messageSendMaxRetries=" + ("error.max.retries")
+ "&serializerClass=kafka.serializer.StringEncoder"
);
Спасибо Himanshu, я обновил все детали в описании – Amit