2016-09-22 3 views
0

Я хочу сохранить DStream в HDFS, используя формат паркета. Проблема в том, что в моем классе case используется joda.DateTime, в то время как Spark SQL этого не поддерживает. Например:Преобразование DStream класса case с joda.DateTime в Spark DataFrame

case class Log (timestamp: DateTime, ...dozen of other fields here...) 

Но я получил сообщение об ошибке: java.lang.UnsupportedOperationException: Схема для типа org.joda.time.DateTime не поддерживается при попытке преобразовать RDD в DF:

def output(logdstream: DStream[Log]) { 
     logdstream.foreachRDD(elem => { 
      val df = elem.toDF() 
      df.saveAsParquet(...) 
     }); 
    } 

Мои модели сложны и имеют много полей, поэтому я не хочу писать разные классы case, чтобы избавиться от joda.DateTime. Другим вариантом будет сохранение непосредственно от json до паркета, но это не идеально. Есть ли простой способ сделать автоматическое преобразование из joda.DateTime в sql.Timestamp, которое будет использоваться с искровым (конвертировать в DataFrame Spark).

Спасибо.

ответ

0

Это немного многословно, но отображающие попыток вход Спарк SQL Row:

logdstream.foreachRDD(rdd => { 
    rdd.map(log => Row(
    log.timestamp.toDate, 
    log.field2, 
    ... 
)).toDF().saveAsParquest(...) 
}) 
+0

Привет, Не уверен, что, если я правильно понимаю. Но ошибка возникает в выражении: val df = elem.toDF(); другими словами, я не могу преобразовать RDD [Log] в dataframe, используя функцию .toDF(). Ваше предлагаемое решение, похоже, предполагает, что df уже доступен? – auxdx

+0

Вы правы, я пропустил это. Я изменил ответ. – bear911