2016-01-12 4 views
3

Я пытаюсь создать простой комплект для производителя Kafka в версии Apache Karaf 4.0.3.Комплект Karaf - Kafka OSGI - выпуск продюсера

Вот мой Java-код

Properties props = new Properties(); 
props.put("bootstrap.servers", "localhost:9092"); 
props.put("acks", "all"); 
props.put("retries", 0); 
props.put("batch.size", 16384); 
props.put("linger.ms", 1); 
props.put("buffer.memory", 33554432); 
//props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
//props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
props.put("partitioner.class","org.apache.kafka.clients.producer.internals.DefaultPartitioner"); 
Producer<String, String> producer = new KafkaProducer<String,String>(props,new StringSerializer(),new StringSerializer()); 
//for(int i = 0; i < 100; i++) 
    producer.send(new ProducerRecord<String, String>("test","data", outputData)); 

producer.close(); 

я четко заявил, соответствующую зависимость в pom.xml

<dependency> 
     <groupId>org.apache.servicemix.bundles</groupId> 
     <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId> 
     <version>0.9.0.0_1</version> 
</dependency> 

Я развертывается Кафка клиента пакет тоже.

, но при запуске производителя я вижу ниже исключения на с первой попытки.

Exception in thread "pool-135-thread-1" java.lang.ExceptionInInitializerError 
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194) 
    . 
    . 
    . 
    at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319) 
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72) 
    at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72) 
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.clients.producer.internals.DefaultPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.DefaultPartitioner could not be found. 
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:255) 
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:78) 
    at org.apache.kafka.common.config.ConfigDef.define(ConfigDef.java:94) 
    at org.apache.kafka.clients.producer.ProducerConfig.<clinit>(ProducerConfig.java:206) 
    ... 12 more 

А затем последовательно этот ...

Exception in thread "pool-136-thread-1" java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.clients.producer.ProducerConfig 
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:194) 
. 
. 
. 
at com.google.common.util.concurrent.Futures$6.run(Futures.java:1319) 
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72) 
at com.gilt.gfc.guava.future.FutureConverters$ScalaFutureAdapter$$anonfun$addListener$1.apply(FutureConverters.scala:72) 
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745 

Кто encoutered подобный вопрос с расслоением ??

+0

OSGi выполнения не заботится о своих зависимости Maven. Он заботится только о MANIFEST внутри вашего пакета. Если вы загружаете классы по имени, как вам кажется, вам нужно добавить конфигурацию времени сборки, чтобы соответствующие пакеты были добавлены в список импортированных пакетов. Или вы добавляете 'DynamicImport-Package: *' к вашему MANIFEST. Как именно вы это делаете, зависит от того, как вы строите свой пакет. С 'maven-bundle-plugin'? – Ralf

+0

Да, у меня такая же проблема. Самое смешное, что если я вложу кувшин кафки (либо servicemix один, либо оригинальный кафка), когда я создаю экземпляр KafkaProducer, я сразу же получу эту ошибку, хотя я могу очень просто обратиться к KafkaProducer. Либо что-то странное, это испортить сообщение об ошибке, либо происходит некоторая темная загрузка классов ... –

ответ

1

с использованием клиентской версии Kafka 0.8.2.2_1, решил проблему.

1

Я видел эту проблему в 0.9.0. Оказалось, что загружен загрузчик контекста потока, и в этом случае Kafka использует этот загрузчик классов для разрешения. Таким образом, контекст резьбы Загрузчик классов должен быть либо:

  • загрузчики классов, которые могут решить все связанные Кафка вещи
  • null

Не знаю, если это будет укусить меня, но добавил:

Thread.currentThread().setContextClassLoader(null); 

сделал трюк.

+0

Ваше решение согласуется с обходным решением, предложенным в JIRA: https://issues.apache.org/jira/browse/KAFKA-3218 – Kyr

+0

Когда этот запрос на растяжение объединяется, должно быть другое решение: https://github.com/apache/kafka/pull/1421 – lightswitch05

1

Might мне полезным для других, работы вокруг предложенной в JIRA

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class); 

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);