У меня есть две темы Kafka, которые передают точный контент из разных источников, поэтому я могу иметь высокую доступность в случае сбоя одного из источников. Я пытаюсь объединить 2 темы в 1 тему с использованием Kafka Streams 0.10.1.0, так что я не пропускаю никаких сообщений о сбоях и нет дубликатов, когда все источники работают.Слияние нескольких одинаковых тем потока Кафки
При использовании метода KStream leftJoin
одна из тем может спуститься без проблем (вторичная тема), но когда основная тема опускается, в тему вывода ничего не отправляется. Это, кажется, потому что, в соответствии с Kafka Streams developer guide,
KStream-KStream LeftJoin всегда приводится в отчетах, поступающих из первичного потока
так, если нет записи, поступающей из первичного потока, его не будут использовать записи из вторичного потока, даже если они существуют. Как только основной поток возвращается в сеть, вывод возобновляется нормально.
Я также попытался с помощью outerJoin
(который добавляет повторяющиеся записи) с последующим преобразованием в KTable и groupByKey избавиться от дубликатов,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
, но я все еще получаю дубликаты один раз в то время. Я также использую commit.interval.ms=200
, чтобы получить KTable для отправки в выходной поток достаточно часто.
Что было бы лучшим способом приблизиться к этому слиянию, чтобы получить ровно один раз вывод из нескольких одинаковых тем ввода?
В общем, я бы рекомендовал API-интерфейс процессора решить эту проблему. Вы также можете попробовать перейти к текущей версии 'trunk' (не уверен, что это возможно для вас). Joins были переработаны, и это может решить вашу проблему: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics. Новая семантика соединения будет включена в Kafka '0.10.2', которая имеет целевую дату выхода с января 2017 года (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan). –
@ MatthiasJ.Sax Я переключился на багажник, и похоже, что 'leftJoin' теперь ведет себя как' outerJoin' для KStream-KStream, поэтому я думаю, что вернусь к семантике 10.1. Теперь я пытаюсь создать фальшивый поток, который выводит значения null, которые я буду использовать в качестве основного в leftJoin с тем, что раньше было основным, и использовать это объединение в leftJoin со вторичным. Надеюсь, это приведет к тому, что всегда будут иметь значения в основном потоке, даже когда мой основной файл не работает (так как я просто получаю null от первого leftJoin). –
Новый 'leftJoin' запускается с обеих сторон, так как старый' внешнийJoin' тоже сделал (я думаю, это то, что вы подразумеваете под "похоже, что leftJoin теперь ведет себя как внешнийJoin"?) - это ближе к семантике SQL, чем старый 'leftJoin', но' leftJoin' по-прежнему отличается от 'outerJoin': если правая сторона запускает и не находит партнера по объединению, он отбрасывает запись, и результат не испускается , –