2015-05-19 3 views
5

Я пробовал несколько разных сценариев, чтобы попытаться использовать DataFrames от Spark для обработки таких вещей, как sciPy kurtosis или numpy std. Вот пример кода, но он просто зависает на наборе данных 10x10 (10 строк с 10 столбцами). Я пробовал:pySpark DataFrames Функции агрегации с SciPy

print df.groupBy().agg(kurtosis(df.offer_id)).collect() 

print df.agg(kurtosis(df.offer_ID)).collect() 

Но это не работает без проблем:

print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect() 

Моя догадка, потому что F является: from pyspark.sql import functions as F функция SQL программируются Как я использую dataframes делать такие вещи, как эксцесс. на наборе данных?

Это также просто висит:

print df.map(kurtosis(df.offer_id)).collect() 

ответ

2

К сожалению Спарк текущей UDF поддержки SQL в Python для UDF, немного не хватает. Я искал попытку добавить некоторые UDF в Scala и вызывать их из Python для проекта, над которым я работаю, поэтому я сделал быстрое доказательство концепции, используя эксцесс, как UDAF для реализации. Филиал в настоящее время живет в https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support

первый шаг определения нашего UDAF в Scala - это, вероятно, меньше, чем идеал, но вот реализация:

object functions { 
    def kurtosis(e: Column): Column = new Column(Kurtosis(EvilSqlTools.getExpr(e))) 
} 

case class Kurtosis(child: Expression) extends AggregateExpression { 
    def this() = this(null) 

    override def children = child :: Nil 
    override def nullable: Boolean = true 
    override def dataType: DataType = DoubleType 
    override def toString: String = s"Kurtosis($child)" 
    override def newInstance() = new KurtosisFunction(child, this) 
} 

case class KurtosisFunction(child: Expression, base: AggregateExpression) extends AggregateFunction { 
    def this() = this(null, null) 

    var data = scala.collection.mutable.ArrayBuffer.empty[Any] 
    override def update(input: Row): Unit = { 
    data += child.eval(input) 
    } 

    // This function seems shaaady 
    // TODO: Do something more reasonable 
    private def toDouble(x: Any): Double = { 
    x match { 
     case x: NumericType => EvilSqlTools.toDouble(x.asInstanceOf[NumericType]) 
     case x: Long => x.toDouble 
     case x: Int => x.toDouble 
     case x: Double => x 
    } 
    } 
    override def eval(input: Row): Any = { 
    if (data.isEmpty) { 
     println("No data???") 
     null 
    } else { 
     val inputAsDoubles = data.toList.map(toDouble) 
     println("computing on input "+inputAsDoubles) 
     val inputArray = inputAsDoubles.toArray 
     val apacheKurtosis = new ApacheKurtosis() 
     val result = apacheKurtosis.evaluate(inputArray, 0, inputArray.size) 
     println("result "+result) 
     Cast(Literal(result), DoubleType).eval(null) 
    } 
    } 
} 

Затем мы можем использовать подобную логику, используемый в Спарк в SQL functions.py реализация:

"""Our magic extend functions. Here lies dragons and a sleepy holden.""" 
from py4j.java_collections import ListConverter 

from pyspark import SparkContext 
from pyspark.sql.dataframe import Column, _to_java_column 

__all__ = [] 
def _create_function(name, doc=""): 
    """ Create a function for aggregator by name""" 
    def _(col): 
     sc = SparkContext._active_spark_context 
     jc = getattr(sc._jvm.com.sparklingpandas.functions, name)(col._jc if isinstance(col, Column) else col) 
     return Column(jc) 
    _.__name__ = name 
    _.__doc__ = doc 
    return _ 

_functions = { 
    'kurtosis': 'Calculate the kurtosis, maybe!', 
} 


for _name, _doc in _functions.items(): 
    globals()[_name] = _create_function(_name, _doc) 
del _name, _doc 
__all__ += _functions.keys() 
__all__.sort() 

и тогда мы можем идти вперед и назвать его как UDAF например так:

from sparklingpandas.custom_functions import * 
import random 
input = range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6) 
df1 = sqlContext.createDataFrame(sc.parallelize(input)\ 
            .map(lambda i: Row(single=i, rand= random.randint(0,100000)))) 
df1.collect() 
import pyspark.sql.functions as F 
x = df1.groupBy(df1.single).agg(F.min(df1.rand)) 
x.collect() 
j = df1.groupBy(df1.single).agg(kurtosis(df1.rand)) 
j.collect() 
+0

Я не думаю, что решение UDF работает, потому что я делаю следующее: kert = udf (lambda x: kurtosis (x), FloatType()) print df.select (kert (df.offer_id)). Collect () не работает, потому что он передается в каждом значении отдельно. Вы не можете сделать с ним .agg, поэтому я пытаюсь думать о другом. – theMadKing

+1

Thats true, я на самом деле работаю на Sparkling Pandas в качестве побочного проекта, и этот вид меня заинтересовал, поэтому я начал работу по реализации этой поддержки. Я уточню свой ответ, чтобы получить детали. – Holden

+0

Обновлено (его много кода в основном потому, что нам нужно сделать некоторые вещи на стороне Scala + стороне Python). – Holden

Смежные вопросы