Я пытался настроить поток Spark для нескольких очередей RabbitMQ. Как упоминалось ниже, у меня установлено 2 рабочих, каждому работнику дается одно ядро и 2 ГБ памяти. Таким образом, проблема заключается в том, что я сохраняю этот параметр равным conf.set("spark.cores.max","2")
. Потоковая передача не обрабатывает никаких данных, которые он просто продолжает добавлять к заданиям. Но как только я установил его на conf.set("spark.cores.max","3")
, потоковая передача начнет его обрабатывать. Поэтому я не мог понять причину этого. Кроме того, если я хочу обрабатывать данные параллельно из обеих очередей, как я должен это делать. Я упомянул мои настройки кода и конфигурации ниже.Spark Потоковая обработка из нескольких очередей rabbitmq параллельно
Spark-env.sh:
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_INSTANCES=1
SPARK_WORKER_CORES=1
Scala Код:
val rabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2","queueName" -> config.getString("queueName"),"host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName"), "routingKeys" -> config.getString("routingKeys"))
val receiverStream = RabbitMQUtils.createStream(ssc, rabbitParams)
receiverStream.start()
val predRabbitParams = Map("storageLevel" -> "MEMORY_AND_DISK_SER_2", "queueName" -> config.getString("queueName1"), "host" -> config.getString("QueueHost"), "exchangeName" -> config.getString("exchangeName1"), "routingKeys" -> config.getString("routingKeys1"))
val predReceiverStream = RabbitMQUtils.createStream(ssc, predRabbitParams)
predReceiverStream.start()