2016-10-14 2 views
4

Я использую новейшие флаеры Flink-1.1.2-Hadoop-27 и flink-connector-kafka-0.10.2-hadoop1 ,ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointNotifier при потреблении темы kafka

Flink потребитель, как показано ниже:

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
     if (properties == null) { 
      properties = new Properties(); 
      InputStream props = Resources.getResource(KAFKA_CONFIGURATION_FILE).openStream(); 
      properties.load(props); 

      DataStream<String> stream = env.addSource(new FlinkKafkaConsumer082<>(KAFKA_SIP_TOPIC, new SimpleStringSchema() , properties)); 

После исключение я получаю после выполнения:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointNotifier 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(Unknown Source) 
at java.security.SecureClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.access$100(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(Unknown Source) 
at java.security.SecureClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.access$100(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at com.bt.oss.voice.main.FlnkConsumer.main(FlnkConsumer.java:50)Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
... 25 more 

ответ

5

Вы смешиваете версии. Потребитель Kafka для Flink 0.10.2 не будет работать с Flink 1.1.2.

Вы должны использовать разъем Кафки, снабженный Флинка 1.1.2 и включают в себя следующую Maven зависимость:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId> 
    <version>1.1.2</version> 
</dependency> 

Пожалуйста, ознакомьтесь documentation для деталей.

Смежные вопросы