Это довольно прямолинейно, с некоторыми оговорками. Во-первых, это помогает понять, как это работает со стороны Кафки.
Kafka управляет так называемыми смещениями - каждое сообщение в Kafka имеет смещение относительно его положения в разделе. (Разделы являются логическими делениями темы.) Первое сообщение в разделе имеет смещение 0L
, второе - 1L
и т. Д. За исключением того, что из-за опрокидывания журнала и, возможно, сжатия темы, 0L
не всегда является самым ранним смещением в раздел.
Первое, что вам нужно сделать, это собрать смещения для всех разделов, которые вы хотите прочитать с самого начала. Вот функция, которая делает это:
def getOffsets(consumer: SimpleConsumer, topic: String, partition: Int) : (Long,Long) = {
val time = kafka.api.OffsetRequest.LatestTime
val reqInfo = Map[TopicAndPartition,PartitionOffsetRequestInfo](
(new TopicAndPartition(topic, partition)) -> (new PartitionOffsetRequestInfo(time, 1000))
)
val req = new kafka.javaapi.OffsetRequest(
reqInfo, kafka.api.OffsetRequest.CurrentVersion, "test"
)
val resp = consumer.getOffsetsBefore(req)
val offsets = resp.offsets(topic, partition)
(offsets(offsets.size - 1), offsets(0))
}
Вы назвали бы это так:
val (firstOffset,nextOffset) = getOffsets(consumer, "MyTopicName", 0)
Для всего, что вы когда-либо хотели знать о получении коррекций от Кафки, read this. Это загадочно, мягко говоря. (Дайте мне знать, когда вы в полной мере понять, второй аргумент PartitionOffsetRequestInfo
, например.)
Теперь, когда у вас есть firstOffset
и lastOffset
раздела вы хотите посмотреть на исторически, вы затем использовать параметр createDirectStream
fromOffset
, который типа: fromOffset: Map[TopicAndPartition, Long]
. Вы должны установить значение Long
/в firstOffset
, которое вы получили от getOffsets()
.
Что касается nextOffset
- вы можете использовать это для определения в своем потоке при переходе от обработки исторических данных к новым данным. Если msg.offset == nextOffset
, то вы обрабатываете первую неисторическую запись в разделе.
Теперь для предостережений, непосредственно from the documentation:
- После начала контекст, никаких новых потоковых вычислений не может быть настроить или добавить к нему.
- Как только контекст был остановлен, он не может быть перезаписан .
- Только один StreamingContext может быть активным в JVM на одновременно.
- stop() в StreamingContext также останавливает SparkContext.К остановите только StreamingContext, установите необязательный параметр stop() с именем stopSparkContext на false.
- SparkContext могут быть повторно использованы для создания нескольких StreamingContexts, до тех пор, как предыдущий StreamingContext останавливается (без остановки) SparkContext перед созданием следующего StreamingContext.
Это из-за эти предостережения, что я захватить nextOffset
в то же время, как firstOffset
- так что я могу держать поток, но изменить контекст от обработки исторического до сегодняшнего времени.
Спасибо за подробный ответ. Я прочитал его и прочитаю его снова, чтобы понять его лучше. Между тем, я использую Spark 'скользящие окна' для подсчета времени. Как я могу это достичь? – yolgun