2016-10-27 3 views
0

Я хочу читать несколько кафка из флинка.Чтение из нескольких брокеров kafka с флинком

У меня есть кластер из 3 компьютеров для kafka. При следующей теме

Topic:myTopic PartitionCount:3 ReplicationFactor:1 Configs: 
Topic: myTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2 
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0 
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1 

От Флинка я выполнить следующий код:

Properties properties = new Properties(); 
properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092"); 
properties.setProperty("group.id", "flink"); 

DataStream<T> stream = env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties) 
stream.map(....) 
env.execute() 

я запускаю 3 раз ту же самую работу.

Если я выполняю этот код с одним брокером, это хорошо работает, но с 3 сломанными (на 3 разных машинах) считывается только один раздел.

(In this question) предложенное решения было

создавать отдельные экземпляры FlinkKafkaConsumer для каждого кластера (это то, что вы уже делаете), а затем объединение Получены потоками

Это не работает в мое дело.

Так что мои вопросы:

  1. ли я что-то отсутствует?
  2. Если бы у нас был новый компьютер в кластере Kafka, нам нужно изменить код флинка, чтобы добавить потребителя для нового borker? Или мы можем справиться с этим автоматически во время выполнения?

ответ

1

Возможно, вы неправильно поняли понятие распределенных потоков Кафки.

Тема Кафки состоит из нескольких разделов (3 в вашем случае). Каждый потребитель может потреблять один или несколько из этих разделов. Если вы запустите 3 экземпляра своего приложения с тем же group.id, каждый потребитель действительно прочитает данные только от одного брокера –, он попытается распределить нагрузку равномерно, так что это один раздел для каждого потребителя.

Рекомендую прочитать больше об этой теме, особенно о концепции групп потребителей в Kafka documentation.

В любом случае FlinkKafkaConsumer09 может работать в нескольких параллельных экземплярах, каждый из которых будет извлекать данные из одного или нескольких разделов Kafka. Вам не нужно беспокоиться о создании дополнительных экземпляров потребителя. Один экземпляр потребителя может извлекать записи из всех разделов.

Я понятия не имею, почему вы начинаете работу 3 раза вместо одного с параллелизмом, установленным в 3. Это решит вашу проблему.

DataStream<T> stream = 
     env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties)) 
       .setParallelism(3); 
+0

Спасибо за ваш ответ. Я пробовал это, и это работает, но с помощью этого метода я не могу адаптировать свою работу к числу разделов в Кафке. Если я хочу масштабировать свое приложение, я должен остановить работу, а затем снова запустить ее. То, что я хотел сделать с моим решением, заключалось в том, чтобы динамически адаптировать параллелизм flink к числу разделов в kafka (то есть один раздел = одно задание). Знаете ли вы, как это сделать? – cju

+0

Flink действительно не может изменить уровень параллелизма во время выполнения. Но изменение количества разделов на одну тему в Kafka также не является автоматическим, и вы не будете делать это очень часто.Я не вижу проблемы с перезапуском задания Flink с другой настройкой параллелизма. – vanekjar

+0

В kafka мы можем [изменить тему во время выполнения] (https://kafka.apache.org/documentation#basic_ops_modify_topic) (единственное ограничение - «_adding разделы не меняют разбиение существующих данных». В определенной ситуации, Я могу думать о другом случае использования, когда мы не хотим перезапускать флинк, чтобы масштабировать работу, и я пытаюсь найти способ (если возможно), чтобы обеспечить непрерывный сервис. Но, возможно, это невозможно. – cju