2015-09-15 2 views
2

Я использую искру с scala.Программно добавляя несколько столбцов в Spark DataFrame

У меня есть Dataframe с 3 столбцами: ID, время, RawHexdata. У меня есть пользовательская функция, которая принимает RawHexData и расширяет ее на X больше столбцов. Важно указать, что для каждой строки X одинаково (столбцы не меняются). Однако, прежде чем я получу первые данные, я не знаю, что такое столбцы. Но как только у меня есть голова, я могу это вывести.

Мне нужен второй Dataframe с указанными столбцами: Id, Time, RawHexData, NewCol1, ..., NewCol3.

«самый легкий» метод, который я могу думать, чтобы сделать это: 1. Deserialize каждая строка в формате JSON (каждый tyoe данные сериализуем здесь) 2. добавить свои новые столбцы, 3. десериализации новый dataframe от измененный json,

Однако это кажется пустой тратой, поскольку это включает в себя 2 дорогостоящих и избыточных этапа сериализации json. Я ищу более чистый рисунок.

Использование классов case, кажется плохой идеей, потому что я не знаю количество столбцов или имена столбцов заранее.

+1

Не могли бы вы предоставить более подробную информацию? Пример данных, содержащихся в «RawHexdata». – zero323

+0

вы всегда можете применить функцию '.withColumn()' только после того, как выполнены некоторые условия. – Niemand

+0

Rawhexdata - это гигантский двоичный блок, отправленный связью встроенных устройств. I содержит данные, которые будут десериализованы в другие плоские числовые данные: удваивает, ints, комплексные числа и т. Д. Позже я хотел бы позволить аналитику запросить эти данные с помощью Sparksql. Однако, когда данные находятся в блоке, это невозможно, поэтому я написал UDF «parseblob», который берет blob и возвращает объект map/json (я могу изменить тип возврата в соответствии с душой). Я хотел бы, чтобы содержимое этой карты было столбцами в другой таблице, где каждая строка связана с исходными необработанными данными. – eshalev

ответ

2

Что вы можете сделать для динамического расширения вашего DataFrame - это работать со строкой RDD, которую вы можете получить, позвонив по телефону dataFrame.rdd. Имея экземпляр , вы можете получить доступ к столбцу RawHexdata и проанализировать содержащиеся данные. Добавив вновь проанализированные столбцы в итоговый , вы почти решили свою проблему. Единственное, что необходимо для преобразования RDD[Row] обратно в DataFrame, это генерировать данные схемы для ваших новых столбцов. Вы можете сделать это, собирая одно значение RawHexdata на вашем драйвере, а затем извлечение типов столбцов.

Следующий код иллюстрирует этот подход.

object App { 

    case class Person(name: String, age: Int) 

    def main(args: Array[String]) { 
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]") 
    val sc = new SparkContext(sparkConf) 
    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val input = sc.parallelize(Seq(Person("a", 1), Person("b", 2))) 
    val dataFrame = input.df 

    dataFrame.show() 

    // create the extended rows RDD 
    val rowRDD = dataFrame.rdd.map{ 
     row => 
     val blob = row(1).asInstanceOf[Int] 
     val newColumns: Seq[Any] = Seq(blob, blob * 2, blob * 3) 
     Row.fromSeq(row.toSeq.init ++ newColumns) 
    } 

    val schema = dataFrame.schema 

    // we know that the new columns are all integers 
    val newColumns = StructType{ 
     Seq(new StructField("1", IntegerType), new StructField("2", IntegerType), new StructField("3", IntegerType)) 
    } 

    val newSchema = StructType(schema.init ++ newColumns) 

    val newDataFrame = sqlContext.createDataFrame(rowRDD, newSchema) 

    newDataFrame.show() 
    } 
} 
+0

Спасибо, хотя я не знаю тип каждого конкретного численного значения. Я могу добавить «переключатель» и построить функцию Seq – eshalev

+0

Точно @eshalev, считая, что все ваши «RawHexdata» содержат одни и те же столбцы, вы можете собрать один объект «RawHexdata» и рассчитать типы данных для полученных столбцов. –

0

SELECT является вашим другом, чтобы решить эту проблему, не возвращаясь к РДУ.

case class Entry(Id: String, Time: Long) 

val entries = Seq(
    Entry("x1", 100L), 
    Entry("x2", 200L) 
) 

val newColumns = Seq("NC1", "NC2", "NC3") 

val df = spark.createDataFrame(entries) 
    .select(col("*") +: (newColumns.map(c => lit(null).as(c))): _*) 

df.show(false) 

+---+----+----+----+----+ 
|Id |Time|NC1 |NC2 |NC3 | 
+---+----+----+----+----+ 
|x1 |100 |null|null|null| 
|x2 |200 |null|null|null| 
+---+----+----+----+----+ 
Смежные вопросы