2016-10-06 3 views
4

Я пытаюсь использовать kafka как альтернативу AWS SQS. Мотивация в первую очередь заключается в том, чтобы улучшить производительность, когда kafka устранит ограничение потянуть 10 сообщений за один раз с кепкой в ​​256 кбайт. Вот сценарий моего варианта использования на высоком уровне. У меня есть куча сканеров, которые отправляют документы для индексирования. Размер полезной нагрузки в среднем составляет около 1 мб. Сканеры вызывают конечную точку SOAP, которая, в свою очередь, запускает код производителя для отправки сообщений в очередь kafka. Потребительское приложение подбирает сообщения и обрабатывает их. Для моего тестового окна я настроил тему с 30 разделами с 2 репликациями. Два экземпляра kafka работают с 1 экземпляром zookeeper. Версия kafka - 0.10.0.Проблемы с настройкой/производительностью потребительской конфигурации Kafka

Для моего тестирования я опубликовал 7 миллионов сообщений в очереди. Я создал группу потребителей с 30 потребительскими потоками, по одному на раздел. Первоначально у меня создалось впечатление, что это существенно ускорит вычислительную мощность по сравнению с тем, что я получал через SQS. К сожалению, это было не так. В моем случае обработка данных сложна и занимает в среднем 1-2 минуты для завершения. Это приводит к резкому перераспределению разделов, поскольку потоки не могут биться вовремя. Я мог видеть кучу сообщений в журнале со ссылкой

Авто смещения коммит Сбой группы full_group: Commit не может быть завершен, поскольку группа уже балансировки и назначена разделов другого члена. Это означает, что время между последующими вызовами poll() было больше, чем сконфигурированное session.timeout.ms, что обычно подразумевает, что цикл опроса тратит слишком много времени на обработку сообщений. Вы можете указать это либо , увеличив тайм-аут сеанса, либо уменьшив максимальный размер пакетов , возвращенных в опросе() с max.poll.records.

Это приводит к тому, что одно и то же сообщение обрабатывается несколько раз. Я пытался играть с тайм-аутом сеанса, max.poll.records и временем опроса, чтобы избежать этого, но это замедлило общую обработку большого времени. Вот некоторые параметры конфигурации.

 
metadata.max.age.ms = 300000 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] 
enable.auto.commit = true 
max.poll.records = 10000 
request.timeout.ms = 310000 
heartbeat.interval.ms = 100000 
auto.commit.interval.ms = 1000 
receive.buffer.bytes = 65536 
fetch.min.bytes = 1 
send.buffer.bytes = 131072 
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer 
group.id = full_group 
retry.backoff.ms = 100 
fetch.max.wait.ms = 500 
connections.max.idle.ms = 540000 
session.timeout.ms = 300000 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
metrics.sample.window.ms = 30000 
auto.offset.reset = latest 
Я сократил время опроса потребителей до 100 мс. Это уменьшило проблемы перебалансировки, устранило дублирующую обработку, но значительно замедлило общий процесс. Это закончилось тем, что потребовалось 35 часов для обработки всех 6 миллионов сообщений по сравнению с 25 часами с использованием решения SQS. Каждый потребительский поток в среднем получал 50-60 сообщений за опрос, хотя некоторые из них опросили 0 записей в разы. Я не уверен в этом, когда в разделе есть огромное количество сообщений. Тот же поток смог получить сообщения во время последующей итерации. Может ли это быть связано с перебалансировкой?

Это мой потребительский код

 
while (true) { 
    try{ 
     ConsumerRecords records = consumer.poll(100); 
     for (ConsumerRecord record : records) { 
      if(record.value()!=null){ 
       TextAnalysisRequest textAnalysisObj = record.value(); 
       if(textAnalysisObj!=null){ 
        // Process record 
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); 
       } 
      } 
     } 
    }catch(Exception ex){ 
     LOGGER.error("Error in Full Consumer group worker", ex); 
    } 
Я понимаю, что эта часть обработки записи является одним узким местом в моем случае. Но я уверен, что у нескольких людей есть аналогичный случай использования большого времени обработки. Я подумал о том, чтобы сделать асинхронную обработку, открутив каждый процессор в выделенном потоке или использовать пул потоков с большой емкостью, но не уверен, создаст ли он большую нагрузку в системе. В то же время я видел несколько примеров, когда люди использовали паузу и возобновляют API для выполнения обработки, чтобы избежать проблемы с перебалансировкой.

Я действительно ищу совет или лучшую практику в этом случае. В частности, рекомендуемая настройка конфигурации вокруг звукового сигнала, тайм-аута запроса, максимальных записей опроса, интервала автоматического фиксации, интервала опроса и т. Д., Если kafka не является подходящим инструментом для моего использования, пожалуйста, дайте мне знать.

ответ

2

Вы можете начать обработку сообщений асинхронно, в отдельном потоке, чем поток, который читается из Kafka.Таким образом, автоматическое совершение будет очень быстрым, и Kafka не сократит вашу сессию. Что-то вроде этого:

private final BlockingQueue<TextAnalysisRequest> requests = 
new LinkedBlockingQueue(); 

В читальном нити:

while (true) { 
    try{ 
     ConsumerRecords records = consumer.poll(100); 
     for (ConsumerRecord record : records) { 
      if(record.value()!=null){ 
       TextAnalysisRequest textAnalysisObj = record.value(); 
       if(textAnalysisObj!=null){ 
        // Process record 
        requests.offer(textAnalysisObj); 
       } 
      } 
    }  
} 
catch(Exception ex){ 
    LOGGER.error("Error in Full Consumer group worker", ex); 
} 

В обработке резьбы:

  while (!Thread.currentThread().isInterrupted()) { 
       try { 
        TextAnalysisRequest textAnalysisObj = requests.take(); 
        PreProcessorUtil.submitPostProcessRequest(textAnalysisObj); 
       } catch (InterruptedException e) { 
        LOGGER.info("Process thread interrupted", e); 
        Thread.currentThread().interrupt(); 
       } catch (Throwable t) { 
        LOGGER.warn("Unexpected throwable while processing.", t); 
       } 
      } 

Посмотрите также на этой документации, для стратегии отправки больших сообщений через Кафку: http://blog.cloudera.com/blog/2015/07/deploying-apache-kafka-a-practical-faq/

Вкратце говорится, что Кафка выполняет b на сообщения небольшого размера около 10 тыс., и если вам нужно отправить более крупные сообщения, лучше разместить их в сетевом хранилище и отправить через Kafka только их местоположение или разделить их.

Смежные вопросы