2016-01-18 3 views
3

Ссылаясь на версию Apache Кафки 0.9.0.0, в соответствии с документацией по конфигурации производителя:Apache Кафка Производитель Ошибка конфигурации

http://kafka.apache.org/documentation.html#producerconfigs

мне нужно использовать следующее свойство, чтобы указать список брокеров :

props.put("bootstrap.servers", "localhost:9092") 

Вот мой класс продюсер:

def main(args: Array[String]) { 
    //val conf = new SparkConf().setAppName("VPP metrics producer") 
    //val sc = new SparkContext(conf) 

    val props: Properties = new Properties() 
     props.put("bootstrap.servers", "localhost:9092") 
     props.put("key.serializer", "kafka.serializer.StringEncoder") 
     props.put("value.serializer", "kafka.serializer.StringEncoder") 

    val config = new ProducerConfig(props) 
    val producer = new Producer[String, String](config) 

    1 to 10000 foreach { 
     case i => 
     val jsonStr = getRandomTsDataPoint().toJson.toString() 
     println(s"sending message $i to kafka") 
     producer.send(new KeyedMessage[String, String]("test_topic", jsonStr)) 
     println(s"sent message $i to kafka") 
    } 
    } 

Вот моя зависимость:

object Dependencies { 
    val resolutionRepos = Seq(
    "Spray Repository" at "http://repo.spray.cc/" 
) 

    object V { 
    val spark  = "1.6.0" 
    val kafka  = "0.9.0.0" 
    val jodaTime = "2.7" 
    val sprayJson = "1.3.2" 
    // Add versions for your additional libraries here... 
    } 

    object Libraries { 
    val sparkCore = "org.apache.spark"   %% "spark-core"   % V.spark 
    val kafka  = "org.apache.kafka"   %% "kafka"     % V.kafka 
    val jodaTime = "joda-time"     % "joda-time"    % V.jodaTime 
    val sprayJson = "io.spray"     %% "spray-json"   % V.sprayJson 
    } 
} 

Как вы можете видеть, я использую 0.9.0.0 версию Apache Кафки. Когда я попытался запустить класс Producer, я получаю следующую ошибку:

Joes-MacBook-Pro:spark-kafka-producer joe$ java -cp target/scala-2.11/spark-example-project-0.1.0-SNAPAHOT.jar com.eon.vpp.MetricsProducer 
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Missing required property 'metadata.broker.list' 
    at scala.Predef$.require(Predef.scala:219) 
    at kafka.utils.VerifiableProperties.getString(VerifiableProperties.scala:177) 
    at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:66) 
    at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:56) 
    at com.eon.vpp.MetricsProducer$.main(MetricsProducer.scala:45) 
    at com.eon.vpp.MetricsProducer.main(MetricsProducer.scala) 

Почему это? Я даже проверил содержимое моего файла jar, и он использует версию Apache Kafka 0.9.0.0! (kafka_2.11-0.9.0.0.jar)

+0

Работает ли она, если вы замените 'bootstrap.servers' на' metadata.broker.list'? – Rahul

ответ

Смежные вопросы