2015-10-12 3 views
0

Я бегу искру локально, чтобы понять, как countByValueAndWindow работыСпарк повторение задачи indefinietly

val Array(brokers, topics) = Array("192.xx.xx.x", "test1") 

// Create context with 2 second batch interval 
val sparkConf = new SparkConf().setAppName("ReduceByWindowExample").setMaster("local[1,1]") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) // batch size 2 
ssc.checkpoint("D:\\SparkCheckPointDirectory") 
// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

// Get the lines, split them into words, count the words and print 
val lines = messages.map(_._2.toInt) 
val keyValuelines = lines.map { x => (x, 1) } 

val windowedlines=lines.countByValueAndWindow(Seconds(4),Seconds(2)) 
//window,interval 
// val windowedlines = lines.reduceByWindow((x, y) => { x + y }, Seconds(4) , Seconds(2)) 
    windowedlines.print() 

ssc.start() 
ssc.awaitTermination() 

все работает файл до числовых данных поставляется на тему Кафки, как я использую toInt, когда пустая строка «» написано на тему kafka, это не позволяет жаловаться NumberFormatExceotion, это нормально, но проблема в том, что он снова и снова повторяет эту неопределенность и жалуется на то же самое NumberFormatException Есть ли способ контролировать количество искровых периодов времени, чтобы попытаться преобразовать строку в Int, например Spark должен попробовать только [раз], а затем перейти к следующей партии данных

+0

Я использую Спарк 1.4 –

ответ

1

Вы должны были использовать обработку исключений как лучшую функцию языков java, scala, которые гарантируют, что программа не получит отказ. Здесь, как я отредактировал вам код, пожалуйста, проверьте, работает ли это для вас.

import scala.util.Try 

val Array(brokers, topics) = Array("192.xx.xx.x", "test1") 

// Create context with 2 second batch interval 
val sparkConf = new SparkConf().setAppName("ReduceByWindowExample").setMaster("local[1,1]") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) // batch size 2 
ssc.checkpoint("D:\\SparkCheckPointDirectory") 
// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet) 

// Get the lines, split them into words, count the words and print 

val lines = messages.map(x => { 
    val convertedValue = Try(x._2.toInt) 
if (convertedValue.isSuccess) convertedValue.get else 0 
}) 

val keyValuelines = lines.map { x => (x, 1) } 

val windowedlines=lines.countByValueAndWindow(Seconds(4),Seconds(2)) 
//window,interval 
// val windowedlines = lines.reduceByWindow((x, y) => { x + y }, Seconds(4) , Seconds(2)) 
    windowedlines.print() 

ssc.start() 
ssc.awaitTermination() 
+0

Да, это всегда можно сделать, я сделал это явно, чтобы увидеть, как искра будет вести себя, я считаю, что если задача была неудачной, то он должен перейти к следующей задаче, а не повтором снова и снова –

+0

Это не так с искрами, вы должны справиться с ситуацией, иначе либо в каком-то случае она прекратит весь процесс, либо в некоторых случаях снова и снова будет перебирать ту же ошибку, но никогда не пропускает ее. –

+0

Спасибо Kshitij за помощь, получил точку. Если возможно, вы можете поделиться какой-либо ссылкой, если знаете какие-либо связанные с этой темой, могли бы помочь –

0

Хотя может быть способ сконфигурировать максимальное количество попыток для конкретной записи, я думаю, что правильный путь для этого - это фактически обработать исключение. Я считаю, что следующий код следует отфильтровать освобожденные записи:

import scala.util.Try 
... 
val keyValueLines = messages.flatMap { case (e1, e2) => 
    val e2int = Try(e2.toInt) 
    if (e2int.isSuccess) Option((e2int.get, 1)) else None 
} 

Преобразования flatMap() удаляет None «s из результата при извлечении (Int, Int) кортежа из Option для всех остальных записей.

Смежные вопросы