2015-04-24 3 views
15

У меня есть CSV, в котором поле имеет дату и время в определенном формате. Я не могу импортировать его непосредственно в свой Dataframe, потому что это должна быть метка времени. Так что я импортировать его в виде строки и преобразовать его в Timestamp как этотЛучший способ преобразования строкового поля в метку времени в Spark

import java.sql.Timestamp 
import java.text.SimpleDateFormat 
import java.util.Date 
import org.apache.spark.sql.Row 

def getTimestamp(x:Any) : Timestamp = { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    if (x.toString() == "") 
    return null 
    else { 
     val d = format.parse(x.toString()); 
     val t = new Timestamp(d.getTime()); 
     return t 
    } 
} 

def convert(row : Row) : Row = { 
    val d1 = getTimestamp(row(3)) 
    return Row(row(0),row(1),row(2),d1) 
} 

Есть лучше, более краткий способ сделать это с API Dataframe или искровым SQL? Вышеупомянутый метод требует создания RDD и снова предоставить схему для Dataframe.

ответ

6

Я не играл с искровым SQL, но пока я думаю, что это было бы более идиоматическим Scala (нулевое использование не считается хорошей практикой):

def getTimestamp(s: String) : Option[Timestamp] = s match { 
    case "" => None 
    case _ => { 
    val format = new SimpleDateFormat("MM/dd/yyyy' 'HH:mm:ss") 
    Try(new Timestamp(format.parse(s).getTime)) match { 
     case Success(t) => Some(t) 
     case Failure(_) => None 
    }  
    } 
} 

Пожалуйста, обратите внимание, я полагаю, вы знаете типы элементов заранее (если вы читаете его из csv-файла, все они String), поэтому я использую подходящий тип, например String, а не Any (все подтип Any).

Это также зависит от того, как вы хотите обрабатывать исключения синтаксического анализа. В этом случае, если возникает исключение синтаксического анализа, возвращается None.

Вы можете использовать его в дальнейшем с:

rows.map(row => Row(row(0),row(1),row(2), getTimestamp(row(3)) 
+0

Я сделал это раньше. Я чувствовал, что должен обратиться к основной проблеме, прежде чем переходить к таким тонкостям. Если есть лучшее решение, возможно, не придется делать это вообще. Проблема заключается в строке rows.map, которая возвращает rdd и должна быть преобразована в ddf. Так может быть, что ddf api отсутствует или я не знаю, как это сделать. – user568109

+0

Я не знаю, есть ли другой способ, но вы можете без проблем конвертировать любой RDD в DF. В этом конкретном примере с 'sqlContext.createDataFrame (rowRDD, schema)'. Для меня искра sql приятно запрашивать ваши данные по-подобному SQL-обращению, а не анализировать сами данные (например, использовать простые RDD-файлы). – jarandaf

+0

Попробуйте (новая отметка времени (формат.parse (s) .getTime)). ToOption – nont

1

Я хотел бы переместить метод getTimeStamp написал вами в mapPartitions РДДА и повторно GenericMutableRow между строками в итераторе:

val strRdd = sc.textFile("hdfs://path/to/cvs-file") 
val rowRdd: RDD[Row] = strRdd.map(_.split('\t')).mapPartitions { iter => 
    new Iterator[Row] { 
    val row = new GenericMutableRow(4) 
    var current: Array[String] = _ 

    def hasNext = iter.hasNext 
    def next() = { 
     current = iter.next() 
     row(0) = current(0) 
     row(1) = current(1) 
     row(2) = current(2) 

     val ts = getTimestamp(current(3)) 
     if(ts != null) { 
     row.update(3, ts) 
     } else { 
     row.setNullAt(3) 
     } 
     row 
    } 
    } 
} 

И вы все равно должны использовать схему для создания DataFrame

val df = sqlContext.createDataFrame(rowRdd, tableSchema) 

Использование GenericMutableRow в реализации итератора можно было бы найти в Aggregate Operator, InMemoryColumnarTableScan, ParquetTableOperations и т.д.

+0

Это очень близко к моему фактическому коду. Кроме того, если вы хотите разобрать файл csv, скорее всего, вы должны использовать spark-csv вместо split. То, что я хотел сделать, это добавить и изменить столбцы, которые возвратят вам rdd, который снова нужно будет преобразовать в ddf, указав схему. Есть более короткий путь. – user568109

+0

@ user568109, я не думаю, что есть один. Поскольку spark-sql потребуется схема, она должна каким-то образом получить ее. Если вы используете RDD [CaseClassX], spark-sql автоматически выведет схему для вас, из определения класса case. Но вы используете здесь строку (Array [Any]), никакой вывод DataType не может идти туда, поэтому вы просто передаете один. –

+0

Я думаю, что использование одной ссылки, каждый раз изменяя ее и возвращая ее в качестве ссылки, является рецептом катастрофы. Вы действительно использовали этот подход успешно? – maasg

1

У меня есть ISO8601 метку времени в моем наборе данных, и мне нужно, чтобы преобразовать его в формат «ГГГГ-ММ-ДД». Это то, что я сделал:

import org.joda.time.{DateTime, DateTimeZone} 
object DateUtils extends Serializable { 
    def dtFromUtcSeconds(seconds: Int): DateTime = new DateTime(seconds * 1000L, DateTimeZone.UTC) 
    def dtFromIso8601(isoString: String): DateTime = new DateTime(isoString, DateTimeZone.UTC) 
} 

sqlContext.udf.register("formatTimeStamp", (isoTimestamp : String) => DateUtils.dtFromIso8601(isoTimestamp).toString("yyyy-MM-dd")) 

И вы можете просто использовать UDF в своем искровом SQL-запросе.

31

Спарк> = 2,2

import org.apache.spark.sql.functions.to_timestamp 

val ts = to_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") 
df.withColumn("ts", ts).show(2, false) 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+-------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+-------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01| 
// |2 |#[email protected]#@#    |null    | 
// +---+-------------------+-------------------+ 

Спарк> = 1,6, < 2,2

Вы можете использовать функции обработки даты, которые были введены в Спарк 1.5. Предполагая, что вы следующие данные:

val df = Seq((1L, "05/26/2016 01:01:01"), (2L, "#[email protected]#@#")).toDF("id", "dts") 

Вы можете использовать unix_timestamp для разбора строк и приведение его в отметку времени

import org.apache.spark.sql.functions.unix_timestamp 

val ts = unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("timestamp") 

df.withColumn("ts", ts).show(2, false) 

// +---+-------------------+---------------------+ 
// |id |dts    |ts     | 
// +---+-------------------+---------------------+ 
// |1 |05/26/2016 01:01:01|2016-05-26 01:01:01.0| 
// |2 |#[email protected]#@#    |null     | 
// +---+-------------------+---------------------+ 

Как вы можете видеть, что охватывает как синтаксический анализ и обработка ошибок.

Spark> = 1.5, < 1,6

Вы должны будете использовать использовать что-то вроде этого:

unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss").cast("double").cast("timestamp") 

или

(unix_timestamp($"dts", "MM/dd/yyyy HH:mm:ss") * 1000).cast("timestamp") 

из-за SPARK-11724.

Спарк < 1,5

вы должны быть в состоянии использовать их с expr и HiveContext.

0

Я хотел бы использовать https://github.com/databricks/spark-csv

Это будет выводить метки времени для вас.

import com.databricks.spark.csv._ 
val rdd: RDD[String] = sc.textFile("csvfile.csv") 

val df : DataFrame = new CsvParser().withDelimiter('|') 
     .withInferSchema(true) 
     .withParseMode("DROPMALFORMED") 
     .csvRdd(sqlContext, rdd) 
Смежные вопросы