2015-08-17 6 views
3

У меня есть следующий РДД, который имеет 4 раздела: -mapPartitions возвращает пустой массив

val rdd=sc.parallelize(1 to 20,4) 

Теперь я пытаюсь позвонить mapPartitions на это: -

scala> rdd.mapPartitions(x=> { println(x.size); x }).collect 
5 
5 
5 
5 
res98: Array[Int] = Array() 

Почему он возвращает пустой массив? Функция anonymoys просто возвращает тот же самый итератор, который он получил, а затем как он возвращает пустой массив? Самое интересное в том, что если я удалю Println заявление, он действительно возвращает непустой массив: -

scala> rdd.mapPartitions(x=> { x }).collect 
res101: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) 

Это я не понимаю. Как получилось, что println (который просто печатает размер итератора) влияет на конечный результат функции?

ответ

6

Это потому, что x - это TraversableOnce, что означает, что вы пересекли его, позвонив по телефону size, а затем вернул его обратно.

Вы могли бы работать вокруг него целый ряд способов, но вот один:

rdd.mapPartitions(x=> { 
    val list = x.toList; 
    println(list.size); 
    list.toIterator 
}).collect 
1

Чтобы понять, что происходит, мы должны взглянуть на подпись функции вы передаете в mapPartitions:

(Iterator[T]) ⇒ Iterator[U] 

И что такое Iterator? Если вы посмотрите на Iterator documentation вы увидите, что это черта, которая простирается TraversableOnce:

trait Iterator[+A] extends TraversableOnce[A] 

Выше должен дать вам подсказку, что происходит в вашем случае. Итераторы обеспечивают два метода: hasNext и next. Чтобы получить size Iterator, вам нужно просто перебрать его. После этого hasNext возвращает false, и в результате вы получите пустой Iterator.

Смежные вопросы