2016-03-30 2 views
1

Я использую подход 2 от http://spark.apache.org/docs/latest/streaming-kafka-integration.html, я использую проверку чек, и когда мне приходилось менять код и передислоцировать мой код, иногда контрольная точка бросает исключения, и если по какой-то причине мне пришлось удалить checkpoint directory, как я могу повторно использовать папки каталога контрольных точек для получения сообщений от kafka, я думаю, что в каталоге контрольной точки хранятся кафкинские смещения.spark kafka integration checkpoint повторное использование

+0

Посмотрите на это: http://blog.cloudera.com/blog/2015/03/exactly-once-spark -streaming-из-апаша-Кафка / – Markon

ответ

0

Да, я это видел. Повторно скомпилированный код иногда генерирует исключение (очень странные) при использовании контрольной точки.

Справедливости ради, точка контрольной точки заключается в том, что если есть проблема, связанная с данными, вы сможете восстановить ее.

Да: Когда вы делаете контрольную точку для KafkaDstream, ваши смещения сохраняются. Итак, когда вы выздоравливаете, вы начинаете с того места, где вы ушли последним.

Если вы хотите более мелкозернистый контроль ваших смещений Kafka, взгляните на cody's example. Он парень, который написал Direct KafkaDstream.

0

как я могу повторно использовать папки каталога контрольных точек, чтобы получать сообщения от Кафка, я думаю, что контрольная точка каталог имеет Кафка зачетов хранятся

Если вы изменили свой код, вы не можете. Контрольная точка Spark Streaming не только сохраняет базовый RDD, но также сохраняет метаданные, например, искровой график, выполняемый в момент времени, поэтому он может восстанавливать линию при возобновлении после сбоя, например.

Это означает, что если вы измените свой код, вам нужно будет удалить любые контрольные данные перед возобновлением вашей потоковой работы. Это также означает, что смещения kafka, хранящиеся в вашем каталоге контрольных точек, вам бесполезны. Вот почему рекомендуется вручную сохранять ваши смещения, если вы используете подход без потоков, введенный в Spark 1.3.

From the documentation (курсив мой):

Обратите внимание, что один недостаток этого подхода заключается в том, что она не обновляет смещения в Zookeeper, следовательно Zookeeper основе Кафка инструменты мониторинга не будет показывать прогресс. Однако вы можете получить доступ к смещениям обработанных с помощью этого подхода в каждой партии и обновить Zookeeper самостоятельно (см. ниже).

Там даже фрагмент кода, чтобы вы начали:

// Hold a reference to the current offset ranges, so it can be used downstream 
var offsetRanges = Array[OffsetRange]() 

directKafkaStream.transform { rdd => 
    offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    rdd 
}.map { // Do stuff 
}.foreachRDD { rdd => 
    for (o <- offsetRanges) { 
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
    } 
} 
Смежные вопросы