tl; dr Кажется, что работает. Просьба привести пример, который терпит неудачу.
Я использую Spark 2.0.2 (это было выпущено today).
Мой пример выглядит следующим образом (с некоторым кодом удалены для краткости):
val ssc = new StreamingContext(sc, Seconds(10))
import org.apache.spark.streaming.kafka010._
val dstream = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))
def reduceFunc(v1: String, v2: String) = s"$v1 + $v2"
dstream.map { r =>
println(s"value: ${r.value}")
val Array(key, value) = r.value.split("\\s+")
println(s">>> key = $key")
println(s">>> value = $value")
(key, value)
}.reduceByKeyAndWindow(
reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10))
.print()
dstream.foreachRDD { rdd =>
// Get the offset ranges in the RDD
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
}
}
ssc.start
Что бы вы изменили, чтобы увидеть исключение (ы) вы столкнулись?
Весь проект доступен как spark-streaming-kafka-direct.
Пожалуйста, введите код, который вы используете прямо сейчас (https://stackoverflow.com/help/mcve). –