2

spark streaming UI Прямой потребитель Kafka начал ограничивать чтение до 450 событий (5 * 90 разделов) за партию (5 секунд), он работал нормально в течение 1 или 2 дней до этого (около 5000 до 40000 событий на каждую партию)Spark Streaming Kafka прямой потребительский расходная нагрузка

Я использую искровой автономный кластер (искра и искрообразование-кафка версии 1.6.1), работающий в AWS и использующий ведро S3 для каталога контрольных точек StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext), нет задержек в планировании и достаточного количества дисков пространства на каждом рабочем узле.

Не изменять параметры инициализации клиента Кафки, почти уверен, что структура Кафки не изменилась:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker) 
val topics = Set(kafkaConfig.topic) 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics) 

Также не может понять, почему, когда прямое описание потребителя говорит The consumed offsets are by the stream itself мне все еще нужно использовать контрольно-пропускной пункт каталог при создании потокового контекста?

+0

Установлен ли 'spark.streaming.backpressure.enabled' значение' true'? –

+0

да, я попытаюсь отключить его –

+0

похоже, что он помог –

ответ

1

Обычно это результат включения противодавления через настройку spark.streaming.backpressure.enabled на значение true. Обычно, когда алгоритм противодавления видит, что в него поступают больше данных, он привык, он начинает упускать каждую партию до небольшого размера, пока он не сможет снова «стабилизировать» себя. Это иногда имеет ложные срабатывания и заставляет ваш поток замедлять скорость обработки.

Если вы хотите настроить эвристику немного, есть некоторые недокументированные флаги он использует (только убедитесь, что вы знаете, что вы делаете):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) 
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) 
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) 
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100) 

Если вы хотите окровавленные детали, PIDRateEstimator это то, что вы ищете.

Смежные вопросы