Я хочу, чтобы грамматически воссоздать тему Кафки. Я использую kafka.admin.AdminUtils
для того же.Прогматически воссоздавая тему Кафки
Вот мой rought код:
AdminUtils.deleteTopic(zkUtils, topicName);
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties());
Приведенный выше код работает большую часть времени, однако несколько раз он терпит неудачу с исключением следующего:
Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists.
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237)
at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
Я понимаю, что тема не правильно удаляться. Что я делаю неправильно здесь.
Я уже установил delete.topic.enable true. Чтобы достичь пункта 2, есть ли обратный вызов от kafka, который уведомляет о завершении операции удаления? –
Кажется, не может быть использована функция обратного вызова из коробки. Возможным способом является мониторинг существования для zk node '/ admin/delete_topics/'. Удаление этого zk-узла является последним, но единственным шагом при удалении темы, которая возникает перед очисткой кэшей контроллеров, что нелегко контролировать. –
amethystic