2017-02-14 2 views
1

У меня есть прецедент, где я должен обрабатывать события в режиме FIFO. Это события, генерируемые машинами. каждая машина генерирует одно событие за 30 секунд. Для конкретной машины нам нужно обработать события на основе FIFO fasion.Обработка FIFO с использованием Spark Streaming?

Нам необходимо обработать около 240 миллионов событий в день. Для такого масштабного масштаба нам необходимо использовать Kafka + Spark Streaming

Из документации Kafka я понимаю, что мы можем использовать ключевое поле сообщения для маршрутизации сообщения в определенный раздел темы. Это гарантирует, что я могу использовать идентификатор машины в качестве ключа и обеспечить, чтобы все сообщения с конкретной машины попадали в один раздел тем.

50% проблема решена.

Здесь идет вопрос на стороне обработки.

Искра Документация подхода Kafka Direct говорит, что разделы RDD эквивалентны разделам Kafka.

Так что, когда я выполняю rdd.foreachPartition, задание выполняет итерацию в упорядоченном fasion?

Обеспечено ли, что раздел RDD всегда лежит в одном исполнителе?

Обеспечено ли, чтобы задача foreachPartition выполнялась только одним потоком для всего раздела?

Пожалуйста, помогите.

ответ

1

Предположим, что вы не используете операторов, которые переделывают данные (например, repartition, reduceByKey, reduceByKeyAndWindow, ...).

Так что, когда я выполняю rdd.foreachPartition, задание задает итерацию в упорядоченном fasion?

Да. Он обрабатывает данные, соответствующие порядку в разделе Kafka.

Обеспечено ли, что раздел RDD всегда лежит в одном исполнителе?

Да. Существует только один исполнитель (задача), обрабатывающий раздел, если вы не включили speculation. speculation может запустить другую задачу для запуска того же раздела, если он слишком медленный.

Обеспечено, что задача foreachPartition выполняется только одним потоком для всего раздела?

Да. Он обрабатывает данные в одном разделе один за другим.

+0

первое предложение в этом ответе очень важно. Любая операция, вызванная перемещением, заставит ваши данные «не синхронизироваться». Если это проблема, рассмотрите возможность явной сортировки событий на машинный идентификатор. –

0

Из документации Kafka я понимаю, что мы можем использовать ключевое поле сообщения для маршрутизации сообщения в определенный раздел темы. Это гарантирует, что я могу использовать идентификатор машины в качестве ключа и обеспечить, чтобы все сообщения с конкретной машины попадали в один раздел тем.

При публикации данных в Kafka вам не нужно использовать идентификатор машины.Используйте null как ключ, и kafka будет внутренне использовать схему разбиения Hash, чтобы отправить данные соответствующим образом различным хостам kafka.

Здесь возникает вопрос на стороне обработки.

Гоча: При обработке в искру, она не будет иметь глобальный порядок. Пример: Есть 5 событий (упорядоченный по времени): e0 (ранний), e1, e2, e3, e4 (последняя)

Этих назначь к различным разделам Кафки:

Kakfa Partition P0: e0, e3 Kafka Partition P1: e1, e2, e4

Итак, когда вы читаете в своей искровой работе, вы получите e0, e3 в одном RDD и e1, e2, e4 в другом RDD, в этом порядке.

Если вы хотите, чтобы глобальное упорядочение (e0, e1, e2, e3, e4), вам нужно было написать один раздел в kafka. Но тогда вы потеряете терпимость к разделам и столкнетесь с некоторыми проблемами производительности (необходимо настроить производителей и потребителей). 3000 событий/сек должно быть хорошо, но это также зависит от вашего кластера kafka.

Ваши другие вопросы уже ответили на @zsxwing (see)

Смежные вопросы