2015-12-13 6 views
0
def description(list:Array[String]): Array[String] = { 
    for (y <- list) yield modulelookup.lookup(take(4)) + " " + brandlookup.lookup(y.drop(4)).toString() 
} 

val printRDD = outputRDD.collect().map(x=> (description(x._1),x._2)) 

- мой текущий код. Я бы хотел сделать это без сбора. modulelookup и brandlookup - это RDD. Как это сделать?Итеративный поиск по карте

ответ

2

Если modulelookup и brandlookup относительно малы вы можете преобразовать их транслировать переменные и использовать для отображения следующим образом:

val modulelookupBD = sc.broadcast(modulelookup.collectAsMap) 
val brandlookupBD = sc.broadcast(brandlookup.collectAsMap) 

def description(list:Array[String]): Array[String] = list.map(x => { 
    val module = modulelookupBD.value.getOrElse(x.take(4), "") 
    val brand = brandlookupBD.value.getOrElse(x.drop(4), "") 
    s"$module $brand" 
}) 

val printRDD = outputRDD.map{case (xs, y) => (description(xs), y)} 

Если нет не существует эффективного способа обработки этого. Вы можете попробовать flatMap, join и groupByKey, но для любого большого набора данных эта комбинация может быть непомерно дорогостоящей.

val indexed = outputRDD.zipWithUniqueId 
val flattened = indexed.flatMap{case ((xs, _), id) => xs.map(x => (x, id))} 

val withModuleAndBrand = flattened 
    .map(xid => (xid._1.take(4), xid)) 
    .join(modulelookup) 
    .values 
    .map{case ((x, id), module) => (x.drop(4), (id, module))} 
    .join(brandlookup) 
    .values 
    .map{case ((id, module), brand) => (id, s"$module $brand")} 
    .groupByKey 

val final = withModuleAndBrand.join(
    indexed.map{case ((_, y), id) => (id, y)} 
).values 

Замена RDD на DataFrames может сократить шаблонный код, но производительность будет оставаться проблемой.

+0

Это один тщательный ответ. Спасибо. – user1050325

Смежные вопросы