2016-11-24 2 views
7

У меня есть две темы Kafka, которые передают точный контент из разных источников, поэтому я могу иметь высокую доступность в случае сбоя одного из источников. Я пытаюсь объединить 2 темы в 1 тему с использованием Kafka Streams 0.10.1.0, так что я не пропускаю никаких сообщений о сбоях и нет дубликатов, когда все источники работают.Слияние нескольких одинаковых тем потока Кафки

При использовании метода KStream leftJoin одна из тем может спуститься без проблем (вторичная тема), но когда основная тема опускается, в тему вывода ничего не отправляется. Это, кажется, потому что, в соответствии с Kafka Streams developer guide,

KStream-KStream LeftJoin всегда приводится в отчетах, поступающих из первичного потока

так, если нет записи, поступающей из первичного потока, его не будут использовать записи из вторичного потока, даже если они существуют. Как только основной поток возвращается в сеть, вывод возобновляется нормально.

Я также попытался с помощью outerJoin (который добавляет повторяющиеся записи) с последующим преобразованием в KTable и groupByKey избавиться от дубликатов,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

, но я все еще получаю дубликаты один раз в то время. Я также использую commit.interval.ms=200, чтобы получить KTable для отправки в выходной поток достаточно часто.

Что было бы лучшим способом приблизиться к этому слиянию, чтобы получить ровно один раз вывод из нескольких одинаковых тем ввода?

+0

В общем, я бы рекомендовал API-интерфейс процессора решить эту проблему. Вы также можете попробовать перейти к текущей версии 'trunk' (не уверен, что это возможно для вас). Joins были переработаны, и это может решить вашу проблему: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics. Новая семантика соединения будет включена в Kafka '0.10.2', которая имеет целевую дату выхода с января 2017 года (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan). –

+0

@ MatthiasJ.Sax Я переключился на багажник, и похоже, что 'leftJoin' теперь ведет себя как' outerJoin' для KStream-KStream, поэтому я думаю, что вернусь к семантике 10.1. Теперь я пытаюсь создать фальшивый поток, который выводит значения null, которые я буду использовать в качестве основного в leftJoin с тем, что раньше было основным, и использовать это объединение в leftJoin со вторичным. Надеюсь, это приведет к тому, что всегда будут иметь значения в основном потоке, даже когда мой основной файл не работает (так как я просто получаю null от первого leftJoin). –

+0

Новый 'leftJoin' запускается с обеих сторон, так как старый' внешнийJoin' тоже сделал (я думаю, это то, что вы подразумеваете под "похоже, что leftJoin теперь ведет себя как внешнийJoin"?) - это ближе к семантике SQL, чем старый 'leftJoin', но' leftJoin' по-прежнему отличается от 'outerJoin': если правая сторона запускает и не находит партнера по объединению, он отбрасывает запись, и результат не испускается , –

ответ

5

Использование любого типа соединения не решит вашу проблему, так как вы всегда будете иметь либо отсутствующий результат (внутреннее соединение в случае, если какие-то потоковые киоски), либо «дубликаты» с null (левое соединение или внешнее соединение в случай, когда оба потока находятся в режиме онлайн). См. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics для получения подробной информации о семантике присоединения в потоках Кафки.

Таким образом, я бы рекомендовал использовать процессор API, который можно смешивать и матч с DSL с помощью KStreamprocess(), transform() или transformValues(). См. How to filter keys and value with a Processor using Kafka Stream DSL для получения более подробной информации.

Вы также можете добавить нестандартный магазин в ваш процессор (How to add a custom StateStore to the Kafka Streams DSL processor?), чтобы сделать дубликат-фильтрацию отказоустойчивой.

Смежные вопросы