2016-12-08 3 views
4

В следующем коде даны данные, имеющие три значения в каждом столбце, как показано ниже.Суммировать расстояние в информационных фреймах Apache-Spark

import org.graphframes._ 
    import org.apache.spark.sql.DataFrame 
    val v = sqlContext.createDataFrame(List(
     ("1", "Al"), 
     ("2", "B"), 
     ("3", "C"), 
     ("4", "D"), 
     ("5", "E") 
    )).toDF("id", "name") 

    val e = sqlContext.createDataFrame(List(
     ("1", "3", 5), 
     ("1", "2", 8), 
     ("2", "3", 6), 
     ("2", "4", 7), 
     ("2", "1", 8), 
     ("3", "1", 5), 
     ("3", "2", 6), 
     ("4", "2", 7), 
     ("4", "5", 8), 
     ("5", "4", 8) 
    )).toDF("src", "dst", "property") 
val g = GraphFrame(v, e) 
val paths: DataFrame = g.bfs.fromExpr("id = '1'").toExpr("id = '5'").run() 
paths.show() 
val df=paths 
df.select(df.columns.filter(_.startsWith("e")).map(df(_)) : _*).show 

Выходной сигнал выше код приведен ниже ::

+-------+-------+-------+              
    |  e0|  e1|  e2| 
    +-------+-------+-------+ 
    |[1,2,8]|[2,4,7]|[4,5,8]| 
    +-------+-------+-------+ 

В выводе выше, мы можем видеть, что каждый столбец имеет три значения, и их можно интерпретировать следующим образом.

e0 : 
source 1, Destination 2 and distance 8 

e1: 
source 2, Destination 4 and distance 7 

e2: 
source 4, Destination 5 and distance 8 

e0 в основном, e1 и e3 являются края. Я хочу суммировать третий элемент каждого столбца, т. Е. Добавить расстояние каждого края, чтобы получить общее расстояние. Как я могу это достичь?

ответ

5

Это можно сделать так:

val total = df.columns.filter(_.startsWith("e")) 
.map(c => col(s"$c.property")) // or col(c).getItem("property") 
.reduce(_ + _) 

df.withColumn("total", total) 
+2

ли '.property' предназначены для быть общим заполнителем для элемента столбца, к которому вы пытаетесь получить доступ? –

+2

@ evan058 Столбцы OP пытается получить доступ к краям графт-рам. Они представлены как структуры с тремя полями ('src',' dst', 'property'). Так что это элемент столбца. –

2

Я хотел бы сделать коллекцию столбцов просуммировать, а затем использовать foldLeft на UDF:

scala> val df = Seq((Array(1,2,8),Array(2,4,7),Array(4,5,8))).toDF("e0", "e1", "e2") 
df: org.apache.spark.sql.DataFrame = [e0: array<int>, e1: array<int>, e2: array<int>] 

scala> df.show 
+---------+---------+---------+ 
|  e0|  e1|  e2| 
+---------+---------+---------+ 
|[1, 2, 8]|[2, 4, 7]|[4, 5, 8]| 
+---------+---------+---------+ 

scala> val colsToSum = df.columns 
colsToSum: Array[String] = Array(e0, e1, e2) 

scala> val accLastUDF = udf((acc: Int, col: Seq[Int]) => acc + col.last) 
accLastUDF: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function2>,IntegerType,List(IntegerType, ArrayType(IntegerType,false))) 

scala> df.withColumn("dist", colsToSum.foldLeft(lit(0))((acc, colName) => accLastUDF(acc, col(colName)))).show 
+---------+---------+---------+----+ 
|  e0|  e1|  e2|dist| 
+---------+---------+---------+----+ 
|[1, 2, 8]|[2, 4, 7]|[4, 5, 8]| 23| 
+---------+---------+---------+----+ 
Смежные вопросы