Я пытаюсь преобразовать некоторый ввод в формат, который я хочу, в искровом информационном кадре. вход У меня есть последовательность этого случая класса с до 10000000 классов (или, возможно, также струнные Json, прежде чем я преобразовать его в класс случае ..):Как получить световой информационный кадр apache в правильном формате
case class Element(paramName: String, value: Int, time: Int)
В результате я хочу dataframe как это:
|Time | ParamA | ParamB | ParamC | Param 10,000 |
|1000 | 432432 | 8768768 | Null....... | 75675678622 |
|2000 | Null.......| Null.........| 734543 | Null................. |
....
Так что не каждый параметр должен иметь, чтобы быть определены для всех временных интервалов. Недостающие значения должны быть заполнены нулем. И, вероятно, будет 10 000 параметров и около 1000 временных интервалов.
Как мне сделать это прямо сейчас, кажется, очень плохо с эффективностью:
case class Elements(name: String, value: Int, time: Int)
case class GroupedObjects(time: Int, params: (String, Int)*)
//elements contains the seq of Element
val elementsRdd: RDD[Elements] = sc.parallelize(elements)
val groupedRDD: RDD[GroupedObjects] = elementsRdd
.groupBy(element => element.time)
.map(tuple => GroupedObjects(tuple._1, tuple._2.map(element =>
(element.name, element.value)).toSeq: _*))
//transforming back to json string to get right format for RDD
val jsonRDD: RDD[String] = groupedRDD.map { obj =>
"{\"time\":" + obj.time + obj.params.map(tuple =>
",\"" + tuple._1 + "\":" + tuple._2).reduce(_ + _) + "}"
}
val df = sqlContext.read.json(jsonRDD).orderBy("time")
df.show(10)
Проблема, которую я вижу здесь, безусловно, изменение обратно в строку, только читать его снова в праве формат. Я был бы очень рад за любую помощь, показывающую мне, как получить класс входных данных в формате требуемого формата данных.
Так как я делаю это прямо сейчас, это очень медленно, и я получаю исключение размера кучи для 10 000 000 строк ввода.
Эй, Дэвид, благодарю вас за ваш ответ. Я просто попробовал это следующим образом: 'val dfTransformed = df.groupBy (« time »). Pivot (« name »). Sum (« value »)' Сумма - это просто возврат данных. Это работает очень хорошо для множества временных меток и нескольких параметров (время от 1 до 1000000 и params 2), но для многих параметров и меньше временных меток это не очень хорошо работает. Мой usecase будет 1000 временных меток и 10 000 различных параметров. Например, для 100 временных меток и 10000 это берет навсегда и приводит к исключению пространства кучи java даже быстрее, чем мой первый подход. Я делаю это неправильно? – rincewind
Например, с моим первым подходом я мог выполнить 10 000 параметров с 100 временными метками и получить исключение кучи при размещении до 1000 временных меток. С поворотным подходом я получаю исключение кучи уже с 10 000 параметров и 100 временных меток. – rincewind
Я не думаю, что вы делаете что-то неправильно. Честно говоря, 1.6 относительно новый - другой ответ, создающий ряд объектов на лету, - это то, с чем я больше знаком. Вы пробовали этот путь? Поскольку это новая функциональность, вы, возможно, подталкиваете ее к тому, для чего она предназначена. Возможно, вы захотите задать конкретный вопрос: почему «pivot» намного медленнее, чем другой подход.При этом я понимаю, что Spark рассчитан на длинные, тонкие данные, а не на короткие, широкие данные. Наверное, не будет весело, как правило, иметь 1 000 000 столбцов в Spark. –