2016-06-07 2 views
1

У меня есть процесс потокового искры, который считывает данные из kafka, в DStream.Кэширование DStream в Spark Streaming

В моем трубопроводе я два раза (один за другим):

DStream.foreachRDD (преобразования по РДУ и вставляя в пункт назначения).

(каждый раз, когда я делаю разные обработки и вставляю данные в другое место назначения).

Мне было интересно, как бы DStream.cache, сразу после того, как я прочитал данные из работы Кафки? Можно ли это сделать?

Этот процесс теперь фактически считывает данные два раза из Кафки?

Пожалуйста, имейте в виду, что это не представляется возможным поставить два foreachRDDs в один (потому что два пути совершенно различны, есть statefull преобразования есть - которые должны быть appliend на DStream ...)

Спасибо за помощь

+0

Dstream.cache будет работать. Он кэширует поток в первый раз, когда видит действие. А для последующего действия в DStream он использует кеш. – Knight71

+0

@ Knight71 Мне также нужно установить DStream.unpersist (true), как и при кешировании RDD, в конце, когда DStream больше не нужен? –

+0

Данные Dstream будут очищены автоматически после всех операций и будут решаться искровым потоком на основе преобразований. – Knight71

ответ

3

Там вы два варианта:

  • Dstream.cache() Используйте, чтобы отметить основные РДУ в кэш. Spark Streaming позаботится о том, чтобы отменить RDD после таймаута, контролируемого конфигурацией spark.cleaner.ttl.

  • Используйте дополнительные foreachRDD применять cache() и unpersist(false) побочные операции с РДУ в DStream:

например:

val kafkaDStream = ??? 
val targetRDD = kafkaRDD 
         .transformation(...) 
         .transformation(...) 
         ... 
// Right before the lineage fork mark the RDD as cacheable: 
targetRDD.foreachRDD{rdd => rdd.cache(...)} 
targetRDD.foreachRDD{do stuff 1} 
targetRDD.foreachRDD{do stuff 2} 
targetRDD.foreachRDD{rdd => rdd.unpersist(false)} 

Обратите внимание, что вы можете включить кэш как первый заявление do stuff 1, если это вариант.

Я предпочитаю этот вариант, потому что он дает мне тонкий контроль над жизненным циклом кеша и позволяет мне очищать вещи, как только это необходимо, а не в зависимости от ttl.

+0

'' 'spark.cleaner.ttl''' удален. Что нового контроля над этим? – okwap

Смежные вопросы