2015-10-25 1 views
2

Я пытаюсь изменить пример искрообразования KafkaWordCount, чтобы взять поток байтов. Это мой код до сих пор:Ошибка Spark createStream при создании потока для декодирования байтовых массивов в IntelliJ с использованием плагина Scala

def main(args: Array[String]) { 
if (args.length < 4) { 
    System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>") 
    System.exit(1) 
} 

val Array(zkQuorum, group, topics, numThreads) = args 
val sparkConf = new SparkConf().setAppName("SiMod").setMaster("local[2]") 
val ssc = new StreamingContext(sparkConf, Seconds(2)) 
ssc.checkpoint("checkpoint") 

var event: Event = null 
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap 
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER) 

Последняя строка -

val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER) 

дает ошибку в IntelliJ, хотя, насколько я могу видеть мое использование такое же, как и в других примерах.

Error:(35, 41) overloaded method value createStream with alternatives: 
    (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyTypeClass:  Class[String],valueTypeClass: Class[Array[Byte]],keyDecoderClass: Class[kafka.serializer.DefaultDecoder],valueDecoderClass: Class[kafka.serializer.DefaultDecoder],kafkaParams: java.util.Map[String,String],topics: java.util.Map[String,Integer],storageLevel: org.apache.spark.storage.StorageLevel)org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream[String,Array[Byte]] <and> 
(ssc: org.apache.spark.streaming.StreamingContext,kafkaParams: scala.collection.immutable.Map[String,String],topics: scala.collection.immutable.Map[String,Int],storageLevel: org.apache.spark.storage.StorageLevel)(implicit evidence$1: scala.reflect.ClassTag[String], implicit evidence$2: scala.reflect.ClassTag[Array[Byte]], implicit evidence$3: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder], implicit evidence$4: scala.reflect.ClassTag[kafka.serializer.DefaultDecoder])org.apache.spark.streaming.dstream.ReceiverInputDStream[(String, Array[Byte])] 
cannot be applied to (org.apache.spark.streaming.StreamingContext, String, String, scala.collection.immutable.Map[String,Int]) 
val lines = KafkaUtils.createStream[String, Array[Byte], DefaultDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap) 

Что я могу сделать?

ответ

0

Попробуйте с Струнным декодером вместо для ключа:

val lines = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_ONLY_SER) 
Смежные вопросы