2016-04-12 3 views
1

У меня есть этот простой Кафка потокКафка directstream dstream карта не печатает

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Each Kafka message is a flight 
val flights = messages.map(_._2) 

flights.foreachRDD(rdd => { 
    println("--- New RDD with " + rdd.partitions.length + " partitions and " + rdd.count() + " flight records"); 
    rdd.map { flight => {   
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
    } 
    }   
}) 

ssc.start() 
ssc.awaitTermination() 

Кафки сообщение, Спарк Streaming он в состоянии получить их как РД. Но второй println в моем коде ничего не печатает. я смотрел журналы журналов драйверов при запуске в локальном режиме [2], проверял журналы нитей при запуске в режиме «пряжа-клиент».

Что мне не хватает?

Вместо rdd.map, следующий код печатает хорошо в консоли водителя искры:

for(flight <- rdd.collect().toArray) { 
    val flightRows = FlightParser.parse(flight) 
    println ("Parsed num rows: " + flightRows) 
} 

Но я боюсь, что обработка этого объекта полета может произойти в проекте водителя искры, а не исполнитель. Пожалуйста, поправьте меня, если я ошибаюсь.

Благодаря

+1

Вы посмотрели журналы работника * исполнителя *? Возможно, он не нашел ваш класс FlightParser? –

ответ

1

rdd.map является ленивым преобразование. Это не будет реализовано, если на этот RDD не будет вызвано действие.
В этом конкретном случае мы могли бы использовать rdd.foreach, который является одним из самых общих действий на RDD, предоставляя нам доступ к каждому элементу в RDD.

flights.foreachRDD{ rdd => 
    rdd.foreach { flight =>   
     val flightRows = FlightParser.parse(flight) 
     println ("Parsed num rows: " + flightRows) // prints on the stdout of each executor independently 
    } 
} 

Учитывая, что это РДД действие выполняется в исполнителях, мы найдем Println вывод в STDOUT Исполнителя.

Если вы хотите напечатать данные на драйвере, вы можете collect данные RDD в пределах DStream.foreachRDD закрытия.

flights.foreachRDD{ rdd => 
    val allFlights = rdd.collect() 
    println(allFlights.mkString("\n")) // prints to the stdout of the driver 
} 
+0

Спасибо @massg за ваше предложение. Когда я пробую свой первый подход, я получаю следующее исключение: org.apache.spark.SparkException: Задача не сериализуемый Вызванный: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext Я предполагаю, что это происходит потому, что переменная полета доступна только в Spark Driver, а не у исполнителей. Что мне не хватает? –

Смежные вопросы