2014-10-20 2 views
2

Я новичок искры, я использую Спарк потоковое с Кафкой ..Спарк Streaming Cache и Трансформации

Моя длительность потокового 1 секунда.

Предположим, я получаю 100 записей в 1-й партии и 120 записей в 2-й партии и 80 записей в 3-й партии

--> {sec 1 1,2,...100} --> {sec 2 1,2..120} --> {sec 3 1,2,..80} 

я применяю свою логику в 1-й партии и есть результат => Result1

I хочу использовать result1 во время обработки 2-й партии и иметь объединенный результат как результата1, так и 120 записей 2-й партии as => result2

Я попытался кэшировать результат, но я не могу получить кешированный результат1 в 2s is Возможно? или показать некоторый свет о том, как достичь моей цели здесь?

JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, String.class,String.class, StringDecoder.class,StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); 

Я обрабатываю сообщения и нахожу слово, которое является результатом в течение 1 секунды.

if(resultCp!=null){ 
       resultCp.print(); 
       result = resultCp.union(words.mapValues(new Sum())); 

      }else{ 
       result = words.mapValues(new Sum()); 
      } 

resultCp = result.cache(); 

, когда во 2-й партии resultCp не должен быть пустым, но она возвращает нулевое значение, так что в любой момент времени у меня есть, что конкретные данные секунд в одиночку я хочу найти кумулятивный результат. Кто-нибудь знает, как это сделать.

Я узнал, что как только начинается искровой поток jssc.start() контроль больше не на нашем конце, он лежит с искрами. Можно ли отправить результат 1-й партии во вторую партию, чтобы найти накопленное значение?

Любая помощь очень ценится. Заранее спасибо.

ответ

1

Я думаю, что вы ищете updateStateByKey, который создает новый DStream, применяя кумулятивную функцию к предоставленному DStream и некоторому состоянию. Этот пример из примера Спарк пакет охватывает случай в вопросе:

Во-первых, вам нужно функцию обновления, которая принимает новые значения и ранее известное значение:

val updateFunc = (values: Seq[Int], state: Option[Int]) => { 
    val currentCount = values.sum 

    val previousCount = state.getOrElse(0) 

    Some(currentCount + previousCount) 
} 

Эта функция используется для создания Dstream, который кумулирует значения из источника dstream. Как это:

// Create a NetworkInputDStream on target ip:port and count the 
// words in input stream of \n delimited test (eg. generated by 'nc') 
val lines = ssc.socketTextStream(args(0), args(1).toInt) 
val words = lines.flatMap(_.split(" ")) 
val wordDstream = words.map(x => (x, 1)) 

// Update the cumulative count using updateStateByKey 
// This will give a Dstream made of state (which is the cumulative count of the words) 
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc) 

Источник: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

+0

Спасибо я отсортированные это сам, спасибо за ваше время :) – mithra

Смежные вопросы