2015-12-17 5 views
0

Итак, у меня есть rdd: Array [String] с именем Adat, и я хочу преобразовать его в цикле и получить новое RDD, которое я могу использовать вне области цикла. Я пробовал это, но результат не то, что я хочу.Преобразование RDD внутри цикла

val sharedA = { 
    for { 
    i <- 0 to shareA.toInt - 1 
    j <- 0 to shareA.toInt - 1 
    } yield { 
    Adat.map(x => (x(1).toInt, i % shareA.toInt, j % shareA.toInt, x(2))) 
    } 
} 

Приведенный выше код преобразует SharedA рдд в IndexedSeq [РДУ [(Int, Int, Int, String)]], и когда я пытаюсь напечатать его результат является:

MapPartitionsRDD[12] at map at planet.scala:99 

MapPartitionsRDD[13] at map at planet.scala:99 and so on. 

Как преобразовать sharedA в RDD[(Int, Int, Int, String)]?

Если я делаю это так, у sharedA есть правильный тип данных, но я не могу использовать его вне области видимости.

 for { i <- 0 to shareA.toInt -1 
     j<-0 to shareA.toInt-1 } 
     yield { 
     val sharedA=Adat.map(x => (x(1).toInt,i % shareA.toInt ,j %  
     shareA.toInt,x(2)))  
     } 
+0

Я не» t понять, что является точкой вашего кода :('i% shareA.toInt',' j% shareA.toInt' должен быть просто равен 'i' и' j' соответственно и является единственным изменяющимся элементом. Не могли бы вы предоставить пример ввода и ожидаемого результата. – zero323

+0

shareA уже вычисляется и рассматривает его как маленькое целое число, но его значение не имеет значения, скажем, что RDD Adat в начале имеет некоторые данные, разделенные запятой, с помощью цикла, который я хочу добавить с картой, больше данных, используя i и j и я хочу получить результат к новому RDD, который я мог бы использовать вне области видимости. –

ответ

0

Я не совсем понимаю ваше описание, но flatMap должен сделать трюк:

val rdd = sc.parallelize(Seq(Array("", "0", "foo"), Array("", "1", "bar"))) 
val n = 2 

val result = rdd.flatMap(xs => for { 
    i <- 0 to n 
    j <- 0 to n 
} yield (xs(1).toInt, i, j, xs(2))) 

result.take(5) 
// Array[(Int, Int, Int, String)] = 
// Array((0,0,0,foo), (0,0,1,foo), (0,0,2,foo), (0,1,0,foo), (0,1,1,foo)) 

Реже подход будет вызывать SparkContext.union по результатам:

val resultViaUnion = sc.union(for { 
    i <- 0 to n 
    j <- 0 to n 
} yield rdd.map(xs => (xs(1).toInt, i, j, xs(2)))) 

resultViaUnion.take(5) 
// Array[(Int, Int, Int, String)] = 
// Array((0,0,0,foo), (1,0,0,bar), (0,0,1,foo), (1,0,1,bar), (0,0,2,foo)) 
+0

Это должно сделать это спасибо за помощь –

Смежные вопросы