2017-01-05 2 views
3

У меня есть Спарк DataframeScala - Спарк Dataframe - Преобразование строк Карта переменной

 
Level Hierarchy Code 
-------------------------- 
Level1 Hier1  1 
Level1 Hier2  2 
Level1 Hier3  3 
Level1 Hier4  4 
Level1 Hier5  5 
Level2 Hier1  1 
Level2 Hier2  2 
Level2 Hier3  3 

Мне нужно преобразовать это в переменную карты как карты [String, карта [Int, String]]

т.е.

Map["Level1", Map[1->"Hier1", 2->"Hier2", 3->"Hier3", 4->"Hier4", 5->"Hier5"]] 
Map["Level2", Map[1->"Hier1", 2->"Hier2", 3->"Hier3"]] 

Просьба предложить подходящий подход для достижения этой функциональности.

Моя попытка. Это работает, но некрасиво

val level_code_df =master_df.select("Level","Hierarchy","Code").distinct() 
val hierarchy_names = level_code_df.select("Level").distinct().collect() 
val hierarchy_size = hierarchy_names.size 
var hierarchyMap : scala.collection.mutable.Map[String, scala.collection.mutable.Map[Int,String]] = scala.collection.mutable.Map[String, scala.collection.mutable.Map[Int,String]]()  
for(i <- 0 to hierarchy_size.toInt-1)  
println("names:"+hierarchy_names(i)(0)) 
val name = hierarchy_names(i)(0).toString() 
val code_level_map = level_code_df.rdd.map{row => { 
if(name.equals(row.getAs[String]("Level"))){ 
Map(row.getAs[String]("Code").toInt -> row.getAs[String]("Hierarchy")) 
} else 
Map[Int, String]() 
    }}.reduce(_++_) 

    hierarchyMap = hierarchyMap + (name -> (collection.mutable.Map() ++ code_level_map))  
    }   

    }  
+1

Привет, я добавил свой код в сообщение. –

ответ

3

Вы должны использовать dataframe.groupByKey("level") followeed по mapGroups. Не забудьте также включить Kryo кодировщик карты:

case class Data(level: String, hierarhy: String, code: Int) 
val data = Seq(
Data("Level1","Hier1",1), 
Data("Level1","Hier2",2), 
Data("Level1","Hier3",3), 
Data("Level1","Hier4",4), 
Data("Level1","Hier5",5), 
Data("Level2","Hier1",1), 
Data("Level2","Hier2",2), 
Data("Level2","Hier3",3)).toDS 
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Map[Int, String]]] 

Спарк 2.0+:

data.groupByKey(_.level).mapGroups{ 
    case (level, values) => Map(level -> values.map(v => (v.code, v.hierarhy)).toMap) 
}.collect() 
//Array[Map[String,Map[Int,String]]] = Array(Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4)), Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3))) 

Спарк 1.6+:

data.rdd.groupBy(_.level).map{ 
    case (level, values) => Map(level -> values.map(v => (v.code, v.hierarhy)).toMap) 
}.collect() 
//Array[Map[String,Map[Int,String]]] = Array(Map(Level2 -> Map(1 -> Hier1, 2 -> Hier2, 3 -> Hier3)), Map(Level1 -> Map(5 -> Hier5, 1 -> Hier1, 2 -> Hier2, 3 -> Hier3, 4 -> Hier4))) 
+0

Спасибо за ваш ответ. Надеюсь, ваш код соответствует Spark 2.0+. Будет ли этот код работать в Spark 1.6 –

+0

Обновленный ответ для искры-1.6. Почти то же самое, просто нужно преобразовать в rdd – prudenko

0

@ ответ prudenko является, вероятно, самым кратким - и должен работать с Spark 1.6 или более поздней версией. Но - если вы ищете решение, которое остается с DataFrames API (а не DataSets), вот один, используя простой UDF:

val mapCombiner = udf[Map[Int, String], mutable.WrappedArray[Map[Int, String]]] {_.reduce(_ ++ _)} 

val result: Map[String, Map[Int, String]] = df 
    .groupBy("Level") 
    .agg(collect_list(map($"Code", $"Hierarchy")) as "Maps") 
    .select($"Level", mapCombiner($"Maps") as "Combined") 
    .rdd.map(r => (r.getAs[String]("Level"), r.getAs[Map[Int, String]]("Combined"))) 
    .collectAsMap() 

ВНИМАНИЕ, что это будет работать плохо (или ОАЯ), если на один ключ могут быть тысячи разных значений (значение Level), но так как вы все равно собираете все это в память драйвера, это, вероятно, не будет проблемой, или ваше требование не будет работать независимо.

Смежные вопросы