0

Работа с Kafka Spark-Streaming. Возможность читать и обрабатывать данные, отправленные от Продюсера. У меня есть сценарий здесь, давайте предположим, что продюсер создает сообщения, а Consumer отключается на некоторое время и включается. Теперь Conumser только считывает данные в реальном времени. Вместо этого он должен был также сохранить данные, откуда он прекратил чтение. Вот мой pom.xml, который я использовал.Kafka Spark-Streaming offset issue

<properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <spark.version>2.0.1</spark.version> 
     <kafka.version>0.8.2.2</kafka.version> 
    </properties> 


    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.11</artifactId> 
      <version>1.6.2</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json/json --> 
     <dependency> 
      <groupId>org.json</groupId> 
      <artifactId>json</artifactId> 
      <version>20160810</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 --> 
     <dependency> 
      <groupId>org.json4s</groupId> 
      <artifactId>json4s-ast_2.11</artifactId> 
      <version>3.2.11</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.2.0</version> 
     </dependency> 

Я пробовал работать с Kafka-v0.10.1.0 Продюсером и Conumser. Поведение такое же, как ожидалось (потребитель читает данные от того, где он остался). Итак, в этой версии смещение забирается правильно.

Пробовал использовать ту же версию в вышеуказанном pom.xml, но не с java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker.

Я понимаю совместимость версий, но я также ищу непрерывный поток.

+0

Вы посмотрели мой ответ? Были ли решены проблемы? – oh54

ответ

0

Различное поведение, вероятно, связано с тем, что Kafka претерпела довольно большие изменения между версиями 0.8 и 0.10.

Если вам не нужно использовать старую версию, я предлагаю переключиться на более новые.

Посмотрите на эту ссылку:

https://spark.apache.org/docs/latest/streaming-kafka-integration.html

Проект Кафка представил новый потребительский апи между версиями 0.8 и 0.10, так что есть 2 отдельные соответствующие пакеты Спарк Streaming доступны.

Если вы хотите использовать Кафка v0.10.1.0, вы должны указать, таким образом, некоторые Кафка искру потоковую интеграции зависимость в https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11.

Нечто подобное, например:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

Дополнительное примечание: вы используете Hadoop 2.2.0, который был выпущен в октябре 2013 года и является, таким образом, с точки зрения древней Hadoop, вы должны рассмотреть вопрос об изменении его к более новая версия.

Дайте мне знать, если это поможет.

Смежные вопросы