2015-12-23 3 views
4

Как получить текущую временную метку партии (DStream) в потоке Spark?Как получить текущую временную метку в потоке Spark

У меня есть приложение для искрообразования, в котором входные данные будут проходить множество преобразований.

Мне нужна текущая временная метка во время выполнения, чтобы проверить временную метку во входных данных.

Если сравнивать с текущим временем, то временная метка может отличаться от каждого преобразования преобразования RDD.

Есть ли способ получить временную метку, когда запущена специальная партия микрочипов Spark или какой интервал ее периодической загрузки?

+0

Привет, вы нашли ответ для этого? –

ответ

5
dstream.foreachRDD((rdd, time)=> { 
    // time is scheduler time for the batch job.it's interval was your window/slide length. 
}) 
+2

Спасибо. Я особенно ищут преобразования, такие как flatmap, filter .... –

+0

Это работает для Scala, но, к сожалению, пока не похоже на эквивалент Pyspark: https://spark.apache.org/docs/latest/api/ python/pyspark.streaming.html # pyspark.streaming.DStream.foreachRDD. Мне пришлось использовать 'datetime.datetime.now()' как обходной путь внутри функции, переданной 'foreachRDD()'. – snark

1
dstream.transform(
    (rdd, time) => { 
     rdd.map(
      (time, _) 
     ) 
    } 
).filter(...) 
1

Поздний ответ ... но все же, если это поможет кому-то, метка времени может быть извлечена в миллисекундах. Сначала определим функцию с помощью Java API для форматирования:

Использование Java 7:

def returnFormattedTime(ts: Long): String = { 
    val date = new Date(ts) 
    val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") 
    val formattedDate = formatter.format(date) 
    formattedDate 
} 

Или же, используя Java 8:

def returnFormattedTime(ts: Long): String = { 
    val date = Instant.ofEpochMilli(ts) 
    val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.systemDefault()) 
    val formattedDate = formatter.format(date) 
    formattedDate 
} 

Наконец, используйте метод foreachRDD, чтобы получить временную метку:

dstreamIns.foreachRDD((rdd, time) => 
    .... 
    println(s"${returnFormattedTime(time.milliseconds)}") 
    .... 
) 
Смежные вопросы