2015-04-23 3 views
2

Если у меня есть RDD из ("a","b","c")Spark как сгруппировать по произвольному количеству ключей?

и генератор ключей является чем-то вроде

def keygen(x:String) = x match { 
    case "a" => Seq("x","y") 
    case "b" => Seq("x") 
    case "c" => Seq() 
} 

Как получить ключ-значение RDD из ("x"->Seq("a","b"),"y"->Seq("b"))

мой способ сделать это.

val sample = sc.parallelize(Seq("a", "b", "c")) 

def keygen(x: String) = x match { 
    case "a" => Seq("x", "y") 
    case "b" => Seq("x") 
    case "c" => Seq() 
} 
val sampleWithKey = sample.flatMap(x => keygen(x).map(y => (y, x))).groupBy(_._1).mapValues(_.map(_._2)) 
val result = sampleWithKey.collect() 
println("result: ", result.mkString("(", ",", ")")) 

получить (x,List(a, b)),(y,List(a))

+0

Ну ... ваш '' keygen' генерирует Seq [String] 'поэтому я не думаю, что вы хотите' RDD', где предполагаемые 'ключи' являются' String'. Также ... ваши 'RDD' должны быть' TypeSafe', поэтому вы не можете иметь 'RDD' с непоследовательным типом, например' ("x" -> ("a", "b"), "y" -> ("b")) 'который является' [(String, (String, String)), (String, (String))] ' –

+0

@SarveshKumarSingh вопрос редактируется. – Renkai

ответ

0

... Ну это вроде выглядит странно, но вы можете добиться этого, как следует,

def keygen(x:String) = x match { 
    case "a" => Seq("x","y") 
    case "b" => Seq("x") 
    case "c" => Seq("Empty") 
} 


val stringRdd = s.parallelize(List("a", "b", "c")) 
// RDD[ "a", "b", "c" ] 

val keyedRdd = stringRdd.map(string => (keygen(string), string)) 
// RDD[ (Seq("x", "y"), a), (Seq("x"), "b"), (Seq("Empty"), "c") ] 

val keyFlatRdd = keyedRdd 
    .flatMap({ case (keySeq, string) => keySeq.map(key => (key, string)) }) 
    .filter({ case (key, string) => !key.equalsIgnoreCase("Empty") }) 
// RDD[ ("x", "a"), ("y", "a"), ("x", "b") ] 

val finalRdd = keyFlatRdd 
    .groupBy({ case(key, string) => key } 
    .map({ case (key, seq) => (key, seq.map(_._2)) }) 
// RDD[ ("x", Seq("a", "b")), ("y", Seq("a")) ] 
Смежные вопросы