2016-03-18 4 views
0

У меня есть приложение Spark Streaming для анализа событий, поступающих от брокера Kafka. У меня есть правила, как показано ниже, и новые правила могут быть получены путем объединения существующих:Spark Streaming Replay

If this event type occurs raise an alert. 
If this event type occurs more than 3 times in a 5-minute interval, raise an alert. 

Параллельно я сохраняю все входящие данные на Кассандру. Мне нравится делать это потоковое приложение для исторических данных из Кассандры. Например,

<This rule> would have generated <these> alerts for <last week>. 

Есть ли способ сделать это в Spark или это в дорожной карте? Например, Apache Flink имеет обработку времени события. Но перенос существующей кодовой базы на нее кажется трудным, и я хотел бы решить эту проблему при повторном использовании моего существующего кода.

ответ

0

Это довольно прямолинейно, с некоторыми оговорками. Во-первых, это помогает понять, как это работает со стороны Кафки.

Kafka управляет так называемыми смещениями - каждое сообщение в Kafka имеет смещение относительно его положения в разделе. (Разделы являются логическими делениями темы.) Первое сообщение в разделе имеет смещение 0L, второе - 1L и т. Д. За исключением того, что из-за опрокидывания журнала и, возможно, сжатия темы, 0L не всегда является самым ранним смещением в раздел.

Первое, что вам нужно сделать, это собрать смещения для всех разделов, которые вы хотите прочитать с самого начала. Вот функция, которая делает это:

def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = { 
    val time = kafka.api.OffsetRequest.LatestTime 
    val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
    (new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000)) 
) 
    val req = new kafka.javaapi.OffsetRequest(
    reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test" 
) 
    val resp = consumer.getOffsetsBefore(req) 
    val offsets = resp.offsets(topic, partition) 
    (offsets(offsets.size - 1), offsets(0)) 
} 

Вы назвали бы это так:

val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0) 

Для всего, что вы когда-либо хотели знать о получении коррекций от Кафки, read this. Это загадочно, мягко говоря. (Дайте мне знать, когда вы в полной мере понять, второй аргумент PartitionOffsetRequestInfo, например.)

Теперь, когда у вас есть firstOffset и lastOffset раздела вы хотите посмотреть на исторически, вы затем использовать параметр createDirectStreamfromOffset, который типа: fromOffset: Map[TopicAndPartition, Long]. Вы должны установить значение LongfirstOffset, которое вы получили от getOffsets().

Что касается nextOffset - вы можете использовать это для определения в своем потоке при переходе от обработки исторических данных к новым данным. Если msg.offset == nextOffset, то вы обрабатываете первую неисторическую запись в разделе.

Теперь для предостережений, непосредственно from the documentation:

  • После начала контекст, никаких новых потоковых вычислений не может быть настроить или добавить к нему.
  • Как только контекст был остановлен, он не может быть перезаписан .
  • Только один StreamingContext может быть активным в JVM на одновременно.
  • stop() в StreamingContext также останавливает SparkContext.К остановите только StreamingContext, установите необязательный параметр stop() с именем stopSparkContext на false.
  • SparkContext могут быть повторно использованы для создания нескольких StreamingContexts, до тех пор, как предыдущий StreamingContext останавливается (без остановки) SparkContext перед созданием следующего StreamingContext.

Это из-за эти предостережения, что я захватить nextOffset в то же время, как firstOffset - так что я могу держать поток, но изменить контекст от обработки исторического до сегодняшнего времени.

+0

Спасибо за подробный ответ. Я прочитал его и прочитаю его снова, чтобы понять его лучше. Между тем, я использую Spark 'скользящие окна' для подсчета времени. Как я могу это достичь? – yolgun