2016-11-14 3 views
1

Я использую Спарк Кафка интеграцию 0.10 и мне нужен два уровня агрегации на потоке:Кафка Спарк Streaming множественной агрегация

  1. Первая поминутная интервал
  2. Другой просуммировать по 15 минут интервал.

Также предпочтение заключается в накоплении значений минутного интервала, а затем сбросе его, когда 15 минут превышены. B/c 15-минутные значения должны сохраняться.

Наличие двух reduceByKeysByWindow s на разных раздвижных окнах не работает, поскольку оно дает исключение KafkaConcurrentModification.

+1

Пожалуйста, введите код, который вы используете прямо сейчас (https://stackoverflow.com/help/mcve). –

ответ

0

tl; dr Кажется, что работает. Просьба привести пример, который терпит неудачу.

Я использую Spark 2.0.2 (это было выпущено today).

Мой пример выглядит следующим образом (с некоторым кодом удалены для краткости):

val ssc = new StreamingContext(sc, Seconds(10)) 
import org.apache.spark.streaming.kafka010._ 

val dstream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    preferredHosts, 
    ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)) 

def reduceFunc(v1: String, v2: String) = s"$v1 + $v2" 
dstream.map { r => 
    println(s"value: ${r.value}") 
    val Array(key, value) = r.value.split("\\s+") 
    println(s">>> key = $key") 
    println(s">>> value = $value") 
    (key, value) 
}.reduceByKeyAndWindow(
    reduceFunc, windowDuration = Seconds(30), slideDuration = Seconds(10)) 
    .print() 

dstream.foreachRDD { rdd => 
    // Get the offset ranges in the RDD 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    for (o <- offsetRanges) { 
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}") 
    } 
} 

ssc.start 

Что бы вы изменили, чтобы увидеть исключение (ы) вы столкнулись?

Весь проект доступен как spark-streaming-kafka-direct.

Смежные вопросы