Вот ситуация: У меня постоянно растущий набор данных, которые я хочу обрабатывать с помощью RDD через кластер Hadoop.Apache Spark: кеш и разделы
Вот небольшой пример:
val elementA = (1, Seq(2, 3))
val elementB = (2, Seq(1, 3))
val elementC = (3, Seq(1, 2))
val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)).
map(x => (x._1, x._2)).setName("testRDD").cache()
val elementD = (4, Seq(1, 3))
val elementD1 = (1, Seq(4))
val elementD2 = (3, Seq(4))
val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)).
map(x => (x._1, x._2)).setName("testAdd")
val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten)
Результат будет выглядеть следующим образом (порядок элементов может меняться):
(1, List(2, 3, 4))
(2, List(1, 3))
(3, List(1, 2, 4))
(4, List(1, 3))
Вот мои цели:
- Я хочу до
.cache()
мой RDD в памяти кластера. - Я хочу иметь возможность добавлять новые элементы в существующее RDD.
Вот что я выяснил:
- Каждый раздел в РДУ кэширует отдельно и полностью (например, у меня была коллекция 100 элементов и 4 разделов, я назвал
.cache().collect()
иcache().first()
и получил 4 кэшированных раздела в первом случае и 1 во втором случае). - Результат
testRDD.cogroup(testAdd)
- это новый RDD, который можно снова кэшировать, и если мы попытаемся использоватьvar testRDD
и позвонимtestRDD = testRDD.cogroup(testAdd)
, мы потеряем связь с кэшированными данными. - Я знаю, что RDD наиболее подходит для пакетных приложений, и у меня есть это:
Seq()
для каждого нового элемента будет вычисляться из свойств других элементов.
Есть ли способ изменить текущий RDD, не удаляя все эти элементы из кэша?
я хоть о создании своего рода временного хранения и объединения временного хранения с текущим хранением после достижения некоторого предела на временном хранении ...
Спасибо за ответ, но вот еще один вопрос - есть определенные изменения в уже кэшированных элементах (см. Мой пример), как использовать здесь 'var' hepls? Могу ли я просто вызвать 'testRDD.cache()' на 'var testRDD', и он будет автоматически кэшировать мои обновления после действий? Довольно трудно поверить в эту магию ... – SuppieRK
Думаю, вам нужно будет вызывать 'testRDD.cache()' каждый раз, когда вы добавляете (объединяете) больше элементов, что фактически создает новый RDD. –
Нет, вам не нужно снова вызывать '.cache()', он создаст новый 'MapPartitionRDD' (_in my case_). Но, похоже, вы можете вызывать '.cache()' один раз для вашего 'var testRDD'. – SuppieRK