2017-01-05 2 views
1

Я работаю над настройкой приложения распределенного режима Kafka Connect, которое будет конвейером Kafka-S3. Я использую Kafka 0.10.1.0-1 и Kafka Connect 3.1.1-1. Пока все идет гладко, но один аспект, важный для большей системы, с которой я работаю, требует знания информации о смещении конвейера Kafka -> FileSystem. Согласно документации, конфигурация offset.storage.topic будет местом, которое приложение распределенного режима использует для хранения информации о смещении. Это имеет смысл, учитывая, что Kafka хранит потребительские смещения в «новой» Kafka. Однако после выполнения некоторых тестов с помощью FileStreamSinkConnector ничего не записывается в мой offset.storage.topic, который является значением по умолчанию: connect-offsets.Kafka Connect offset.storage.topic не получает сообщения (т. Е. Как получить доступ к метаданным смещения Kafka Connect?)

Чтобы быть конкретным, я использую производителя Python Kafka для передачи данных в тему и использования Kafka Connect с FileStreamSinkConnect для вывода данных из темы в файл. Это работает и ведет себя так, как я ожидаю, что соединитель будет вести себя. Кроме того, когда я останавливаю соединитель и запускаю соединитель, приложение запоминает состояние в теме, и дублирование данных отсутствует. Однако, когда я перехожу к offset.storage.topic, чтобы увидеть, какие метаданные смещены, в этой теме ничего нет.

Это команда, которую я использую:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

Я получаю это сообщение после того, как позволить запускать эту команду в течение минуты или около того:

Processed a total of 0 messages

Итак, подведем итог, я имеют 2 вопроса:

  1. Почему я s, которые не записываются в тему, которая должна сохранять это, даже если мое распределенное приложение сохраняет состояние правильно?
  2. Как получить доступ к информации о метаданных смещения для приложения распределенного режима Kafka Connect? Это 100% необходимо для реализации нашей системы Lambda Architecture нашей системы.

Спасибо за помощь.

+0

Убедитесь, что потребитель читает тему и не терпит неудачу. Поскольку потребитель ожидает не только сообщения, но и правильно отформатированного (с 5 байтами в заголовке, 4 из них - это идентификатор схемы в реестре схемы). Проверьте это: http://stackoverflow.com/a/41285681/1437693 –

+0

Благодарим вас за помощь @YuriTceretian. Это относится к Kafka Connect, у которого есть собственный встроенный потребитель. – PhillipAMann

+0

Является ли Python Kafka совместимым с потребителем Connect? –

ответ

0

Новый S3 Connector, выпущенный компанией Confluent, может быть вам интересен.

Из того, что вы описали, возможно, это может значительно упростить вашу задачу экспорта записей из Kafka в ваши S3-ведра.

1

Смещения могут совершать на Кафку умолчанию смещение фиксации темы т.е. _consumer_offsets

1
  1. Liju правильно, подключающиеся-Коррекции используется для отслеживания смещения для источника разъемов (которые имеют производителя, но не потребитель) , разъем для раковины имеют потребителя и отслеживать смещение обычным способом - __consumer_offsets тему

  2. Лучший способ посмотреть на последние совершенные взаимозачетов с помощью инструмента группы потребителей:

    бен/Кафка-потребительские группы.ш --group подключение неэластичного-вход-разъем --bootstrap-сервер Localhost: 9092 --describe

Название группы всегда «замыкающийся» и имя соединителя (в моем случае, эластичная логин -connector). Это покажет последний сдвиг, совершенный группой, который в основном признает, что все сообщения до этого смещения были записаны в Elastic.

+0

Привет, Гвен, как мы можем добиться чтения смещения потребителя с помощью Java API. У меня есть требование показать его на панели управления? На данный момент мы используем указанную выше команду оболочки. – Renukaradhya

+1

Вы можете посмотреть код, который мы написали при реализации командной строки групп потребителей и получить основную идею ... (Вот почему камни с открытым исходным кодом, верно?) –

Смежные вопросы