2015-07-27 5 views
0

Я использую искру 1.2 с scala и имею пару RDD с (String, String). Образец записи выглядит следующим образом:Удалить дубликаты ключей от Spark Scala

<Key, value> 
id_1, val_1_1; val_1_2 
id_2, val_2_1; val_2_2 
id_3, val_3_1; val_3_2 
id_1, val_4_1; val_4_2 

Я просто хочу, чтобы удалить все записи с дубликатом ключа, поэтому в приведенном выше примере, четвёртая запись будет удалена, поскольку ID_1 является дубликат ключа.

Помощь Pls.

Спасибо.

+1

Где есть дубликаты ключей, как вы решите, какое значение сохранить? – mattinbits

+0

Его просто первое значение, которое мне нужно. – user2200660

+1

Проблема в том, что, когда Spark делает 'reduceByKey', как это предлагается в ответе ниже, вы не можете узнать, какое значение будет выбрано. Нет никакой гарантии, что Spark поддерживает упорядочение строк. Есть ли что-то в значении (например, факт _1_1), который вы можете использовать для дифференциации? – mattinbits

ответ

9

Вы можете использовать reduceByKey:

val rdd: RDD[(K, V)] = // ... 
val res: RDD[(K, V)] = rdd.reduceByKey((v1, v2) => v1) 
+0

Спасибо. Почему это не пришло мне в голову. Это было просто. Благодарю. – user2200660

+0

Рад, что это помогло :) –

1

Если необходимо выбрать всегда первая запись для данного ключа, а затем, комбинируя @JeanLogeart ответ с комментарием от @Paul,

import org.apache.spark.{SparkContext, SparkConf} 

val data = List(
    ("id_1", "val_1_1; val_1_2"), 
    ("id_2", "val_2_1; val_2_2"), 
    ("id_3", "val_3_1; val_3_2"), 
    ("id_1", "val_4_1; val_4_2")) 

val conf = new SparkConf().setMaster("local").setAppName("App") 
val sc = new SparkContext(conf) 
val dataRDD = sc.parallelize(data) 
val resultRDD = dataRDD.zipWithIndex.map{ 
    case ((key, value), index) => (key, (value, index)) 
}.reduceByKey((v1,v2) => if(v1._2 < v2._2) v1 else v2).mapValues(_._1) 
resultRDD.collect().foreach(v => println(v)) 
sc.stop() 
Смежные вопросы