2015-08-05 4 views
2

Мой входных наборов данных выглядитreduceBykey Спарк поддержания порядка

id1, 10, v1 
id2, 9, v2 
id2, 34, v3 
id1, 6, v4 
id1, 12, v5 
id2, 2, v6 

и я хочу выход

id1; 6,v4 | 10,v1 | 12,v5 
id2; 2,v6 | 9,v2 | 34,v3 

Это такое, что

id1: array[num(i),value(i)] where num(i) should be sorted 

То, что я пробовал:

  • Получить идентификатор и 2-й столбец в качестве ключа, sortByKey, но так как это строка, сортировки не бывает, как межды, но как строка

  • Получить 2-й столбец в качестве ключа, sortByKey, а затем получить идентификатор и ключ и 2 колонка в стоимостном выражении, reduceByKey. Но в этом случае, делая reduceByKey; порядок не сохраняется. Даже groupByKey не мешает заказу. На самом деле это ожидается.

Любая помощь будет оценена по достоинству.

ответ

6

Поскольку вы не предоставили никакой информации о типе ввода я предполагаю, что это RDD[(String, Int, String)]:

val rdd = sc.parallelize(
    ("id1", 10, "v1") :: ("id2", 9, "v2") :: 
    ("id2", 34, "v3") :: ("id1", 6, "v4") :: 
    ("id1", 12, "v5") :: ("id2", 2, "v6") :: Nil) 

rdd 
    .map{case (id, x, y) => (id, (x, y))} 
    .groupByKey 
    .mapValues(iter => iter.toList.sortBy(_._1)) 
    .sortByKey() // Optional if you want id1 before id2 

Edit:

Чтобы получить вывод, который вы описали in the comments вы можете заменить функция, переданная до mapValues с чем-то вроде этого:

def process(iter: Iterable[(Int, String)]): String = { 
    iter.toList 
     .sortBy(_._1) 
     .map{case (x, y) => s"$x,$y"} 
     .mkString("|") 
} 
+0

Большое спасибо @ zero323 – user2200660

+0

Большое спасибо @ zero323. Результатом rdd является «» RDD [(String, List [(Int, String)])] «« Можете ли вы также рассказать мне, как преобразовать этот RDD в «RDD [(String, String)]». Таким образом, в основном ваш вывод «" (id1, List ((6, v4), (10, v1), (12, v5))) ", но мне нужно" "(id1; 6, v4 | 10, v1 | 12, v5) "" – user2200660

+0

Несомненно, проверьте изменения. – zero323