Работа с Kafka Spark-Streaming. Возможность читать и обрабатывать данные, отправленные от Продюсера. У меня есть сценарий здесь, давайте предположим, что продюсер создает сообщения, а Consumer отключается на некоторое время и включается. Теперь Conumser только считывает данные в реальном времени. Вместо этого он должен был также сохранить данные, откуда он прекратил чтение. Вот мой pom.xml, который я использовал.Kafka Spark-Streaming offset issue
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spark.version>2.0.1</spark.version>
<kafka.version>0.8.2.2</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json/json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 -->
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-ast_2.11</artifactId>
<version>3.2.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>
Я пробовал работать с Kafka-v0.10.1.0 Продюсером и Conumser. Поведение такое же, как ожидалось (потребитель читает данные от того, где он остался). Итак, в этой версии смещение забирается правильно.
Пробовал использовать ту же версию в вышеуказанном pom.xml, но не с java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
.
Я понимаю совместимость версий, но я также ищу непрерывный поток.
Вы посмотрели мой ответ? Были ли решены проблемы? – oh54