2016-04-28 3 views
0

У меня есть два набора данных, как показано ниже. Каждый набор данных имеет «,» разделенные номера в каждой строке.Замена значений RDD на другое

Dataset 1

1,2,0,8,0

2,0,9,0,3

Dataset 2

7, 5,4,6,3

4,9,2,1,8

Я должен заменить нули первого набора данных с соответствующими значениями из набора данных 2.

Так что результат будет выглядеть следующим образом

1,2,4,8 , 3

2,9,9,1,3

Я заменил значения с кодом ниже.

val rdd1 = sc.textFile(dataset1).flatMap(l => l.split(",")) 
val rdd2 = sc.textFile(dataset2).flatMap(l => l.split(",")) 
val result = rdd1.zip(rdd2).map(x => if(x._1 == "0") x._2 else x._1) 

Выход меня имеет формат РДД [String]. Но мне нужен вывод в формате RDD [Array [String]], так как этот формат был бы более подходящим для моих дальнейших преобразований.

+0

Вы ищете что-то вроде 'валь результат = rdd1.zip (rdd2) .map (х => если (x._1 == "0") Array (x._2) else Array (x._1)) '? –

+0

@AlexisC. Нет. Rdd1 и rdd2 имеют тип RDD [Array [String]]. Итак, x._1 в вашем коде относится к массиву – yAsH

+0

Ну, это не ясно из вашего фрагмента. Вы являетесь плоским отображением после расщепления, в результате получается 'RDD [String]' и выполняется 'x._1 ==" 0 "'; так как 'x._1' может ссылаться на массив? Если вы не хотите 'RDD [Array [String]]' с 2 массивами (по одному для каждой строки)? –

ответ

2

Если вы хотите RDD[Array[String]], где каждый элемент массива соответствует строке, не плоские карты значений после расщепления, просто сопоставьте их.

scala> val rdd1 = sc.parallelize(List("1,2,0,8,0", "2,0,9,0,3")).map(l => l.split(",")) 
rdd1: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:27 

scala> val rdd2 = sc.parallelize(List("7,5,4,6,3", "4,9,2,1,8")).map(l => l.split(",")) 
rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[3] at map at <console>:27 

scala> val result = rdd1.zip(rdd2).map{case(arr1, arr2) => arr1.zip(arr2).map{case(v1, v2) => if(v1 == "0") v2 else v1}} 
result: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[5] at map at <console>:31 

scala> result.collect 
res0: Array[Array[String]] = Array(Array(1, 2, 4, 8, 3), Array(2, 9, 9, 1, 3)) 

или, может быть менее многословным:

val result = rdd1.zip(rdd2).map(t => t._1.zip(t._2).map(x => if(x._1 == "0") x._2 else x._1)) 
+0

У меня есть еще одно RDD, у которого есть пороговые значения для полученного выше результата в формате Array (Array (6, 100), Array (5, 100), Array (7, 100), Array (0, 100), Array (- 1, 100)). Как проверить, находятся ли значения в каждом массиве результата RDD между этими пороговыми значениями? – yAsH

Смежные вопросы