2015-11-20 3 views
3

У меня есть DataFrame 66 столбцов для обработки (почти каждое значение столбца должно быть изменено каким-то образом), поэтому я бегу следующее заявлениеСпарк предел Scala 2,10 кортеж

val result = data.map(row=> (
     modify(row.getString(row.fieldIndex("XX"))), 
     (...) 
     ) 
    ) 

до 66-й колонке. Поскольку scala в этой версии имеет ограничение на максимальный набор из 22 пар, я не могу это сделать. Вопрос в том, есть ли обходной путь для этого? После всех операций линии я преобразуя его в ДФ с конкретными именами столбцов

result.toDf("c1",...,"c66") 
    result.storeAsTempTable("someFancyResult") 

«изменить» функция просто пример, чтобы показать мою точку

+1

переключатель в scala 2.11? – Odomontois

+0

Хотелось бы, чтобы это было так просто, но это не так. – Silverrose

+0

@Odomontois AFAIK Scala 2.11 не поддерживает кортежи мощности> 22, т. Е. Нет Tuple23. – moem

ответ

5

Если вы все это изменение значения из существующего DataFrame лучше использовать UDF вместо отображение по RDD:

import org.apache.spark.sql.functions.udf 

val modifyUdf = udf(modify) 
data.withColumn("c1", modifyUdf($"c1")) 

Если по какой-то причине выше не соответствует вашим потребностям самое простое, что вы можете сделать, это воссоздать DataFrame из RDD[Row]. например:

import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructField, StructType, IntegerType} 


val result: RDD[Row] = data.map(row => { 
    val buffer = ArrayBuffer.empty[Any] 

    // Add value to buffer 
    buffer.append(modify(row.getAs[String]("c1"))) 

    // ... repeat for other values 

    // Build row 
    Row.fromSeq(buffer) 
}) 

// Create schema 
val schema = StructType(Seq(
    StructField("c1", StringType, false), 
    // ... 
    StructField("c66", StringType, false) 
)) 

sqlContext.createDataFrame(result, schema) 
+0

Правда, если у него уже есть Dataframe, это более простой маршрут –

+0

@Ewan. Похоже, это так. Это не только самый простой подход, но и гораздо более эффективный. – zero323

+0

Спасибо большое! Это то, что спасает меня :) – Silverrose

1

Обойти это довольно неудобный, но это делает работа, попробуйте этот пример кода, чтобы вы начали, вы можете видеть, что есть более чем 22 колонны осуществляется доступ:

object SimpleApp { 
    class Record(val x1: String, val x2: String, val x3: String, ... val x24:String) extends Product with Serializable { 
    def canEqual(that: Any) = that.isInstanceOf[Record] 

    def productArity = 24 

    def productElement(n: Int) = n match { 
     case 0 => x1 
     case 1 => x2 
     case 2 => x3 
     ... 
     case 23 => x24 
    } 
    } 

    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("Product Test") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc); 

    val record = new Record("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x") 

    import sqlContext._ 
    sc.parallelize(record :: Nil).registerAsTable("records") 

    sql("SELECT x1 FROM records").collect() 
    } 
} 
Смежные вопросы