2015-03-14 2 views
3

Я написал простое искрообразующее приложение, которое в основном считывает поток событий из Кафки и сохраняет эти события в Кассандре в таблице, позволяя эффективные запросы по этим данным. Основная цель этого задания - обрабатывать текущие данные в реальном времени. Но есть также исторические события, хранящиеся в hdf.Приостановка/дросселирование искры/искрообразование

Я хочу повторно использовать РД обработки коды (часть потоковой работы) в исторической работе, и мне интересно, что является лучшим решением для чтения исторических данных в соответствии со следующими требованиями:

  1. Historical события хранятся в ежедневных свернутых файлах в hdfs (я хочу запустить задание в ряде исторических файлов)
  2. Было бы неплохо иметь возможность приостанавливать работу (вставки в cassandra являются идемпотентными, поэтому мне нужно -очередная обработка)
  3. Я хочу иметь механизм дросселирования, позволяющий определить максимальное число бер событий, которые могут быть обработаны (в определенный период времени: например, каждый 1мин)

Я рассмотрел два подхода до сих пор:

  • Batch Спарк работа
    • Ad1: Есть лучший способ определить RDD на основе диапазона файлов, чем создание одного RDD для каждого файла, а затем объединить их?
    • Ad2,3: Возможно ли это?
  • Спарк Streaming работа
    • Ad1: Как эффективно определить диапазон входных файлов? Sth лучше, чем использовать ssc.textFileStream(inputDir) и копировать файлы, которые я хочу обработать в этом каталоге?
    • Ad2: Я предполагаю, что установка каталога контрольной точки - это то, что я ищу.
    • Ad3 Я планирую использовать spark.streaming.receiver.maxRate свойству

Правильно ли я, что регулярная партия искра не может удовлетворить мои требования? Я жду вашего совета, чтобы исправить потоковое решение.

ответ

2

Для Batch Спарк работы, 1. Вы можете дать разделенные запятой имена файлов в СБН. *** Файловые операции 2, 3. Так как вы будете иметь возможность

Для потоковой работы, 1. Вы может определять RDD для файлов и вставлять их с помощью queueStream. 2. Зависит от того, что вы подразумеваете под паузой. Вы можете просто прекратить потоковый контекст изящно, когда вы хотите сделать паузу. 3. Да, это он.

Но, отступая, вы можете сделать много обмена кодами в преобразованиях RDD и DStream. Что бы вы ни делали для RDD в своей пакетной части, можно было бы повторно использовать в DStream.transform() в вашей потоковой части.

+0

Благодарим вас за ответ. Вы имеете в виду, что я должен указать несколько текстовых файлов следующим образом: sc.textFile ("hdfs: // path/file1, hdfs: // path/file2")? – tomek

+0

для тех, кто идет здесь, приостановка с закрытием sc может быть не очень хорошей идеей из-за этого: https://issues.apache.org/jira/browse/SPARK-13198 – XioRcaL