2015-11-08 4 views
1

Моя топология такова: kafka(p:6)->reduce(p:6)->db writer(p:12) (где p: параллельность).flink: он проигрывает записи?

  • он у меня работает на одном узле «кластер» с taskmanager.numberOfTaskSlots: 30
  • Я знаю, что мой источник Кафка производит ~ 6,5 млн записи/мин
  • Кафки «читатель» имеет параллелизм равный к # Кафки разделов

Когда я наблюдаю эту работу (через FLiNK UI) в течение ~ 1 мин эти значения я вижу:

  • Кафка -> уменьшить: ~ 1.5M записей отправлено (охлаждает> 4x)
  • уменьшают (оконный агрегат 5 секунд) -> дб написать ~ 114K записей отправляется (выключена по> 2x)
  • дБ запись -> запись, полученная: ~ 23K (охлаждает> 5x)

(Есть небольшие расхождения между переданным/принятыми значениями для других частей, но я могу отнести их к ошибкам измерения)

Вопрос (и):
1. Whe re - остальные записи?
2. Нагрузка на эту машину не превышает 1,5, пока она работает. Есть ли еще один ограничивающий фактор?
3. Неужели я неправильно читаю значения из пользовательского интерфейса?

Java-8
Flink 1.0 (последняя GitHub)
машины: 32 ядер/96 Гб RAM

Это можно объяснить с помощью процесса агрегации.
Это значение соответствует тому, что записано в db.

ответ

6

Флинк не потерял записей, они просто забуферированы в полете, или они остаются дольше в Кафке. Из цифр видно, что вы испытываете противодавление.

Вы можете видеть, что «редуктор» выпустил много записей, которые еще не были получены «db writer». В этом случае эти записи все еще находятся в буферах канала связи между операторами. Эти каналы имеют ограниченный объем буферизации (в зависимости от количества настроенных буферов, как правило, нескольких МБ). Для небольших записей они, вероятно, будут содержать несколько записей за 10 000 записей.

Если количество отправленных записей в одном операторе постоянно значительно отстает от принятых записей в принимающем операторе, это индикатор того, что приемник (здесь «db writer») не может идти в ногу со скоростью передачи данных. Может быть, потому, что БД не обрабатывает вставки достаточно быстро (слишком синхронно, слишком мелкозернистая фиксация?), Возможно, сеть между «db writer» и DB насыщена.

В этом случае «db writer» пропустит редуктор, который в конечном итоге также проткнет источник Kafka.

Чтобы проверить, что такое скорость передачи данных, если у вас не было обратного давления из базы данных, вы можете попробовать эксперимент, где «db writer» просто отбрасывает все записи.