2016-03-15 3 views
1

Я пытаюсь преобразовать некоторый ввод в формат, который я хочу, в искровом информационном кадре. вход У меня есть последовательность этого случая класса с до 10000000 классов (или, возможно, также струнные Json, прежде чем я преобразовать его в класс случае ..):Как получить световой информационный кадр apache в правильном формате

case class Element(paramName: String, value: Int, time: Int) 

В результате я хочу dataframe как это:

|Time | ParamA | ParamB | ParamC | Param 10,000 | 
|1000 | 432432 | 8768768 | Null....... | 75675678622 | 
|2000 | Null.......| Null.........| 734543 | Null................. | 

....
Так что не каждый параметр должен иметь, чтобы быть определены для всех временных интервалов. Недостающие значения должны быть заполнены нулем. И, вероятно, будет 10 000 параметров и около 1000 временных интервалов.

Как мне сделать это прямо сейчас, кажется, очень плохо с эффективностью:

case class Elements(name: String, value: Int, time: Int) 

case class GroupedObjects(time: Int, params: (String, Int)*) 

//elements contains the seq of Element 
val elementsRdd: RDD[Elements] = sc.parallelize(elements) 
val groupedRDD: RDD[GroupedObjects] = elementsRdd 
    .groupBy(element => element.time) 
    .map(tuple => GroupedObjects(tuple._1, tuple._2.map(element => 
    (element.name, element.value)).toSeq: _*)) 

//transforming back to json string to get right format for RDD 
val jsonRDD: RDD[String] = groupedRDD.map { obj => 
    "{\"time\":" + obj.time + obj.params.map(tuple => 
    ",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}" 
} 
val df = sqlContext.read.json(jsonRDD).orderBy("time") 
df.show(10) 

Проблема, которую я вижу здесь, безусловно, изменение обратно в строку, только читать его снова в праве формат. Я был бы очень рад за любую помощь, показывающую мне, как получить класс входных данных в формате требуемого формата данных.
Так как я делаю это прямо сейчас, это очень медленно, и я получаю исключение размера кучи для 10 000 000 строк ввода.

ответ

2

Вы могли бы попытаться построить объекты строк и определить схему RDD вручную, что-то вроде следующего примера:

// These extra imports will be required if you don't have them already 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{IntegerType, StructField, StructType} 

//elements contains the seq of Element 
val elementsRdd = sc.parallelize(elements) 

val columnNames = elementsRdd.map(_.name).distinct().collect().sorted 

val pivoted = elementsRdd.groupBy(_.time).map { 
    case (time, elemsByTime) => 
    val valuesByColumnName = elemsByTime.groupBy(_.name).map { 
     case (name, elemsByTimeAndName) => (name, elemsByTimeAndName.map(_.value).sum) 
    } 
    val allValuesForRow = columnNames.map(valuesByColumnName.getOrElse(_, null)) 
    (time, allValuesForRow) 
} 

val schema = StructType(StructField("Time", IntegerType) :: columnNames.map(columnName => StructField(columnName, IntegerType, nullable = true)).toList) 
val rowRDD = pivoted.map(p => Row.fromSeq(p._1 :: p._2.toList)) 
val df = sqlContext.createDataFrame(rowRDD, schema) 
df.show(10) 

Я попытался это локально 10000000 элементов, как это:

val elements = (1 to 10000000).map(i => Element("Param" + (i % 1000).toString, i + 100, i % 10000)) 

И это успешно завершается в разумные сроки.

2

As of Spark 1.6 есть функция pivot. Он работает с DataFrames. Так как вы используете классы случае, это так же просто, как:

val elementsRdd: RDD[Elements] = sc.parallelize(elements) 
val elementsDF = elementsRdd.toDF() 

Вы можете сделать:

elementsDF.groupBy($"time").pivot(...) 

Смотрите документацию для GroupedData больше на pivot(), но это должно быть более чем достаточно для вы должны продолжать.

+0

Эй, Дэвид, благодарю вас за ваш ответ. Я просто попробовал это следующим образом: 'val dfTransformed = df.groupBy (« time »). Pivot (« name »). Sum (« value »)' Сумма - это просто возврат данных. Это работает очень хорошо для множества временных меток и нескольких параметров (время от 1 до 1000000 и params 2), но для многих параметров и меньше временных меток это не очень хорошо работает. Мой usecase будет 1000 временных меток и 10 000 различных параметров. Например, для 100 временных меток и 10000 это берет навсегда и приводит к исключению пространства кучи java даже быстрее, чем мой первый подход. Я делаю это неправильно? – rincewind

+0

Например, с моим первым подходом я мог выполнить 10 000 параметров с 100 временными метками и получить исключение кучи при размещении до 1000 временных меток. С поворотным подходом я получаю исключение кучи уже с 10 000 параметров и 100 временных меток. – rincewind

+0

Я не думаю, что вы делаете что-то неправильно. Честно говоря, 1.6 относительно новый - другой ответ, создающий ряд объектов на лету, - это то, с чем я больше знаком. Вы пробовали этот путь? Поскольку это новая функциональность, вы, возможно, подталкиваете ее к тому, для чего она предназначена. Возможно, вы захотите задать конкретный вопрос: почему «pivot» намного медленнее, чем другой подход.При этом я понимаю, что Spark рассчитан на длинные, тонкие данные, а не на короткие, широкие данные. Наверное, не будет весело, как правило, иметь 1 000 000 столбцов в Spark. –