2014-09-17 3 views
6

Хорошо, я начну с уточненной потребительной случае и объяснить мой вопрос:Объяснить Kinesis осколок Итератор - AWS Java SDK

  1. я использую третьей стороной веб-аналитики платформы, которая использует AWS Kinesis потоков для передачи данных от клиента в конечный пункт назначения - поток Kinesis;
  2. Платформа для веб-аналитики использует 2 потока:
    1. Поток коллектора данных (поток одиночного осколка);
    2. Второй поток для обогащения необработанных данных из потока коллектора (одиночный осколочный поток); Самое главное, этот поток потребляет необработанные данные из первого потока с использованием типа итератора TRIM_HORIZON;
  3. потребляет данные из потока с помощью AWS Java SDK, secifically используя GetShardIteratorRequest класс;
  4. В настоящее время я разрабатываю класс извлечения, так что это делается синхронно, то есть я использую данные только тогда, когда компилирую свой класс;
  5. Класс удивительно работает, хотя есть некоторые вещи, которые я не понимаю, особенно в отношении того, как данные потребляются из потока и значения каждого из типов итераторов;

Моя проблема является то, что данные, которые я получить противоречива и не имеет хронологическую логику в нем.

  • При использовании AT_SEQUENCE_NUMBER и обеспечивают первый порядковый номер от осколка с

    .getSequenceNumberRange() getStartingSequenceNumber().

    ... как ``, я не получаю все записи. Аналогично, AFTER_SEQUENCE_NUMBER;

  • Когда я использую LATEST, я получаю нулевые результаты;
  • Когда я использую TRIM_HORIZON, который должен иметь смысл использовать, он, похоже, не работает нормально. Он использовал, чтобы предоставить мне данные, а затем добавил новые «события» (записи в конечный поток), и я получил нулевые записи. Тайна.

Мои вопросы:

  1. Как можно безопасно использовать данные из потока, без необходимости беспокоиться о пропущенных записей?
  2. Есть ли альтернатива ShardIteratorRequest?
  3. Если есть, как я могу просто «просматривать» поток и посмотреть, что внутри него для отладки ссылок?
  4. Что мне не хватает с помощью метода TRIM_HORIZON?

Заранее спасибо, мне очень хотелось бы узнать немного больше о потреблении данных из потока Kinesis.

+0

У меня тоже есть аналогичные проблемы - хотя для меня я получаю дубликаты записей на каждой итерации (используя как AT_SEQUENCE_NUMBER, так и FROM_SEQUENCE_NUMBER), несмотря на то, что каждый ответ получил значение NextShardIterator. Документы несколько загадочны в этом вопросе ... Я также хотел бы знать, что означает «необрезанный» (w.r.t TRIM_HORIZON). – Erve1879

+0

Для записи я сделал что-то среднее в среднем - я взял существующего пользователя Scala, который постоянно прослушивает поток и просто переносил его обратно на чистую Java для моих целей. Вот приложение Scala, первоначально разработанное SnowPlow https://github.com/snowplow/kinesis-example-scala-consumer – YuvalHerziger

+0

К сожалению, я не дружественный к Java .....! Я просто хочу, чтобы были языковые агностики, четкие рекомендации о том, как обеспечить идемпотентность и 100% «охват» записей, в то время как разрешить перезагрузку потребителя, сбои и т. Д. Это, кажется, отрицает цель Кинезиса, если нам нужно сохранить и проверить против SequenceNumber всех ранее учтенных записей, чтобы избежать дублирования. Я уверен, что мне что-то не хватает, хотя ....... – Erve1879

ответ

0

Я понимаю путаницу выше, и у меня были те же проблемы, но я думаю, что я понял это сейчас. Обратите внимание, что я использую JSON API непосредственно без KCL.

Я, кажется, что API предоставляет клиентам 2 основных варианта итераторов, когда они начинают потреблять поток:

A) TRIM_HORIZON: для чтения СОСТОЯВШИХСЯ записей с задержкой между многими минутами (даже часов) и 24 часами назад. Он не возвращает недавно поставленные записи. Использование AFTER_SEQUENCE_NUMBER в последней записи, которую видит этот итератор, возвращает пустой массив, даже когда записи были недавно PUT.

B) ПОСЛЕДНИЕ: для чтения записей БУДУЩЕГО в режиме реального времени (сразу после PUT). Я был обманут единственным предложением документации, которую мог найти на этом «Начать чтение сразу после самой последней записи в осколке, чтобы вы всегда читали самые последние данные в осколке». Вы получали пустой массив, потому что никакие записи не были PUT с момента получения итератора. Если вы получите этот тип итератора, а затем запустите запись, эта запись будет немедленно доступна.

Наконец, если вы знаете идентификатор последовательности недавно поставленной записи, вы можете получить ее немедленно, используя AT_SEQUENCE_NUMBER, и вы можете получить более поздние записи, используя AFTER_SEQUENCE_NUMBER, даже если они не будут отображаться в итераторе TRIM_HORIZON.

Вышеупомянутое означает, что если вы хотите прочитать все известные прошлые записи и будущие записи в реальном времени, вам нужно использовать комбинацию A и B, с логикой, чтобы справиться с записями между ними (недавнее прошлое) , KCL может хорошо сгладить это.

Смежные вопросы