2015-11-10 4 views
0

Я хочу создать по существу sumproduct по столбцам в Spark DataFrame. У меня есть DataFrame, который выглядит следующим образом:SumProduct в Spark DataFrame

id val1 val2 val3 val4 
123 10  5  7  5 

Я также есть карта, которая выглядит как:

val coefficents = Map("val1" -> 1, "val2" -> 2, "val3" -> 3, "val4" -> 4) 

Я хочу взять значение в каждом столбце DataFrame, умножить его на соответствующий дорожим с карты, и возвращает результат в новый столбец так, по существу:

(10*1) + (5*2) + (7*3) + (5*4) = 61 

Я попытался это:

val myDF1 = myDF.withColumn("mySum", {var a:Double = 0.0; for ((k,v) <- coefficients) a + (col(k).cast(DoubleType)*coefficients(k));a}) 

но получил ошибку при перегрузке метода «+». Даже если я это решила, я не уверен, что это сработает. Есть идеи? Я всегда мог динамически строить SQL-запрос как текстовую строку и делать это так, но я надеялся на что-то более красноречивое.

Любые идеи приветствуются.

ответ

2

Проблема с кодом является то, что вы пытаетесь добавить Column к Double. cast(DoubleType) влияет только на тип хранимого значения, а не на тип самого столбца. Поскольку Double не предоставляет метод *(x: org.apache.spark.sql.Column): org.apache.spark.sql.Column, все не удается.

Чтобы сделать его работу вы можете, например, сделать что-то вроде этого:

import org.apache.spark.sql.Column 
import org.apache.spark.sql.functions.{col, lit} 

val df = sc.parallelize(Seq(
    (123, 10, 5, 7, 5), (456, 1, 1, 1, 1) 
)).toDF("k", "val1", "val2", "val3", "val4") 

val coefficients = Map("val1" -> 1, "val2" -> 2, "val3" -> 3, "val4" -> 4) 

val dotProduct: Column = coefficients 
    // To be explicit you can replace 
    // col(k) * v with col(k) * lit(v) 
    // but it is not required here 
    // since we use * f Column.* method not Int.* 
    .map{ case (k, v) => col(k) * v } // * -> Column.* 
    .reduce(_ + _) // + -> Column.+ 

df.withColumn("mySum", dotProduct).show 
// +---+----+----+----+----+-----+ 
// | k|val1|val2|val3|val4|mySum| 
// +---+----+----+----+----+-----+ 
// |123| 10| 5| 7| 5| 61| 
// |456| 1| 1| 1| 1| 10| 
// +---+----+----+----+----+-----+ 
2

Похоже, вопрос в том, что вы на самом деле не делают ничего с a

for((k, v) <- coefficients) a + ... 

Вы, вероятно, имел в виду a += ...


Кроме того, некоторые советы для очистки блока кода внутри withColumn звонок:

Вам не нужно звонить coefficients(k), потому что у вас уже есть значение в v от for((k,v) <- coefficients)

Scala довольно хорошо сделать остротами, но это своего рода обман, если вы должны поставить точку с запятой в этой одной строке: P Я хотел бы предложить, разбивая секцию вычисления суммы в одну строку в выражении ,

Выражение суммы может быть переписано в виде, который позволяет избежать fold с использованием var (идиоматична Скала обычно избегает var с), например,

import org.apache.spark.sql.functions.lit 

coefficients.foldLeft(lit(0.0)){ 
    case (sumSoFar, (k,v)) => col(k).cast(DoubleType) * v + sumSoFar 
} 
0

Я не уверен, если это возможно через DataFrame API, так как вы только способны работать с колоннами и не все предопределенные затворы (например, картой параметров).

я изложил способ ниже, используя основную RDD из DataFrame:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 

// Initializing your input example. 
val df1 = sc.parallelize(Seq((123, 10, 5, 7, 5))).toDF("id", "val1", "val2", "val3", "val4") 

// Return column names as an array 
val names = df1.columns 

// Grab underlying RDD and zip elements with column names 
val rdd1 = df1.rdd.map(row => (0 until row.length).map(row.getInt(_)).zip(names)) 

// Tack on accumulated total to the existing row 
val rdd2 = rdd0.map { seq => Row.fromSeq(seq.map(_._1) :+ seq.map { case (value: Int, name: String) => value * coefficents.getOrElse(name, 0) }.sum) } 

// Create output schema (with total) 
val totalSchema = StructType(df1.schema.fields :+ StructField("total", IntegerType)) 

// Apply schema to create output dataframe 
val df2 = sqlContext.createDataFrame(rdd1, totalSchema) 

// Show output: 
df2.show() 
... 
+---+----+----+----+----+-----+ 
| id|val1|val2|val3|val4|total| 
+---+----+----+----+----+-----+ 
|123| 10| 5| 7| 5| 61| 
+---+----+----+----+----+-----+