2017-01-21 2 views
2

В Apache NiFi, используя FetchS3Object для чтения из ведра S3, я вижу, что он может считывать весь объект в ведро и по мере добавления. Возможно ли:В NiFi можно ли выборочно читать через процессор FetchS3Object?

  1. Чтобы настроить процессор на чтение только объектов, добавленных сейчас, а не уже существующих?
  2. Как я могу заставить его прочитать определенную папку в ведре?

NiFi кажется отличным, просто отсутствуют примеры в их документации, по крайней мере, для популярных процессоров.

ответ

3

Комбинация процессоров ListS3 и FetchS3Object будет делать это:

  1. ListS3 - пронумеровать S3 ведро и генерировать flowfiles ссылающегося каждый объект. Вы можете настроить свойство Prefix, чтобы указать конкретную папку в ведре, чтобы перечислять только подмножество. ListS3 отслеживает то, что он прочитал, используя функцию состояния NiFi, поэтому он будет генерировать новые файлы потоков, когда новые объекты будут добавлены в ведро.
  2. FetchS3Object - читать объекты S3 в содержимом потока. Вы можете использовать вывод ListS3, сконфигурировав свойство Bucket на ${s3.bucket} и Объект Ключ до ${filename}.

enter image description here

+0

Thanks James. Я уже делаю это успешно. Мои вопросы очень специфичны для определенного варианта использования, когда я хочу читать только новые файлы по мере их добавления, а не старые в ведре. – Sammy

+0

ListS3 будет идентифицировать новые объекты. Вы можете позволить ему работать для чтения до «сейчас» и отбрасывать вывод для существующих файлов. – James

+0

Я закончил с помощью ListS3 + FetchS3Object вместе с RouteOnAttribute, где я добавил условие $ {s3.lastModified: ge (1485189600000)} для маршрутизации только недавно добавленных документов. – Sammy

1

Другой подход будет настроить S3 ведро для отправки уведомлений SNS, подписываться очереди SQS. NiFi будет считывать из очереди SQS, чтобы получать уведомления, фильтровать интересующие объекты и обрабатывать их.

Дополнительную информацию об этом подходе см. В разделе Monitoring An S3 Bucket in Apache NiFi.

1

Используйте GetSQS и fetchS3Object процессор и настройте ваш процессор GETSQS для прослушивания уведомления для вновь добавленного файла. Это подход, управляемый событиями, когда всякий раз, когда появляется новый файл, очередь SQS отправляет уведомление в nifi. Используйте нижеследующую ссылку для получения дополнительных разъяснений: AWS-NIFI integration

Смежные вопросы