Я хочу читать несколько кафка из флинка.Чтение из нескольких брокеров kafka с флинком
У меня есть кластер из 3 компьютеров для kafka. При следующей теме
Topic:myTopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: myTopic Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: myTopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: myTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
От Флинка я выполнить следующий код:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
properties.setProperty("group.id", "flink");
DataStream<T> stream = env.addSource(new FlinkKafkaConsumer09<>("myTopic", new SimpleStringSchema(), properties)
stream.map(....)
env.execute()
я запускаю 3 раз ту же самую работу.
Если я выполняю этот код с одним брокером, это хорошо работает, но с 3 сломанными (на 3 разных машинах) считывается только один раздел.
(In this question) предложенное решения было
создавать отдельные экземпляры FlinkKafkaConsumer для каждого кластера (это то, что вы уже делаете), а затем объединение Получены потоками
Это не работает в мое дело.
Так что мои вопросы:
- ли я что-то отсутствует?
- Если бы у нас был новый компьютер в кластере Kafka, нам нужно изменить код флинка, чтобы добавить потребителя для нового borker? Или мы можем справиться с этим автоматически во время выполнения?
Спасибо за ваш ответ. Я пробовал это, и это работает, но с помощью этого метода я не могу адаптировать свою работу к числу разделов в Кафке. Если я хочу масштабировать свое приложение, я должен остановить работу, а затем снова запустить ее. То, что я хотел сделать с моим решением, заключалось в том, чтобы динамически адаптировать параллелизм flink к числу разделов в kafka (то есть один раздел = одно задание). Знаете ли вы, как это сделать? – cju
Flink действительно не может изменить уровень параллелизма во время выполнения. Но изменение количества разделов на одну тему в Kafka также не является автоматическим, и вы не будете делать это очень часто.Я не вижу проблемы с перезапуском задания Flink с другой настройкой параллелизма. – vanekjar
В kafka мы можем [изменить тему во время выполнения] (https://kafka.apache.org/documentation#basic_ops_modify_topic) (единственное ограничение - «_adding разделы не меняют разбиение существующих данных». В определенной ситуации, Я могу думать о другом случае использования, когда мы не хотим перезапускать флинк, чтобы масштабировать работу, и я пытаюсь найти способ (если возможно), чтобы обеспечить непрерывный сервис. Но, возможно, это невозможно. – cju