Хорошо, я начну с уточненной потребительной случае и объяснить мой вопрос:Объяснить Kinesis осколок Итератор - AWS Java SDK
- я использую третьей стороной веб-аналитики платформы, которая использует AWS Kinesis потоков для передачи данных от клиента в конечный пункт назначения - поток Kinesis;
- Платформа для веб-аналитики использует 2 потока:
- Поток коллектора данных (поток одиночного осколка);
- Второй поток для обогащения необработанных данных из потока коллектора (одиночный осколочный поток); Самое главное, этот поток потребляет необработанные данные из первого потока с использованием типа итератора
TRIM_HORIZON
;
- потребляет данные из потока с помощью AWS Java SDK, secifically используя
GetShardIteratorRequest
класс; - В настоящее время я разрабатываю класс извлечения, так что это делается синхронно, то есть я использую данные только тогда, когда компилирую свой класс;
- Класс удивительно работает, хотя есть некоторые вещи, которые я не понимаю, особенно в отношении того, как данные потребляются из потока и значения каждого из типов итераторов;
Моя проблема является то, что данные, которые я получить противоречива и не имеет хронологическую логику в нем.
При использовании
AT_SEQUENCE_NUMBER
и обеспечивают первый порядковый номер от осколка с.getSequenceNumberRange() getStartingSequenceNumber().
... как ``, я не получаю все записи. Аналогично,
AFTER_SEQUENCE_NUMBER
;- Когда я использую
LATEST
, я получаю нулевые результаты; - Когда я использую
TRIM_HORIZON
, который должен иметь смысл использовать, он, похоже, не работает нормально. Он использовал, чтобы предоставить мне данные, а затем добавил новые «события» (записи в конечный поток), и я получил нулевые записи. Тайна.
Мои вопросы:
- Как можно безопасно использовать данные из потока, без необходимости беспокоиться о пропущенных записей?
- Есть ли альтернатива
ShardIteratorRequest
? - Если есть, как я могу просто «просматривать» поток и посмотреть, что внутри него для отладки ссылок?
- Что мне не хватает с помощью метода
TRIM_HORIZON
?
Заранее спасибо, мне очень хотелось бы узнать немного больше о потреблении данных из потока Kinesis.
У меня тоже есть аналогичные проблемы - хотя для меня я получаю дубликаты записей на каждой итерации (используя как AT_SEQUENCE_NUMBER, так и FROM_SEQUENCE_NUMBER), несмотря на то, что каждый ответ получил значение NextShardIterator. Документы несколько загадочны в этом вопросе ... Я также хотел бы знать, что означает «необрезанный» (w.r.t TRIM_HORIZON). – Erve1879
Для записи я сделал что-то среднее в среднем - я взял существующего пользователя Scala, который постоянно прослушивает поток и просто переносил его обратно на чистую Java для моих целей. Вот приложение Scala, первоначально разработанное SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger
К сожалению, я не дружественный к Java .....! Я просто хочу, чтобы были языковые агностики, четкие рекомендации о том, как обеспечить идемпотентность и 100% «охват» записей, в то время как разрешить перезагрузку потребителя, сбои и т. Д. Это, кажется, отрицает цель Кинезиса, если нам нужно сохранить и проверить против SequenceNumber всех ранее учтенных записей, чтобы избежать дублирования. Я уверен, что мне что-то не хватает, хотя ....... – Erve1879