2016-11-15 4 views
4

Я пытаюсь прочитать сообщения от kafka (версия 10) в искры и попытаться распечатать его.Запросы с потоковыми источниками должны выполняться с помощью writeStream.start();

 import spark.implicits._ 

     val spark = SparkSession 
       .builder 
       .appName("StructuredNetworkWordCount") 
       .config("spark.master", "local") 
       .getOrCreate() 

      val ds1 = spark.readStream.format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", "topicA") .load() 
      ds1.collect.foreach(println) 
     ds1.writeStream 
      .format("console") 
      .start() 
      ds1.printSchema() 

получаю исключение ошибок в потоке «основные» org.apache.spark.sql.AnalysisException: Запросы с источниками потоковой передачи должны быть выполнены с writeStream.start() ;;

ответ

5

Вы ветвление план запроса: из того же DS1 вы пытаетесь:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

Но вы только вызовом .start() на второй ветви, оставляя другое свисание без прерывания, которое, в свою очередь, вызывает исключение, которое вы возвращаете.

+0

так в чем же проблема? – user1870400

+0

'.start()' обе ветви? Это нижний план? – ssice

+0

Я второй комментарий здесь. Можем ли мы получить правильное решение здесь? Может быть, образец кода? Благодаря! – DataGeek

1

Я столкнулся с таким же вопросом.i исправленный выпуск используя нижеследующий код. Это может быть полезно для исправленной проблемы.

val df = session 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", brokers) 
    .option("subscribe", "streamTest2") 
    .load(); 

    val query = df.writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 
query.awaitTermination() 
0

при чтении о сообщении об ошибке

org.apache.spark.sql.AnalysisException: Запросы с источниками потоковых должны быть выполнены с writeStream.start() ;;

Я нашел это article, что объясняет это и предоставляет другой подход. Я попробую сам и опубликую результаты позже, если это сработает для меня.

Смежные вопросы