2017-01-16 6 views
7

Есть ли эквивалент функции Pandas Melt в Apache Spark в PySpark или, по крайней мере, в Scala?Как расплавить Spark DataFrame?

Я запускал образец данных до сих пор в python, и теперь я хочу использовать Spark для всего набора данных.

Заранее спасибо.

+0

Проверьте это: http://chappers.github.io/web%20micro%20log/2016/03/07/implementing-simple-melt-function-for-pyspark/ – MYGz

+0

Извините за отложенный ответ ... Ошибка выполнения задания даже для небольшого набора данных образца (rdd), созданного с помощью rdd = sc.parallelize ([("x", 1,4), ("y", 3,5), ("z", 2 , 6)]) –

ответ

12

Нет встроенной функции (если вы работаете с поддержкой SQL и поддержкой Hive, вы можете использовать stack function, но она не отображается в Spark и не имеет встроенной реализации), но тривиально сворачивать свои собственные. Необходимые импорт:

from pyspark.sql.functions import array, col, explode, lit, struct 
from pyspark.sql import DataFrame 
from typing import Iterable 

Пример реализации:

def melt(
     df: DataFrame, 
     id_vars: Iterable[str], value_vars: Iterable[str], 
     var_name: str="variable", value_name: str="value") -> DataFrame: 
    """Convert :class:`DataFrame` from wide to long format.""" 

    # Create array<struct<variable: str, value: ...>> 
    _vars_and_vals = array(*(
     struct(lit(c).alias(var_name), col(c).alias(value_name)) 
     for c in value_vars)) 

    # Add to the DataFrame and explode 
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

    cols = id_vars + [ 
      col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]] 
    return _tmp.select(*cols) 

И некоторые тесты (основанные на Pandas doctests):

import pandas as pd 

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'}, 
        'B': {0: 1, 1: 3, 2: 5}, 
        'C': {0: 2, 1: 4, 2: 6}}) 

pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C']) 
A variable value 
0 a  B  1 
1 b  B  3 
2 c  B  5 
3 a  C  2 
4 b  C  4 
5 c  C  6 
sdf = spark.createDataFrame(pdf) 
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show() 
+---+--------+-----+ 
| A|variable|value| 
+---+--------+-----+ 
| a|  B| 1| 
| a|  C| 2| 
| b|  B| 3| 
| b|  C| 4| 
| c|  B| 5| 
| c|  C| 6| 
+---+--------+-----+ 

Примечание. Для использования с устаревшими версиями Python удалите аннотации типов.

3

Пошел через этот вопрос в моем поиске реализации расплава в Spark для scala. Проводка моего порта Scala в случае, если кто-то тоже наткнется на это.

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.{DataFrame} 
/** Extends the [[org.apache.spark.sql.DataFrame]] class 
* 
* @param df the data frame to melt 
*/ 
implicit class DataFrameFunctions(df: DataFrame) { 

    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format. 
    * 
    * melt is (kind of) the inverse of pivot 
    * melt is currently (02/2017) not implemented in spark 
    * 
    * @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html) 
    * @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark 
    * 
    * @todo method overloading for simple calling 
    * 
    * @param id_vars the columns to preserve 
    * @param value_vars the columns to melt 
    * @param var_name the name for the column holding the melted columns names 
    * @param value_name the name for the column holding the values of the melted columns 
    * 
    */ 

    def melt(
      id_vars: Seq[String], value_vars: Seq[String], 
      var_name: String = "variable", value_name: String = "value") : DataFrame = { 

     // Create array<struct<variable: str, value: ...>> 
     val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*) 

     // Add to the DataFrame and explode 
     val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals)) 

     val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }} 

     return _tmp.select(cols: _*) 

    } 
} 

Поскольку я не настолько продвинутый, учитывая scala, я уверен, что есть место для улучшения. Любые комментарии приветствуются.