2015-07-13 2 views
0

Есть ли способ собрать все RDD[(String, String)] в один RDD[Map[String, String]]?Spark RDD [(String, String)] в RDD [Карта [String, String]]

Е.Г., для файла input.csv:

1,one 
2,two 
3,three 

Код:

val file = sc.textFile("input.csv") 
val pairs = file.map(line => { val a = line.split(","); (a(0), a(1)) }) 
val rddMap = ??? 

выход (приблизительно):

val map = rddMap.collect 
map: Array[scala.collection.immutable.Map[String,String]] = Array(Map(1 -> one, 2 -> two, 3 -> three)) 

Пробовал pairs.collectAsMap но он возвращает Map не внутри RDD.

+4

Но почему вы хотите получить карту внутри RDD? RDD - это коллекция, и, насколько я могу судить по вашему коду, вы просто хотите одну карту, поэтому нет смысла обертывать ее RDD только одним элементом. –

+0

Я хочу кэшировать эту карту между несколькими заданиями. Все решения, которые я нашел, работали с RDD, а не с обычными объектами. – red1ynx

+0

По-прежнему спрашивайте себя, хотите ли вы поделится «RDD [Map [String, String]]. Таким образом, вы не можете смириться с параллелизмом.Если карта мала и вам действительно нужна карта, возможно, посмотрите на широковещательные переменные и аккумуляторы (https://spark.apache.org/docs/latest/programming-guide.html#shared-variables). –

ответ

0

Если вы хотите сохранить свою карту только на время выполнения программы вашего дирижара, вы можете собрать ее на локальную карту (в драйвере) для следующей задачи, она будет доступна ближе (вы можете просто использовать ее в функции, переданной следующей задаче). Если вы не хотите траспортировать его много раз, вы можете транслировать его.

С другой стороны, если вы хотите использовать его в разных программах драйвера, вы можете просто его сериализовать и сохранить на hdfs (или любую другую используемую вами систему хранения). В этом случае, даже если у вас есть RDD, вы не сможете сохранить его между драйверами, не сохраняя его в файловой системе.

+0

Существуют внешние системы кэширования, такие как Tachyon, не обязательные для сохранения в файловой системе. В любом случае мне нужно использовать 'RDD.persist()'. – red1ynx

+0

Это сохраняется в распределенном fasion, и нет смысла делать это с помощью локального объекта. Сохраняет ли постоянная карта между потоками и считывает их с драйвером слишком дорого для вас? – abalcerek

+0

Простой взлом будет 'sc.parallelize (List (localMap)). Persist()'. Я не думаю, что это хорошая идея. – abalcerek

1

Я действительно не согласен с тем, что вы пытаетесь сделать. Потому что, если вы это сделаете, вы будете распределены по кластеру, но это не будет одна карта!

Вы можете использовать пару RDD с ключом и использовать метод lookup, чтобы найти свое значение по заданному ключу!

def lookup(key: K): Seq[V] // Return the list of values in the RDD for key key. 

А вот пример про это использование:

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2) 
val b = a.map(x => (x.length, x)) 
b.lookup(5) 
res0: Seq[String] = WrappedArray(tiger, eagle) 

Для получения дополнительной информации о pair RDDs, я полагаю, что вы читали Chapter 4. Working with Key/Value Pairs - Learning Spark.

0

Сколько карт вы бы получили в RDD[Map[String, String]]? Только один, не так ли? RDD распространяет свой контент, потому что это распределенная коллекция , но если он содержит только один элемент, то становится довольно сложным распространять эту коллекцию, не так ли?

Я предлагаю вам искать хэш-поиск в PairRDD от String. К счастью, у вас уже есть это в API, с функцией lookup.

Посмотрите на code for lookup, он использует hashing, чтобы добраться до вашего ключа, аналогичным образом будет Map. Правильное создание ключей и значений в вашем PairRDD достаточно для ваших целей, даже если их создание сложное.

Смежные вопросы