2017-01-22 2 views
8

Есть ли встроенная функция в потоки Kafka, которая позволяет динамически подключать один входной поток к нескольким выходным потокам? KStream.branch позволяет разветвляться на основе истинных/ложных предикатов, но это не совсем то, что я хочу. Я бы хотел, чтобы каждый входящий журнал определял тему, в которую он будет транслироваться во время выполнения, например, журнал {"date": "2017-01-01"} будет транслироваться по теме topic-2017-01-01, а журнал {"date": "2017-01-02"} будет транслироваться по теме topic-2017-01-02.Динамическое подключение входного потока Kafka к нескольким выходным потокам

Я мог бы позвонить forEach в поток, а затем написать продюсеру Kafka, но это не кажется очень изящным. Есть ли лучший способ сделать это в рамках Streams?

+0

Что вы подразумеваете под «на основе строки» - btw: 'KStream.branch' принимает несколько предикатов (ваш вопрос указывает, что вы пропустили это). Поэтому 'branch' должен позволять делать то, что вы хотите. Может быть, вы можете привести пример данных? –

+2

Я должен быть более ясным. Я знаю, что он принимает несколько предикатов - это было бы прекрасным решением, если бы у меня было фиксированное количество тем, которые я хотел бы передать. Однако то, что я хочу сделать, это написать на темы с именем 'foo- {date}'. – kellanburket

ответ

4

Если вы хотите динамически создавать темы на основе ваших данных, вы не получаете поддержки в потоковом API Kafka на данный момент (v0.10.2 и ранее). Вам нужно будет создать KafkaProducer и реализовать свою динамическую «маршрутизацию» самостоятельно (например, используя KStream#foreach() или KStream#process()). Обратите внимание, что вам нужно делать синхронные записи, чтобы избежать потери данных (которые, к сожалению, не очень эффективны). Планируется расширить Streaming API с динамической маршрутизацией темы, но конкретная временная шкала для этой функции пока отсутствует.

Существует еще одно соображение, которое вы должны принять во внимание. Если вы не знаете свою целевую категорию (-а) раньше времени и просто полагаетесь на так называемую функцию автосоздания темы, вы должны убедиться, что эти темы создаются с требуемыми настройками конфигурации (например, количество разделов или коэффициент репликации).

В качестве альтернативы «созданию темы» вы также можете использовать Admin Client (доступный с v0.10.1) для создания тем с правильной конфигурацией. См. https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations

+0

Можете ли вы дать нам ссылку на динамические планы маршрутизации темы - я хотел бы следить за прогрессом - спасибо – AutomatedMike

+0

https://issues.apache.org/jira/browse/KAFKA-4936 –

Смежные вопросы