2016-11-18 2 views
0

Я хочу, чтобы грамматически воссоздать тему Кафки. Я использую kafka.admin.AdminUtils для того же.Прогматически воссоздавая тему Кафки

Вот мой rought код:

AdminUtils.deleteTopic(zkUtils, topicName); 
AdminUtils.createTopic(zkUtils, topicName, partitions, replicationFactor, new Properties()); 

Приведенный выше код работает большую часть времени, однако несколько раз он терпит неудачу с исключением следующего:

Exception in thread "main" kafka.common.TopicExistsException: Topic "new_topic" already exists. 
     at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:253) 
     at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:237) 
     at kafka.admin.AdminUtils.createTopic(AdminUtils.scala) 

Я понимаю, что тема не правильно удаляться. Что я делаю неправильно здесь.

ответ

2

Вы не можете вызвать createTopic сразу после вызова удаленияTopic. Две вещи должны быть предметом заботы:

  1. Набор «delete.topic.enable» истинной

  2. Поскольку удаление темы является асинхронной операцией, то лучше убедиться, что все метаданные были успешно удалены изнутри Zookeeper перед созданием новой темы

+0

Я уже установил delete.topic.enable true. Чтобы достичь пункта 2, есть ли обратный вызов от kafka, который уведомляет о завершении операции удаления? –

+0

Кажется, не может быть использована функция обратного вызова из коробки. Возможным способом является мониторинг существования для zk node '/ admin/delete_topics/'. Удаление этого zk-узла является последним, но единственным шагом при удалении темы, которая возникает перед очисткой кэшей контроллеров, что нелегко контролировать. – amethystic

Смежные вопросы