2015-11-11 2 views
1

Предположим, у меня есть один RDD из Tuple2, как показано ниже:Как объединить два RDD с разными ключами в java Spark?

<session1_w1, <0.2, 2>>, 
<session1_w2, <1.3, 4>>, 
<session1_w3, <0.4, 3>>, 
<session2_w1, <0.5, 2>>, 
<session2_w2, <2.3, 6>> 

Мне нужно отобразить его на следующей РДУ, таким образом, что последнее поле является суммирование последних полей кортежей с тем же значением ключа частичного например session1:

2 + 4 + 3 => 9 
2 + 6 => 8 

Так Результат, который я ожидаю:

<session1_w1, 0.2, 9>, 
<session1_w2, 1.3, 9>, 
<session1_w3, 0.4, 9>, 
<session2_w1, <0.5, 8>>, 
<session2_w2, <2.3, 8>> 

Это своего рода сокращения, но я не хочу потерять оригинальные ключи.

Я могу рассчитать суммирование путем сопоставления, а затем сводя к следующему RDD, но тогда мне нужно объединить этот RDD с первым RDD, чтобы получить результат.

<session1, 9> <session2, 8> 

Есть идеи?

ответ

2

используется groupBy, сохраняющая структуру вашего РДА (но не сохраняет порядок, так что если вы хотите сохранить порядок, вы должны zipWithIndex и позже sortBy индекс).

В противном случае, если у вас есть RDD[(String,(Double,Int))]:

// This should give you an RDD[(Iterative(String,Double),Int)] 
val group = myRDD.groupBy(_._1).map(x => (x._2.map(y => (y._1,y._2._1)), 
              x._2.map(y => y._2._2).reduce(_+_))) 

// This will give you back your RDD of [Summed Int, String, Double] which you can then map. 
val result = group.map(x => (x._2,x._1)).flatMapValues(x => x) 

Вы также можете сделать простой reduceByKey (без Double), а затем присоединиться к его обратно к первоначальному РДУ, так что оригинальные Парный сохраняются.

========== EDIT ============

Второе решение присоединиться просто использует РДД присоединиться. У вас есть исходный RDD в формате RDD[(String,(Double,Int))], и я предполагаю, что вы уже получили свой RDD [(String,Int)], где String - сессия, Int - это сумма. Операция соединения является просто:

RDDOriginal.join(RDDwithSum).map(x=>(x._1,x._2._1._1,x._2._2)) // This should give you the Session (String) followed by the Double and the Int (the sum). 

метод соединится и не сохраняет порядок, если вы хотите сохранить заказ, вы должны сделать zipWithIndex.

+0

Спасибо, но для вашего второго решения, как я могу присоединиться к двум RDD с разными ключами? Например, первый имеет «key = session1_w1», а второй имеет «key = session1» –

+0

@Mashaye. Каково ваше определение для 'join'? –

+0

Обычно соединение выполняется между Rdds с одним и тем же набором ключей. У моих RDD есть разные ключи, как описано в вопросе. Во всяком случае, я использовал второе решение, которое предоставил GameOfThrows, и это сработало. –

Смежные вопросы