2015-04-23 4 views
0

У меня есть эти строки:Спарк Кортеж получить детали/РДД на ключ

(key1,Illinois|111|67342|...) 
(key1,Illinois|121|67142|...) 
(key2,Hawaii|113|67343|...) 
(key1,Illinois|211|67442|...) 
(key3,Hawaii|153|66343|...) 
(key3,Ohio|193|68343|...) 

(1) Как я могу получить уникальные ключи?

(2) Как получить количество строк PER ключа (key1 - 3 строки, key2 - 1 строка, ключ 3 - 2 строки ... так что результат будет: 3,1,2)

(3) Как получить размер в байтах строк на ключ (5МБ, 2MB, 3MB)


EDIT 1. это мой новый код:

val rdd : RDD[(String, Array[String])] = ... 
val rdd_res = rdd.groupByKey().map(row => (row._1, row._2.size, byteSize(row._2))) 

val rddKeys = rdd_res.map(row => row._1) 
val rddCount = rdd_res.map(row => row._2)  
val rddByteSize = rdd_res.map(row => row._3) 

Как реализовать byteSize? Я хочу получить размер, который будет сохранен на диске.


EDIT 2.

val rdd_res : RDD[(String, (Int, Int))] = rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value)), (first, second) => (first._1 + second._1, first._2 + second._2)) 

    val rdd_res_keys = rdd_res.map(row=>row._1).collect().mkString(",") 
    val rdd_res_count = rdd_res.map(row=>row._2).collect().map(_._1).mkString(",") 
    val rdd_res_bytes = rdd_res.map(row=>row._2).collect().map(_._2).mkString(",") 

ответ

1

Для различных ключей, вы должны изменить порядок:

rdd.keys.distinct.collect 

Но технически это получить из подсчета ключей в карте ... countByKey через который возвращает карту key->count

rdd.countByKey 

И, чтобы получить размер байта, вам следует рассмотреть this SO question, так как он будет зависеть от декодирования. Но после того, как вы определились с методом размера, то вы можете получить его с помощью:

rdd.aggregateByKey(0)((accum, value) => accum + size(value), _ + _) 

Или, вы можете сделать все это в одном:

rdd.aggregateByKey((0,0))((accum, value) => (accum._1 + 1, accum._2 + size(value), (first, second) => (first._1 + second._1, first._2 + second._2)) 

Который должен дать в RDD[(String, (Int, Int))] где первый элемент в кортеже - это количество ключей, а второе - размер ключа

+0

@sophie Kiiiind of ... Вы должны собирать только один раз и использовать эти данные. Данные могут размещаться по горячему пути, но вы все еще используете одну и ту же DAG 3 раза. –

0

Учитывая у вас есть пара RDD из (ключ, значение).

Вы можете получить ключ и рассчитывать, используя ниже

rdd_res = rdd_inp.countByKey 

Вы можете перечислить размер для ключа с помощью ниже

rdd_size_res = rdd_inp.groupByKey().map((a,b)=>(a,size(b))) 

def size(src: List[String]):List[String] = { 

    src.map(a => (32 + a.length() * 2).toString()) 


    } 

Пожалуйста, проверьте, если вышеуказанные работы по вашему сценарию.

+0

Отредактировано мое сообщение, чтобы показать новый код, я использовал карту вместо foreach, так как последняя возвращает Unit. Для реализации byteSize я не могу заставить вашу работу работать. Я попробовал row._2.mkString (","). Length * 2 + 32), но он не соответствует размеру диска. (577432 bytes vs 6.7MB) – sophie

+0

Я не знаю, насколько большой набор данных может быть, но reduceByKey будет более быстрым вариантом для подсчета. –

+0

rdd может содержать 1 миллион строк. отдельный ключ будет около 100. im не уверен, как это сделать с помощью reduceByKey, так как мне нужен отдельный ключ и 2 агрегата. – sophie

Смежные вопросы