2015-10-02 5 views
8

Мы используем Storm с кафкой. Когда мы терпим неудачу в сообщениях, мы хотели бы их воспроизвести, но в некоторых случаях плохие данные или ошибки кода заставят сообщения всегда терпеть неудачу с Bolt, поэтому мы перейдем в бесконечный цикл повтора. Очевидно, что мы исправляем ошибки, когда находим их, но хотим, чтобы наша топология была, как правило, отказоустойчивой. Как мы можем() привязать кортеж после повторного воспроизведения более N раз?Максимальное количество повторений кортежей на Storm Kafka Spout

Просматривая код для Кафки носика, я вижу, что он был разработан, чтобы повторить попытку с экспоненциальным таймером задержки и comments on the PR состояния:

«Носик не завершает цикл повторных попыток (это мое убеждение, что он не должен этого делать, потому что он не может сообщать о том, что произошел сбой, связанный с отменой повторения), он обрабатывает только задержку повторных попыток. Болт в топологии, по-прежнему ожидается, в конечном итоге вызовет ack() вместо fail(), чтобы остановить цикл."

Я видел ответы StackOverflow, которые рекомендуют писать пользовательский носик, но я бы предпочел не зацикливаться на сохранении пользовательского патча внутренних частей Kafka Spout, если есть рекомендуемый способ сделать это в Bolt.

Каков правильный способ сделать это в болте? Я не вижу никакого состояния в кортеже, которое разоблачает, сколько раз оно было воспроизведено.

+1

Если у вас есть проверка ошибок в болте, где вы можете сделать вывод, что конкретный кортеж «плохой» по вашей бизнес-логике, вы можете «подписать» вместо отказа ... поэтому он не будет воспроизводиться .. ... –

ответ

5

Шторм сам по себе не поддерживает вашу проблему. Таким образом, индивидуальное решение - единственный путь. Даже если вы не хотите исправлять KafkaSpout, я думаю, что введение счетчика и нарушение цикла повтора в нем было бы лучшим подходом. В качестве альтернативы вы также можете унаследовать от KafkaSpout и поставить счетчик в свой подкласс. Это, конечно, несколько похоже на патч, но может быть менее навязчивым и проще реализовать.

Если вы хотите использовать болт, вы можете сделать следующее (что также требует некоторых изменений в KafkaSpout или его подклассе).

  • Назначить уникальный идентификатор в качестве дополнительного атрибута каждого кортежа (возможно, уже есть уникальный идентификатор доступен, в противном случае, вы могли бы ввести «счетчик-ID» или просто весь кортеж, то есть все атрибуты, для идентификации каждого кортежа).
  • Вставьте болт после KafkaSpout по номеру fieldsGrouping на идентификаторе (чтобы гарантировать, что кортеж, который воспроизводится, передается в тот же экземпляр болта).
  • В вашем болте используйте HashMap<ID,Counter>, который буферизует все кортежи и подсчитывает количество попыток (повторений). Если счетчик меньше вашего порогового значения, переместите входной кортеж, чтобы он обрабатывался с помощью текущей топологии (конечно, вам необходимо привязать кортеж соответственно). Если счетчик больше вашего порога, попробуйте кортеж, чтобы разбить цикл и удалить его запись из HashMap (вы также можете захотеть LOG всех неудачных кортежей).
  • Чтобы удалить успешно обработанные кортежи из HashMap, каждый раз, когда кортеж находится под номером KafkaSpout, вам необходимо переслать идентификатор кортежа на болт, чтобы он мог удалить кортеж из HashMap. Просто объявите второй выходной поток для своего KafkaSpout подзаголовка и перезапишите Spout.ack(...) (конечно, вам нужно позвонить super.ack(...), чтобы KafkaSpout тоже получил деньги).

Этот подход может поглотить много памяти.В качестве альтернативы иметь запись для каждого кортежа в HashMap, вы также можете использовать третий поток (который соединен с болтом как два других) и пересылать идентификатор кортежа, если кортеж не работает (т. Е. В Spout.fail(...)). Каждый раз, когда болт получает сообщение об ошибке из этого третьего потока, счетчик увеличивается. До тех пор, пока в HashMap (или порог не достигнут) вход отсутствует, болт просто переводит кортеж для обработки. Это должно уменьшить используемую память, но требует дополнительной логики, которая будет реализована в вашем носике и болте.

Оба подхода имеют тот недостаток, что каждый связанный кортеж приводит к появлению дополнительного сообщения для вашего нового значка (таким образом, увеличения сетевого трафика). Для второго подхода может показаться, что вам нужно отправить сообщение «ack» на болт для кортежей, которые раньше не выполнялись. Однако вы не знаете, какие кортежи потерпели неудачу, а какие нет. Если вы хотите избавиться от этих сетевых издержек, вы можете ввести второй номер HashMap в KafkaSpout, который буферизирует идентификаторы сбойных сообщений. Таким образом, вы можете отправить сообщение «ack», если неудачный кортеж был успешно воспроизведен. Конечно, этот третий подход делает логику еще более сложной.

Без изменения KafkaSpout в некоторой степени, я не вижу решения для вашей проблемы. Я лично заплачу KafkaSpout или воспользуюсь третьим подходом с HashMap в подклассе KafkaSpout и болтом (потому что он потребляет мало памяти и не накладывает много дополнительной нагрузки на сеть по сравнению с первыми двумя решениями).

+0

- метод fail() в Spout, вызываемый одним потоком? Я просто пытаюсь определить, нужен ли мне ConcurrentHashMap для отслеживания msgIds-> errorCnt или простой HashMap <>. спасибо – user3169330

+0

'nextTuple()', 'ack()' и 'fail()' вызывается одним потоком. Использовать 'HashMap' достаточно. См. Здесь для получения дополнительной информации: https://stackoverflow.com/questions/32547935/why-should-i-not-loop-or-block-in-spout-nexttuple –

+0

еще одна вещь, если у меня есть N носиков, происходит сбой() для определенного msgId, вызывается на сервере SAME/Spout? – user3169330

0

В основном это работает так:

  1. При развертывании топологии они должны быть класса производства (это, определенный уровень качества, как ожидается, число кортежей низких).
  2. Если кортеж не работает, проверьте, действительно ли кортеж действителен.
  3. Если кортеж действителен (например, не был вставлен, потому что невозможно подключиться к внешней базе данных или что-то вроде этого) ответьте ему.
  4. Если кортеж пропущен и никогда не может быть обработан (например, идентификатор базы данных, который является текстом, а база данных ожидает целое число), это должно быть ack, вы никогда не сможете исправить такую ​​вещь или вставить ее в базы данных.
  5. Необходимо регистрировать новые виды исключений (а также содержимое самого кортежа). Вы должны проверить эти журналы и сформировать правило для проверки кортежей в будущем. И в конечном итоге добавить код для правильной их обработки (ETL) в будущем.
  6. Не регистрируйте все, в противном случае ваши файлы журналов будут огромными, будьте очень избирательными в отношении того, что вы регистрируете. Содержимое журнальных файлов должно быть полезным, а не кучей мусора.
  7. Продолжайте делать это, и в конечном итоге вы будете охватывать только все случаи.
0

Мы также сталкиваемся с аналогичными данными, в которых у нас имеются плохие данные, в результате чего болт терпит неудачу бесконечно.

Чтобы решить эту проблему во время выполнения, мы представили еще один болт, называя его «DebugBolt» для справки. Поэтому носик сначала отправляет сообщение на этот болт, а затем эти болты фиксируют требуемые данные для плохих сообщений, а затем испускают их на необходимый болт. Таким образом, можно исправить ошибки данных «на лету».

Кроме того, если вам нужно удалить некоторые сообщения, вы можете фактически передать ignoreFlag из своего DebugBolt в исходный болт, и ваш оригинальный болт должен просто отправить ack на носик без обработки, если ignoreFlag имеет значение True.

0

Мы просто использовали наш болт, чтобы исправить плохой кортеж в потоке ошибок и подбросить его. Еще один болт справился с ошибкой, обратившись к теме Kafka специально для ошибок. Это позволяет нам легко направлять нормальный поток данных об ошибках через топологию.

Единственный случай, когда мы терпим неудачу кортежа, - это то, что какой-то требуемый ресурс отключен, например, сетевое соединение, БД ... Это возвращаемые ошибки. Все остальное направлено на поток ошибок, который должен быть исправлен или обработан как соответствующий.

Все это предполагает, конечно, что вы не хотите нести какие-либо потери данных. Если вы хотите только попытаться приложить максимум усилий и проигнорировать после нескольких попыток, я бы посмотрел на другие варианты.

Смежные вопросы