2016-04-11 3 views

ответ

7

Попробуйте это:

>>> from pyspark.statcounter import StatCounter 
>>> 
>>> rdd = sc.parallelize([9, -1, 0, 99, 0, -10]) 
>>> stats = rdd.aggregate(StatCounter(), StatCounter.merge, StatCounter.mergeStats) 
>>> stats.minValue, stats.maxValue 
(-10.0, 99.0) 
2

Вот еще работает безвкусное решение с использованием аккумуляторов. Неэлегантность заключается в том, что вы должны определить нулевые/начальные значения перед началом работы, чтобы они не мешали данным:

from pyspark.accumulators import AccumulatorParam 
class MinMaxAccumulatorParam(AccumulatorParam): 
    def zero(self, value): 
     return value 
    def addInPlace(self, val1, val2): 
     return(min(val1[0],val2[0]), max(val1[1],val2[1])) 

minmaxAccu = sc.accumulator([500,-500], MinMaxAccumulatorParam()) 

def g(x): 
    global minmaxAccu 
    minmaxAccu += (x,x) 

rdd = sc.parallelize([1, 2, 3, 4, 5]) 

rdd.foreach(g) 

In [149]: minmaxAccu.value 
Out[149]: (1, 5) 
Смежные вопросы