2017-01-24 1 views
1

Мне в настоящее время нужно случайным образом отсортировать элементы в RDD в Spark для k элементов. Я заметил, что существует метод takeSample. Подпись метода выглядит следующим образом.Есть ли способ попробовать Spark RDD для определенного количества элементов вместо процента?

takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] 

Однако это не возвращает RDD. Существует еще один метод выборки, который возвращает RDD, sample.

sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] 

Я не хочу, чтобы использовать первый метод takeSample, потому что он не возвращает RDD и потянет значительное количество данных обратно в программу драйвера (проблемы с памятью). Я пошел вперед и использовал метод sample, но мне пришлось вычислить fraction (в процентах) следующим образом.

val rdd = sc.textFile("some/path") //creates the rdd 
val N = rdd.count() //total items in the rdd 
val fraction = k/N.toDouble 
val sampledRdd = rdd.sample(false, fraction, 67L) 

Проблема с этим подходом/метода является то, что я не могу быть в состоянии получить RDD ровно K элементов. Например, если мы предположим, N = 10, то

  • к = 2, фракция = 20%, отбирали пробы пунктов = 2
  • к = 3, фракция = 30%, отбирали пробы пунктов = 3
  • и так далее

Но при N = 11, то

  • к = 2, фракция = 18,1818%, отбирали пробы элементов =?
  • k = 3, fraction = 27.2727%, выборочные элементы =?

В последнем примере для fraction = 18.1818%, сколько элементов будет в результате RDD?

Кроме того, это то, что говорит documentation о аргументе фракции.

 
expected size of the sample as a fraction of this RDD's size 
- without replacement: probability that each element is chosen; fraction must be [0, 1] 
- with replacement: expected number of times each element is chosen; fraction must be greater than or equal to 0 

Поскольку я выбрал without replacement, мне кажется, что моя фракция должна быть вычислена следующим образом. Обратите внимание, что каждый элемент имеет равную вероятность для выбора (что я пытаюсь выразить).

val N = rdd.count() 
val fraction = 1/N.toDouble 
val sampleRdd = rdd.sample(false, fraction, 67L) 

Таким образом, это k/N или 1/N? Кажется, что документация указывает во всех разных направлениях с размером выборки и вероятностью выборки.

И, наконец, примечания к документации.

Это НЕ гарантируется, что обеспечивает точно долю от количества данного RDD.

Который, а затем возвращает меня к моему первоначальному вопросу/проблеме: если API RDD не гарантирует выборку точно k элементов из RDD, как мы эффективно это делаем?

Как я писал этот пост, я обнаружил, что уже есть another SO post, задавая почти тот же вопрос. Я нашел приемлемый ответ неприемлемым.Здесь я также хотел уточнить аргумент фракции.

Интересно, есть ли способ сделать это с помощью наборов данных и DataFrames?

ответ

1

Это решение не так красиво, но я надеюсь, что было бы полезно подумать. Трюк использует дополнительный балл и получает k-й по величине счет в качестве порога.

val k = 100 
val rdd = sc.parallelize(0 until 1000) 
val rddWithScore = rdd.map((_, Math.random)) 
rddWithScore.cache() 
val threshold = rddWithScore.map(_._2) 
    .sortBy(t => t) 
    .zipWithIndex() 
    .filter(_._2 == k) 
    .collect() 
    .head._1 
val rddSample = rddWithScore.filter(_._2 < threshold).map(_._1) 
rddSample.count() 

Выходной сигнал будет

k: Int = 100 
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[58] at parallelize at <console>:31 
rddWithScore: org.apache.spark.rdd.RDD[(Int, Double)] = MapPartitionsRDD[59] at map at <console>:32 
threshold: Double = 0.1180443408900893 
rddSample: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[69] at map at <console>:40 
res10: Long = 100 
Смежные вопросы