2014-09-25 3 views
12

Я новичок в Apache Spark, и я знаю, что основная структура данных - это RDD. Теперь я пишу некоторые приложения, для которых требуется информация о позициях элемента. Например, после преобразования ArrayList в RDD (Java) для каждого целого в RDD мне нужно знать его (глобальный) индекс массива. Можно ли это сделать?Как получить положение элемента в RDD Spark?

Как я знаю, существует функция take (int) для RDD, поэтому я считаю, что позиционная информация сохраняется в RDD.

ответ

11

По существу, метод zipWithIndex() RDD, похоже, делает это, но он не сохранит первоначальный порядок данных, из которых был создан RDD. По крайней мере, вы получите стабильный заказ.

val orig: RDD[String] = ... 
val indexed: RDD[(String, Long)] = orig.zipWithIndex() 

Причина вы вряд ли найдете что-то, что сохраняет порядок в исходных данных, утопает в API док для zipWithIndex():

«Молнии это РДД с его индексами элементов. Сначала упорядочение основано на на индексе раздела, а затем на упорядочении позиций в каждом разделе . Таким образом, первый элемент в первом разделе получает индекс 0, а последний элемент последнего раздела получает наибольший индекс. Этот номер похож на zipWithIndex от Scala, но использует Long вместо In t как тип индекса. Этот метод требует, чтобы вызвать искру работу, когда это РДД содержит более одного раздел.»

Так выглядит исходный порядок отбрасывается. Если сохранение первоначального порядка является важным для вас, это выглядит, как вам нужно добавить индекс перед тем вы создаете RDD.

+0

Да, добавление индекса массива в качестве дополнительного атрибута перед созданием RDD может решить эту проблему. Однако существуют два серьезных ограничения: 1) Очевидно, что этот дополнительный атрибут индекса будет по меньшей мере вдвое превышать стоимость хранения, и такая стоимость может быть еще больше, например, в массиве integer/float, для индекса добавляется длинное поле int. 2) Поскольку добавление дополнительных значений индекса не может быть загружено в Spark, такое преобразование данных также не может быть распараллелено Spark. Таким образом, я должен использовать другие параллельные методы для добавления индекса. – SciPioneer

14

Я считаю, в большинстве случаев, zipWithIndex() будет делать трюк, и он будет сохранять порядок. перечитал комментарии. Я понимаю, что это точно означает держать порядок в RDD.

scala> val r1 = sc.parallelize(List("a", "b", "c", "d", "e", "f", "g"), 3) 
scala> val r2 = r1.zipWithIndex 
scala> r2.foreach(println) 
(c,2) 
(d,3) 
(e,4) 
(f,5) 
(g,6) 
(a,0) 
(b,1) 

Вышеприведённый пример. Красный имеет 3 раздела и a с индексом 0, b с индексом 1 и т. Д.

+0

Спасибо за ваш ответ! В большинстве случаев этот метод не является плохим, поскольку элемент во входном массиве/списке может быть относительно большим объектом. Тем не менее, это может быть проблемой для массивов примитивного типа, например, целочисленного массива, поскольку это, казалось бы, единственное решение довольно неэффективно с точки зрения затрат на вычисления и хранения. Во всяком случае, я очень доволен вашим ответом. Я надеюсь, что когда-нибудь, естественно, поддержание индекса без (zipWithIndex) может стать истинным для RDD Spark. – SciPioneer

+0

Основываясь на дизайне Spark, я не могу изобразить хороший способ поддерживать индекс элемента без ущерба для хранилища. –