2015-05-26 3 views
4

Я использую новый клиент производителя kafka и устанавливаю свойство timeout.ms в 50 мс.Тайм-аут нового производителя Kafka

Вот полная конфигурация, используемая в производителя:

props.put("acks", "1"); 
props.put("buffer.memory", "33554432"); 
props.put("retries", "1"); 
props.put("batch.size", "16384"); 
props.put("client.id", "foo"); 
props.put("linger.ms", "0"); 
props.put("timeout.ms", "50"); 

Запрос среднее время отклика в некоторые моменты высокой нагрузки составляет 4 секунды, но я не получаю ошибку тайм-аута.

Знает ли кто-нибудь, как этот таймаут рассчитывается, когда он начинает подсчитываться и когда он заканчивается? Есть ли способ настроить таймаут для начала с момента вызова метода отправки производителя?

ответ

5

Новое имущество timeout.ms работает с конфигурацией производителя ack. Для примера рассмотрим следующую ситуацию

ack = all 
timeout.ms = 3000 

В этом случае ack = all означает, что лидер не будет отвечать до тех пор пока он получает подтверждение для полного набора в синхронизации реплик (ISR) и максимальное время ожидания, чтобы получить это будет подтверждение be 3000 ms. Если он не получил ожидаемое количество подтверждений в течение заданного времени, он вернет ошибку.

Также обратите внимание, что это свойство не учитывает сетевую задержку.

На странице дока:

Конфигурация определяет максимальное количество времени сервера будет ждать подтверждения от последователей, чтобы удовлетворить требование подтверждения производителя указавшего с конфигурацией квитанций. Если запрошенное количество подтверждений не выполняется, когда истекает время ожидания, будет возвращена ошибка. Этот тайм-аут измеряется на стороне сервера и не включает сетевую задержку запроса.

Так что в вашем случае с ack=1 (я не 100% уверен, что это и открыт для любой коррекции, если это применимо), если руководитель не в состоянии ответить (после записи запись в своем журнале, не дожидаясь полного подтверждения от всех последователей) в течение 50 мс должен выдать ошибку.

+0

Эта конфигурация не существует для нового производителя кафки, она говорит, что она будет проигнорирована, когда я начну продюсер. –

+0

Согласовано, новая конфигурация производителя использует это вместо старого свойства, соответственно обновил ответ – user2720864

+0

Вы правы, это ожидаемое поведение, но наша проблема немного отличается. Мы перегружаем kafka, и некоторые узлы перестают отвечать (возможно, потому, что GC), но у продюсера нет тайм-аута на стороне клиента, поэтому он ждет, пока узел вернется для отправки сообщений в очереди. Существует проблема об этом на kafka jira (https://issues.apache.org/jira/browse/KAFKA-1788). –