Я пытаюсь использовать kafka как альтернативу AWS SQS. Мотивация в первую очередь заключается в том, чтобы улучшить производительность, когда kafka устранит ограничение потянуть 10 сообщений за один раз с кепкой в 256 кбайт. Вот сценарий моего варианта использования на высоком уровне. У меня есть куча сканеров, которые отправляют документы для индексирования. Размер полезной нагрузки в среднем составляет около 1 мб. Сканеры вызывают конечную точку SOAP, которая, в свою очередь, запускает код производителя для отправки сообщений в очередь kafka. Потребительское приложение подбирает сообщения и обрабатывает их. Для моего тестового окна я настроил тему с 30 разделами с 2 репликациями. Два экземпляра kafka работают с 1 экземпляром zookeeper. Версия kafka - 0.10.0.Проблемы с настройкой/производительностью потребительской конфигурации Kafka
Для моего тестирования я опубликовал 7 миллионов сообщений в очереди. Я создал группу потребителей с 30 потребительскими потоками, по одному на раздел. Первоначально у меня создалось впечатление, что это существенно ускорит вычислительную мощность по сравнению с тем, что я получал через SQS. К сожалению, это было не так. В моем случае обработка данных сложна и занимает в среднем 1-2 минуты для завершения. Это приводит к резкому перераспределению разделов, поскольку потоки не могут биться вовремя. Я мог видеть кучу сообщений в журнале со ссылкой
Авто смещения коммит Сбой группы full_group: Commit не может быть завершен, поскольку группа уже балансировки и назначена разделов другого члена. Это означает, что время между последующими вызовами poll() было больше, чем сконфигурированное session.timeout.ms, что обычно подразумевает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете указать это либо , увеличив тайм-аут сеанса, либо уменьшив максимальный размер пакетов , возвращенных в опросе() с max.poll.records.
Это приводит к тому, что одно и то же сообщение обрабатывается несколько раз. Я пытался играть с тайм-аутом сеанса, max.poll.records и временем опроса, чтобы избежать этого, но это замедлило общую обработку большого времени. Вот некоторые параметры конфигурации.
metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
Я сократил время опроса потребителей до 100 мс. Это уменьшило проблемы перебалансировки, устранило дублирующую обработку, но значительно замедлило общий процесс. Это закончилось тем, что потребовалось 35 часов для обработки всех 6 миллионов сообщений по сравнению с 25 часами с использованием решения SQS. Каждый потребительский поток в среднем получал 50-60 сообщений за опрос, хотя некоторые из них опросили 0 записей в разы. Я не уверен в этом, когда в разделе есть огромное количество сообщений. Тот же поток смог получить сообщения во время последующей итерации. Может ли это быть связано с перебалансировкой?
Это мой потребительский код
while (true) {
try{
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
if(record.value()!=null){
TextAnalysisRequest textAnalysisObj = record.value();
if(textAnalysisObj!=null){
// Process record
PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
}
}
}
}catch(Exception ex){
LOGGER.error("Error in Full Consumer group worker", ex);
}
Я понимаю, что эта часть обработки записи является одним узким местом в моем случае. Но я уверен, что у нескольких людей есть аналогичный случай использования большого времени обработки. Я подумал о том, чтобы сделать асинхронную обработку, открутив каждый процессор в выделенном потоке или использовать пул потоков с большой емкостью, но не уверен, создаст ли он большую нагрузку в системе. В то же время я видел несколько примеров, когда люди использовали паузу и возобновляют API для выполнения обработки, чтобы избежать проблемы с перебалансировкой.
Я действительно ищу совет или лучшую практику в этом случае. В частности, рекомендуемая настройка конфигурации вокруг звукового сигнала, тайм-аута запроса, максимальных записей опроса, интервала автоматического фиксации, интервала опроса и т. Д., Если kafka не является подходящим инструментом для моего использования, пожалуйста, дайте мне знать.