2015-07-06 6 views
1

Вот ситуация: У меня постоянно растущий набор данных, которые я хочу обрабатывать с помощью RDD через кластер Hadoop.Apache Spark: кеш и разделы

Вот небольшой пример:

val elementA = (1, Seq(2, 3)) 
val elementB = (2, Seq(1, 3)) 
val elementC = (3, Seq(1, 2)) 

val testRDD = sc.parallelize(Seq(elementA, elementB, elementC)). 
    map(x => (x._1, x._2)).setName("testRDD").cache() 

val elementD = (4, Seq(1, 3)) 
val elementD1 = (1, Seq(4)) 
val elementD2 = (3, Seq(4)) 

val testAdd = sc.parallelize(Seq(elementD, elementD1, elementD2)). 
    map(x => (x._1, x._2)).setName("testAdd") 

val testResult = testRDD.cogroup(testAdd).mapValues(x => (x._1 ++ x._2).flatten) 

Результат будет выглядеть следующим образом (порядок элементов может меняться):

(1, List(2, 3, 4)) 
(2, List(1, 3)) 
(3, List(1, 2, 4)) 
(4, List(1, 3)) 

Вот мои цели:

  1. Я хочу до .cache() мой RDD в памяти кластера.
  2. Я хочу иметь возможность добавлять новые элементы в существующее RDD.

Вот что я выяснил:

  1. Каждый раздел в РДУ кэширует отдельно и полностью (например, у меня была коллекция 100 элементов и 4 разделов, я назвал .cache().collect() и cache().first() и получил 4 кэшированных раздела в первом случае и 1 во втором случае).
  2. Результат testRDD.cogroup(testAdd) - это новый RDD, который можно снова кэшировать, и если мы попытаемся использовать var testRDD и позвоним testRDD = testRDD.cogroup(testAdd), мы потеряем связь с кэшированными данными.
  3. Я знаю, что RDD наиболее подходит для пакетных приложений, и у меня есть это: Seq() для каждого нового элемента будет вычисляться из свойств других элементов.

Есть ли способ изменить текущий RDD, не удаляя все эти элементы из кэша?

я хоть о создании своего рода временного хранения и объединения временного хранения с текущим хранением после достижения некоторого предела на временном хранении ...

ответ

1

РДА неизменен, так что вы не можете добавлять новые элементы к ним. Однако вы можете создать новый RDD, объединив исходный RDD с новыми элементами, аналогично тому, что вы сделали с вашим RDD testResult.

Если вы хотите использовать ту же переменную для нового RDD с обновлениями, вы можете использовать var, а не val для этого RDD. например

var testRDD = sc.parallelize(...) val testAdd = sc.parallelize(...) testRDD = testRDD.union(testAdd) testRDD.cache()

Это создаст линию, соединяющую два оригинальных РД. Это может вызвать проблемы, если вы слишком часто вызываете union на testRDD. Чтобы исправить это, вы можете вызвать контрольную точку на testRDD после того, как она была объединена столько раз, скажем каждые 10 обновлений. Вы также можете рассмотреть возможность переотбора на testRDD при контрольной точке.

Все элементы, добавленные в testRDD, должны оставаться в кеше, используя эту технику.

+0

Спасибо за ответ, но вот еще один вопрос - есть определенные изменения в уже кэшированных элементах (см. Мой пример), как использовать здесь 'var' hepls? Могу ли я просто вызвать 'testRDD.cache()' на 'var testRDD', и он будет автоматически кэшировать мои обновления после действий? Довольно трудно поверить в эту магию ... – SuppieRK

+0

Думаю, вам нужно будет вызывать 'testRDD.cache()' каждый раз, когда вы добавляете (объединяете) больше элементов, что фактически создает новый RDD. –

+0

Нет, вам не нужно снова вызывать '.cache()', он создаст новый 'MapPartitionRDD' (_in my case_). Но, похоже, вы можете вызывать '.cache()' один раз для вашего 'var testRDD'. – SuppieRK

Смежные вопросы