2

У меня есть искровой потоковое приложение, которое выглядит следующим образом:Spark Streaming: Как добавить дополнительные разделы в мой DStream?

val message = KafkaUtils.createStream(...).map(_._2) 

message.foreachRDD(rdd => { 

    if (!rdd.isEmpty){ 
    val kafkaDF = sqlContext.read.json(rdd) 

    kafkaDF.foreachPartition(
     i =>{ 
     createConnection() 
     i.foreach(
      row =>{ 
      connection.sendToTable() 
      } 
     ) 
     closeConnection() 
     } 
    ) 

И, я запускаю его на кластере пряжи с использованием

spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5.... 

Когда я пытаюсь войти kafkaDF.rdd.partitions.size, результат получается быть '1' или '5' в основном. Я смущен, возможно ли контролировать количество разделов моего DataFrame? KafkaUtils.createStream, похоже, не принимает никаких параметров, связанных с количеством разделов, которые я хочу для rdd. Я пробовал kafkaDF.rdd.repartition(int), но он тоже не работает.

Как добиться большего параллелизма в моем коде? Если мой подход ошибочен, каков правильный способ его достижения?

+0

Вы попробовали решение? Это сработало для вас? – marios

+0

Я добавил больше потребителей и больше разделов на тему Кафки. Теперь производительность лучше. Дайте мне знать – void

ответ

4

В Spark Streaming параллелизм может быть достигнут в двух областях: (a) потребители/приемники (в вашем случае потребители Kafka) и (b) обработка (выполняется Spark).

По умолчанию поток искрообразования присваивает каждому потребителю одно ядро ​​(aka Thread). Поэтому, если вам нужно больше данных для приема, вам нужно создать больше потребителей. Каждый потребитель создаст DStream. Затем вы можете объединить DStreams, чтобы получить один большой поток.

// A basic example with two threads for consumers 
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A 
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B 

val combineStream = messageStream1.union(messageStream2) 

Alternatively, количество приемников/потребителей может быть увеличена путем переразметкой входного потока:

inputStream.repartition(<number of partitions>)) 

Все остальные ядра, доступные для потокового приложения будет назначен Спарк.

Так что если у вас есть N ядра (определяется через spark.cores.max) и у вас есть C потребителей вы остались с N-C ядер, доступных для Спарк.

#Partitions =~ #Consumers x (batch duration/block interval) 

блок интервал = сколько времени потребитель ожидает, прежде чем он толкает данные, которые он создан в качестве искрового блока (определяется как конфигурации spark.streaming.blockInterval).

Всегда помните, что Spark Streaming имеет две функции, которые постоянно имеют место. Набор потоков, которые читают текущую микропакету (потребители), и набор потоков, которые обрабатывают предыдущую микро-пакет (Spark).

Для получения более подробных рекомендаций по настройке производительности см. here, here и here.

+0

Я использую мое приложение в кластере YARN. Я не могу найти свойство spark.cores.max, но я устанавливаю -executor-core conf с моей командой spark-submit. Итак, это означает, что общее количество доступных ядер (num-executors * executor-core)? – void

+0

Кроме того, можете ли вы привести пример или ссылку для создания большего количества потребителей и объединения на них? Большое спасибо! – void

+0

Я не эксперт с конфигурациями YARN, так как я в основном работаю с автономным. 'spark.executor.cores' должен определять ядра для каждого исполнителя. Итак, '#Executors x spark.executor.cores = Total Cores' – marios