2016-12-03 3 views
0
val r1 = sc.parallelize((1 to 100 by 1)).map(i => (i, i % 10)) 
val r2 = for(i <- 0 to 9) yield r1.filter(_._2 == i).repartition(2) 

val tt = r2.map {r => 
    println(s"partitions ${r.getNumPartitions}") 
    val t = r.mapPartitionsWithIndex((i, p) => { 
     val len = p.toList.size 
     p.map(j => (j._1, j._2, i, len)) 
    }) 
    t 
}.reduce(_ union _) 
println (s"total ${tt.count}") 
println(tt.collect().mkString("\n")) 

Мое ожидание, что внешняя два println будет производить total 100 и печать кортежи в tt РДУ, как (10, 0, 1, 5) или так. Но выполнение этого в искровой оболочке дает только total 0. Почему tt RDD пуст. Меня это смущает. Обратите внимание, что внутренний println печатает partitions 2 для каждого RDD в r2. Я использую Spark 1.6.1Неожиданный выход из mapPartitionsWithIndex

ответ

2

Ваша проблема здесь в вашей лямбда-функции - вы потребляете итератор разделов, преобразовывая его в список, поэтому он пуст, когда вы вызываете p.map(j => (j._1, j._2, i, len))!

Вот как вы можете это исправить:

val t = r.mapPartitionsWithIndex((i, p) => { 
    val elements = p.toArray 
    val len = elements.length 
    elements.iterator.map(j => (j._1, j._2, i, len)) 
}) 
Смежные вопросы