0

Подключение Flink к Kafka. Среда:

Ubuntu 16.04.1 LTS 
Flink 1.1.3 
Kafka 0.10.1.1 

Я пытаюсь подключить Flink к Kafka (Flink 1.1.3, Kafka 0.10.1.1). Я уже перепробовал все исправления, которые смог найти, но ни одно из них не работает.

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 
<groupId>ux</groupId> 
<artifactId>logs</artifactId> 
<version>1.3-SNAPSHOT</version> 
<packaging>jar</packaging> 
<name>Flink Quickstart Job</name> 
<url>http://www.myorganization.org</url> 
<properties>
  <!-- Source files are treated as UTF-8 during the build. -->
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Shared version for the Flink artifacts that reference ${flink.version}. -->
  <flink.version>1.3-SNAPSHOT</flink.version>
  <!-- Versions referenced by the slf4j-log4j12 and log4j dependencies. -->
  <slf4j.version>1.7.7</slf4j.version>
  <log4j.version>1.2.17</log4j.version>
</properties>
<repositories>
  <!-- Apache snapshot repository: snapshots enabled, releases disabled. -->
  <repository>
    <id>apache.snapshots</id>
    <name>Apache Development Snapshot Repository</name>
    <url>https://repository.apache.org/content/repositories/snapshots/</url>
    <releases>
      <enabled>false</enabled>
    </releases>
    <snapshots>
      <enabled>true</enabled>
    </snapshots>
  </repository>
</repositories>
<dependencies>
  <!--
    Flink API and client libraries. All Flink artifacts share ${flink.version}
    and the same Scala suffix (_2.11) so that binary-incompatible builds are
    never mixed on one classpath.
    NOTE(review): the Flink version used here should match the Flink version
    of the cluster the job is submitted to; confirm against the target cluster.
  -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!--
    Kafka 0.10 connector. Previously declared as the _2.10 build with a
    hard-coded version; aligned with the other Flink artifacts above.
  -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Logging: SLF4J bound to log4j 1.x. -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
  </dependency>
</dependencies>
<profiles>
  <!--
    Opt-in profile (not active by default). It re-declares the Flink and
    logging dependencies with "provided" scope and overrides the shade
    plugin's artifactSet excludes with an empty list.
  -->
  <profile>
    <id>build-jar</id>
    <activation>
      <activeByDefault>false</activeByDefault>
    </activation>
    <dependencies>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>2.4.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <artifactSet>
                  <!-- combine.self="override" replaces the inherited exclude list instead of merging with it. -->
                  <excludes combine.self="override"/>
                </artifactSet>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </profile>
</profiles>
<build>
  <plugins>
    <!-- Builds the fat jar during the package phase. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.4.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <!--
                Artifacts left out of the shaded jar.
                NOTE(review): presumably these are already on the classpath of
                the Flink runtime; confirm against the target distribution.
              -->
              <excludes>
                <exclude>org.apache.flink:flink-annotations</exclude>
                <exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
                <exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
                <exclude>org.apache.flink:flink-core</exclude>
                <exclude>org.apache.flink:flink-scala_2.11</exclude>
                <exclude>org.apache.flink:flink-runtime_2.11</exclude>
                <exclude>org.apache.flink:flink-optimizer_2.11</exclude>
                <exclude>org.apache.flink:flink-clients_2.11</exclude>
                <exclude>org.apache.flink:flink-avro_2.11</exclude>
                <exclude>org.apache.flink:flink-examples-batch_2.11</exclude>
                <exclude>org.apache.flink:flink-examples-streaming_2.11</exclude>
                <exclude>org.apache.flink:flink-streaming-scala_2.11</exclude>
                <exclude>org.apache.flink:flink-scala-shell_2.11</exclude>
                <exclude>org.apache.flink:flink-python</exclude>
                <exclude>org.apache.flink:flink-metrics-core</exclude>
                <exclude>org.apache.flink:flink-metrics-jmx</exclude>
                <exclude>org.apache.flink:flink-statebackend-rocksdb_2.11</exclude>
                <exclude>log4j:log4j</exclude>
                <exclude>org.scala-lang:scala-library</exclude>
                <exclude>org.scala-lang:scala-compiler</exclude>
                <exclude>org.scala-lang:scala-reflect</exclude>
                <exclude>com.data-artisans:flakka-actor_*</exclude>
                <exclude>com.data-artisans:flakka-remote_*</exclude>
                <exclude>com.data-artisans:flakka-slf4j_*</exclude>
                <exclude>io.netty:netty-all</exclude>
                <exclude>io.netty:netty</exclude>
                <exclude>commons-fileupload:commons-fileupload</exclude>
                <exclude>org.apache.avro:avro</exclude>
                <exclude>commons-collections:commons-collections</exclude>
                <exclude>org.codehaus.jackson:jackson-core-asl</exclude>
                <exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
                <exclude>com.thoughtworks.paranamer:paranamer</exclude>
                <exclude>org.xerial.snappy:snappy-java</exclude>
                <exclude>org.apache.commons:commons-compress</exclude>
                <exclude>org.tukaani:xz</exclude>
                <exclude>com.esotericsoftware.kryo:kryo</exclude>
                <exclude>com.esotericsoftware.minlog:minlog</exclude>
                <exclude>org.objenesis:objenesis</exclude>
                <exclude>com.twitter:chill_*</exclude>
                <exclude>com.twitter:chill-java</exclude>
                <exclude>commons-lang:commons-lang</exclude>
                <exclude>junit:junit</exclude>
                <exclude>org.apache.commons:commons-lang3</exclude>
                <exclude>org.slf4j:slf4j-api</exclude>
                <exclude>org.slf4j:slf4j-log4j12</exclude>
                <exclude>org.apache.commons:commons-math</exclude>
                <exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
                <exclude>commons-logging:commons-logging</exclude>
                <exclude>commons-codec:commons-codec</exclude>
                <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
                <exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
                <exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
                <exclude>stax:stax-api</exclude>
                <exclude>com.typesafe:config</exclude>
                <exclude>org.uncommons.maths:uncommons-maths</exclude>
                <exclude>com.github.scopt:scopt_*</exclude>
                <exclude>commons-io:commons-io</exclude>
                <exclude>commons-cli:commons-cli</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <!-- Strip shaded third-party classes and bundled web docs from any Flink artifact that is included. -->
              <filter>
                <artifact>org.apache.flink:*</artifact>
                <excludes>
                  <exclude>org/apache/flink/shaded/com/**</exclude>
                  <exclude>web-docs/**</exclude>
                </excludes>
              </filter>
              <!-- Drop jar signature files from every included artifact. -->
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <createDependencyReducedPom>false</createDependencyReducedPom>
          </configuration>
        </execution>
      </executions>
    </plugin>
    <!-- Compile for Java 8 source and bytecode level. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.1</version>
      <configuration>
        <source>1.8</source>
        <target>1.8</target>
      </configuration>
    </plugin>
  </plugins>
</build>
</project>

Мой код на Java:

import java.util.Properties; 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; 

public class App 
{ 
    public static void main(String[] args) throws Exception { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     Properties properties = new Properties(); 
     properties.setProperty("bootstrap.servers", "localhost:9092"); 
     properties.setProperty("zookeeper.connect", "localhost:2181"); 
     properties.setProperty("group.id", "flink_consumer"); 

     DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<> 
      ("ux_logs", new SimpleStringSchema(), properties)); 

     messageStream.rebalance().map(new MapFunction<String, String>() { 

      private static final long serialVersionUID = -6867736771747690202L; 

      public String map(String value) throws Exception { 
       return "Kafka and Flink says: " + value; 
      } 
     }).print(); 

     env.execute(); 
    } 
} 

Но я получаю эту ошибку:

java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointedFunction 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763) 
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) 
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at ux.App.main(App.java:27) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) 
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) 
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320) 
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) 
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) 
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) 
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) 
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointedFunction 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 

Нужно ли мне удалить мою Kafka и установить более старую версию? Или я использую неправильный коннектор Flink для Kafka? Я пытался использовать этот плагин, но он не сработал. (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/linking.html)

Спасибо.

+1

К сожалению, Flink 1.1.x не предоставляет consumer для Kafka 0.10. К счастью, Flink 1.2.0 его предоставляет. Поэтому попробуйте обновить Flink до версии 1.2.0. Также измените версию коннектора flink-kafka на 1.2.0. –

ответ

2

Вам нужно понизить версию коннектора:

<!-- Kafka 0.8 connector (Scala 2.10 build), pinned to version 1.1.2. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.1.2</version>
</dependency>

Вот подобный вопрос: https://stackoverflow.com/a/40037895/1252056

+0

Спасибо за ответ. Я уже пробовал это исправление, но получаю такую ошибку: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointNotifier – fddias

0

Kafka 0.9 и более новым версиям не нужен Zookeeper. Вам необходимо обновить Flink или понизить версию Kafka.

--topic myTopic --bootstrap.servers 10.123.34.56:9092 --group.id myGroup 

// Kafka 0.9 consumer for the given topic; its Kafka settings are taken from
// parameterTool (presumably parsed from command-line arguments - verify).
FlinkKafkaConsumer09<CarDB> consumerBinden =
        new FlinkKafkaConsumer09<>(kafkaTopic, new ClassClassSchema(), parameterTool.getProperties());


<!-- Kafka 0.9 connector (Scala 2.10 build); its version follows the flink.version property. -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
  <version>${flink.version}</version>
</dependency>
+0

Что такое Zookeeper? – MadPhysicist

Смежные вопросы