Я работаю над настройкой приложения распределенного режима Kafka Connect, которое будет конвейером Kafka-S3. Я использую Kafka 0.10.1.0-1 и Kafka Connect 3.1.1-1. Пока все идет гладко, но один аспект, важный для большей системы, с которой я работаю, требует знания информации о смещении конвейера Kafka -> FileSystem. Согласно документации, конфигурация offset.storage.topic
будет местом, которое приложение распределенного режима использует для хранения информации о смещении. Это имеет смысл, учитывая, что Kafka хранит потребительские смещения в «новой» Kafka. Однако после выполнения некоторых тестов с помощью FileStreamSinkConnector ничего не записывается в мой offset.storage.topic
, который является значением по умолчанию: connect-offsets
.Kafka Connect offset.storage.topic не получает сообщения (т. Е. Как получить доступ к метаданным смещения Kafka Connect?)
Чтобы быть конкретным, я использую производителя Python Kafka для передачи данных в тему и использования Kafka Connect с FileStreamSinkConnect для вывода данных из темы в файл. Это работает и ведет себя так, как я ожидаю, что соединитель будет вести себя. Кроме того, когда я останавливаю соединитель и запускаю соединитель, приложение запоминает состояние в теме, и дублирование данных отсутствует. Однако, когда я перехожу к offset.storage.topic
, чтобы увидеть, какие метаданные смещены, в этой теме ничего нет.
Это команда, которую я использую:
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
Я получаю это сообщение после того, как позволить запускать эту команду в течение минуты или около того:
Processed a total of 0 messages
Итак, подведем итог, я имеют 2 вопроса:
- Почему я s, которые не записываются в тему, которая должна сохранять это, даже если мое распределенное приложение сохраняет состояние правильно?
- Как получить доступ к информации о метаданных смещения для приложения распределенного режима Kafka Connect? Это 100% необходимо для реализации нашей системы Lambda Architecture нашей системы.
Спасибо за помощь.
Убедитесь, что потребитель читает тему и не терпит неудачу. Поскольку потребитель ожидает не только сообщения, но и правильно отформатированного (с 5 байтами в заголовке, 4 из них - это идентификатор схемы в реестре схемы). Проверьте это: http://stackoverflow.com/a/41285681/1437693 –
Благодарим вас за помощь @YuriTceretian. Это относится к Kafka Connect, у которого есть собственный встроенный потребитель. – PhillipAMann
Является ли Python Kafka совместимым с потребителем Connect? –