2

Предполагая существование РДА кортежей подобного следующему:Spark RDD: как наиболее эффективно вычислять статистику?

(key1, 1) 
(key3, 9) 
(key2, 3) 
(key1, 4) 
(key1, 5) 
(key3, 2) 
(key2, 7) 
... 

Что является наиболее эффективным (и, в идеале, распространяемый) способ вычисления статистики, соответствующую каждый ключ? (На данный момент я ищу, чтобы вычислить стандартное отклонение/отклонение, в частности). Как я понимаю, мои варианты составляют:

  1. Используйте colStats function in MLLib: Этот подход имеет преимущество легко адаптируемой использовать другие функции mllib.stat позже, если другие статистические вычисления считаются необходимыми. Однако он работает на RDD Vector, содержащий данные для каждого столбца, поэтому, как я понимаю, для этого подхода потребуется, чтобы полный набор значений для каждого ключа был собран на одном узле, что казалось бы не идеальным для больших наборы данных. Является ли Spark Vector всегда подразумевать, что данные в Vector резидентны локально, на одном узле?
  2. Выполните a groupByKey, затем stats: Скорее всего, тасование тяжелое, as a result of the groupByKey operation.
  3. Выполните aggregateByKey, инициализации нового StatCounter и используя StatCounter::merge как функции последовательности и объединителя: Это подход recommended by this StackOverflow answer, и избегает groupByKey от варианта 2. Тем не менее, я не смог найти хорошую документацию для StatCounter в PySpark.

мне нравится Вариант 1, потому что это делает код более расширяемым в том, что он легко может вместить более сложные расчеты с использованием других функций MLLib с подобными контрактами, но если Vector входов по своей природе требуют, чтобы наборы данных быть собраны локально, то он ограничивает размеры данных, на которых код может эффективно работать. Между двумя другими, Вариант 3 выглядит более эффективным, потому что он избегает groupByKey, но я надеялся подтвердить, что это так.

Есть ли другие варианты, которые я не рассматривал? (В настоящее время я использую Python + PySpark, но я открыт для решений в Java/Scala, если есть языковая разница.)

+0

Возможный дубликат [поиск мин/макс с помощью pyspark за один проход по данным] (http://stackoverflow.com/questions/36559809/finding-min-max-with-pyspark-in-single-pass-over -данные) –

ответ

2

Вы можете попробовать reduceByKey. Это довольно просто, если мы хотим вычислить min():

rdd.reduceByKey(lambda x,y: min(x,y)).collect() 
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)] 

Для расчета mean, вам сначала нужно создать (value, 1) кортежи, которые мы используем для расчета как sum и count в reduceByKey операции.Наконец, мы разделим их друг с другом, чтобы прибыть в mean:

meanRDD = (rdd 
      .mapValues(lambda x: (x, 1)) 
      .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) 
      .mapValues(lambda x: x[0]/x[1])) 

meanRDD.collect() 
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)] 

Для variance, вы можете использовать формулу (sumOfSquares/count) - (sum/count)^2, который мы переводим следующим образом:

varRDD = (rdd 
      .mapValues(lambda x: (1, x, x*x)) 
      .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2])) 
      .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2))) 

varRDD.collect() 
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)] 

я использовал значения типа double вместо int в манекене данные для точной иллюстрации вычислительных средних и дисперсии:

rdd = sc.parallelize([("key1", 1.0), 
         ("key3", 9.0), 
         ("key2", 3.0), 
         ("key1", 4.0), 
         ("key1", 5.0), 
         ("key3", 2.0), 
         ("key2", 7.0)]) 
Смежные вопросы