2015-11-05 2 views
3

(Также пара вопросов о таймаутах и ​​maxSpoutPending)Как известно Storm, когда сообщение полностью обработано?

Я вижу много ссылок в документации Storm о полной обработке сообщений. Но как знает мой KafkaSpout, когда сообщение полностью обработано?

Надеюсь, это связано с тем, как мои болты подключены так, когда последний болт в моем потоке украшает кортеж, носик знает, когда обрабатывается мое сообщение?

В противном случае я бы предположил, что после истечения периода ожидания подтвержденное состояние сообщения проверяется, и оно считается обработанным, если оно указано в XOR. Но я надеюсь, что это не так?

У меня также есть связанные вопросы о конфигурации maxTuplesPending и таймаута.

Если я устанавливаю maxTuplePending на 10k, то правильно ли я думаю, что каждый экземпляр носика будет продолжать испускать кортежи, пока этот экземпляр носика не будет отслеживать 10k кортежей в полете, 10k кортежей, которые не были полностью обработаны? И тогда новый кортеж испускается, когда сообщение в настоящее время в полете полностью обработано?

И, наконец, это связано с конфигурацией тайм-аута? Должны ли носики ожидать каким-либо образом для настроенного тайм-аута, чтобы произойти до появления новых сообщений? Или конфигурация тайм-аута входит в игру только в том случае, если сообщение застопорилось/замедлилось при обработке, что привело к его сбою из-за таймаута?

Более лаконично (или, надо надеяться, более четко), есть ли эффект для установки моего тайм-аута на 30 минут, кроме сообщений, не будет проваливаться, если они не будут устранены окончательным болтом в течение 30 минут? Или существуют другие воздействия, такие как конфигурация тайм-аута, влияющая на уровень выбросов носиков?

Извините за долгий, бессвязный вопрос (ы). Заранее благодарим за любой ответ.

* Редактировать прояснить дальнейшую

Причиной этого является проблемой для меня, потому что мои сообщения не обязательно проходят через весь поток.

Скажем, у меня есть болты A, B, C, D. Большую часть времени сообщения передаются от A-> B -> -> D. Но у меня есть некоторые сообщения, которые намеренно остановятся на болте A. A будет подбрасывать их, но не испускать их (из-за моей бизнес-логики, в тех случаях я хочу продолжить обработку сообщений).

Так будет ли мой KafkaSpout знать, что сообщение, которое является зарегистрированным, но не выпущено из A, полностью обработано? Поскольку я хотел бы, чтобы другое сообщение было выпущено из желоба, как только Bolt A будет сделан с ним, в этом случае.

ответ

4

Шторм отслеживает кортежи по всей топологии с помощью механизма привязки, который должен использовать код UDF. Эти привязки приводят к так называемому кортежу-корню, корнем дерева является кортеж, испускаемый носиком, а все остальные узлы (которые соединены в древовидной структуре) представляют собой испущенные кортежи из болтов , которые использовали входные кортежи в качестве якорей (это только логическая модель и не реализована таким образом в Storm, хотя).

Например, носик испускает кортеж предложения, который разделяется первым болтом в словах, некоторое слово фильтруется вторым болтом, а слово-слово применяется третьим болтом. Наконец, стопорный болт записывает результат в файл.Дерево будет выглядеть следующим образом:

"this is an example sentence" -+-> "this" 
           +-> "is" 
           +-> "an" 
           +-> "example" -> "example",1 -> "example",1 
           +-> "sentence" -> "sentence",1 -> "sentence",1 

Первоначальное предложение испускается носиком, используемое в качестве якоря bolt1 для всех лексем, которые испускаются и получает acked от bolt1. Bolt2 отфильтровывает «это», «is» и «an» и просто загружает три кортежа. «пример» и «предложение» просто переадресованы, используются в качестве якоря для выходного кортежа и затем подзаряжаются. То же самое происходит в bolt2, и последний болт раковины просто забрасывает все входящие кортежи.

Кроме того, Storm отслеживает все треки всех кортежей, т. Е. От промежуточных болтов, а также от раковин. Во-первых, носик отправляет идентификатор выходного кортежа в задачу acker. Каждый раз, когда кортеж используется как якорь, acker также получает сообщение с идентификатором привязного кортежа и идентификатором выходного кортежа (который автоматически генерируется Storm). Акки с болта также идут на ту же задачу, что и XOR. Если все данные получены, то есть для носика и всех рекурсивно привязанных выходных кортежей (результат XOR будет равен нулю), acker отправляет сообщение на носик, что кортеж полностью обработан, и происходит обратная связь с номером Spout.ack(MessageId) (т.е. обратный вызов выполняется немедленно, когда кортеж полностью обрабатывается). Кроме того, проверяющие регулярно проверяют, есть ли кортеж, который зарегистрирован дольше, чем таймаут. Если это произойдет, идентификатор кортежа будет сброшен с помощью acker, и сообщение будет отправлено на носик, если кортеж завершился неудачно (в результате получился вызов Spout.fail(MessageId)).

Кроме того, Spouts хранит количество всех кортежей в полете и прекращает звонить Spout.nextTuple(), если этот счет превышает maxTuplesPending. Насколько мне известно, этот параметр применяется во всем мире, т. Е. Подсчитываются локальные подсчеты каждого задания на носик, а глобальный счет сравнивается с параметром (не уверен, как это реализовано подробно, хотя).

Таким образом, параметр timeout не зависит от maxTuplesPending.

+0

Большое спасибо за подробный ответ. Не могли бы вы обратиться к сценарию, который я упомянул в Редактировании. Как Шторм знает, что «все acks получены». Если я намеренно задержу после болта А, но не испугаю кортеж, чтобы забить B, будет ли это тайм-аут сообщения? – ab11

+0

Посмотрите на мой пример. кортежи «это», «есть», а «а» будет отфильтровываться промежуточным болтом, т. е. только под контролем и без вывода. Это прекрасно работает. Если бы все слова из предложения были отфильтрованы, дерево было бы менее глубоким, но начальное предложение получило бы штраф только на носик. –

+0

Еще раз спасибо. Я по-прежнему смущен тем, как Шторм справляется с этим. Если болт A вызывает ack и не испускает, как Storm знает, чтобы вызвать Spout.ack (messageId)? Я бы подумал, что он будет вызывать только Spout.ack (messageId), когда кортеж подкрепляется последним болтом в дереве Tuple (как он мог знать, что в этом случае, где я не испускал, что дерево меньше чем обычно). Я спрашиваю об этом, потому что вижу сбои слизи, хотя моя топология полностью обрабатывает его сообщения в течение таймаут-времени, поэтому я подозреваю, что неудачи исходят из тех сообщений, которые были выпущены Болтом, но не испускали. – ab11

Смежные вопросы