2014-09-03 4 views
2

Я пытался проверить интеграцию верблюда с Кафка, как описано hereApache верблюд и Кафка интеграции

Ниже мой код

public class KafkaTest { 

    public static void main(String args[]) throws Exception { 
     CamelContext context = new DefaultCamelContext(); 

     context.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("kafka:test?zkConnect=localhost:2181&metadataBrokerList=localhost:9092") 
       .process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         System.out.println(exchange.getIn().getBody()); 
        } 
       }) 
       .end(); 
      } 
     }); 

     context.start(); 
     while (true) { 

     } 
    } 
} 

Однако, я получаю следующее сообщение об ошибке

Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1: Route(route1)[[From[kafka:test?zkConnect=localhost:2181&... because of Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: Failed to resolve endpoint: kafka://test?amp%3BmetadataBrokerList=localhost%3A9092&zkConnect=localhost%3A2181 due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. 

Unknown parameters=[{metadataBrokerList=localhost:9092, zkConnect=localhost:2181}] 

Пожалуйста, представьте, чего не хватает.

ответ

1

Вы должны использовать правильные имена параметров, названные в official documentation.

from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181") 

версии вы имеете в виду, описаны в вики на GitHub, был внесен в Apache и несколько изменился с тех пор.

+0

Есть 2 компонента верблюжьей Кафка плавающие вокруг там. https://github.com/Giwi/camel-kafka => 3 года. Использует формат, используемый OP. https://github.com/apache/camel/tree/master/components/camel-kafka => CURRENT Использует параметры, представленные в этом ответе. – Jeff

0

Использовать класс конечных точек?

что-то вроде:

public static KafkaEndpoint endpoint(String host, String port, String topic, String offset, String groupId) { 
     String endpointUri = "kafka://" + host + ":" + port; 
     KafkaEndpoint endpoint = new DefaultCamelContext().getEndpoint(endpointUri, KafkaEndpoint.class); 
     endpoint.getConfiguration().setTopic(topic); 
     endpoint.getConfiguration().setKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"); 
     endpoint.getConfiguration().setValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer"); 
     endpoint.getConfiguration().setAutoOffsetReset(offset); 
     endpoint.getConfiguration().setGroupId(groupId); 
     return endpoint; 
    } 

PollingConsumer consumer = endpoint.createPollingConsumer(); 

или

new RouteBuilder() { 
      public void configure() { 
       from(endpoint) 
       .process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         System.out.println(exchange.getIn().getBody()); 
        } 
       }) 
       .end(); 
      } 
     }