Я новичок в Spark/Scala, Я бы хотел использовать SparkStreaming для обработки сообщений устройств в облачных сообщениях на Azure IoTHub. К сожалению, я не могу создать рабочее соединение с IoThub. Я попытался использовать информацию, описанную here.Ошибка создания Spark StreamingContext to Azure IoThub
Это мой код:
package com.aimaps.iot;
import com.microsoft.spark.streaming.examples.arguments.EventhubsArgumentParser.ArgumentMap
import com.microsoft.spark.streaming.examples.arguments.{EventhubsArgumentKeys, EventhubsArgumentParser}
import com.microsoft.spark.streaming.examples.common.StreamStatistics
import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.eventhubs.EventHubsUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.log4j.{Level, Logger}
object EventhubsListener {
def main (args: Array[String]) {
val eventhubsParams = Map[String, String] (
"eventhubs.namespace" -> "SIGFOX-AIMAPS-TEST.azure-devices.net",
"eventhubs.name" -> "SIGFOX-AIMAPS-TEST",
"eventhubs.policyname" -> "policyName",
"eventhubs.policykey" -> "policyKey",
"eventhubs.consumergroup" -> "$Default",
"eventhubs.partition.count" -> "2",
"eventhubs.checkpoint.dir" -> "./EventCheckpoint",
"eventhubs.checkpoint.interval" -> "1"
)
val ssc = new StreamingContext("local[*]", "IoT Test", Seconds(10))
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val stream = EventHubsUtils.createUnionStream(ssc, eventhubsParams)
stream.foreachRDD((rdd, time) => {
if (rdd.count() > 0) {
println(f"Recieved ${rdd.count()} messages.")
} else {
println("No messages recieved.")
}
}
)
ssc.start()
ssc.awaitTermination()
}
}
Я получаю эту ошибку:
ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error handling message, restarting receiver - com.microsoft.azure.servicebus.CommunicationException: java.nio.channels.UnresolvedAddressException. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. TrackingId: 5778876f-a4d5-4bf9-8260-83daaaf83ccf, at: 2016-11-22T11:21:46.805+01:00[Europe/Bratislava]
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:371)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.qpid.proton.reactor.impl.IOHandler.handleBound(IOHandler.java:155)
at org.apache.qpid.proton.reactor.impl.IOHandler.onUnhandled(IOHandler.java:372)
at org.apache.qpid.proton.engine.BaseHandler.onConnectionBound(BaseHandler.java:58)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:131)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:309)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:277)
at com.microsoft.azure.servicebus.MessagingFactory$RunReactor.run(MessagingFactory.java:340)
... 1 more
Что я должен использовать в качестве параметров соединения? где я могу получить пространство имен Azure IoTHub?