2016-05-16 4 views
0

Я начал Кафку с помощью следующей командыНе удалось опубликовать Avro сообщения Кафки Тема

docker run -p 2181:2181 -p 9092:9092 -p 8081:8081 --env ADVERTISED_HOST=`docker-machine ip \`docker-machine active\`` --env ADVERTISED_PORT=9092 spotify/kafka 

Теперь, я написал простую программу, которая отправляет строку в Кафка тему. он работает без проблем.

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
val producer = new KafkaProducer[String, String](props) 
val inputRecord = new ProducerRecord[String, String]("test", "key2", "Hello World") 
producer.send(inputRecord) 
producer.close() 

Так что теперь я изменил эту программу и попытаться отправить Avro сообщение в тему Кафки

val props = new Properties() 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092") 
props.put("schema.registry.url", "http://192.168.99.100:8081") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer") 
val producer = new KafkaProducer[String, Object](props) 
val inputRecord = createAvroRecord(schemaStr, "test1", "test1") 
val producer: KafkaProducer[String, Object] = CreateProducerAvro 
val producerAvroRecord = new ProducerRecord[String, Object]("test", "key1", inputRecord) 
producer.send(producerAvroRecord) 
producer.close() 

Но я получаю ошибку

[error] (run-main-0) org.apache.kafka.common.errors.SerializationException: Error serializing Avro message 
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message 
Caused by: java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at sun.net.NetworkClient.doConnect(NetworkClient.java:180) 
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
    at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
    at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:997) 
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:933) 
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:851) 
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1092) 
    at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:128) 
    at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174) 
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51) 
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89) 
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49) 
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:67) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:424) 
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339) 
    at KafkaPublisher$.SendAvroMessage(KafkaPublisher.scala:35) 
    at KafkaPublisher$.main(KafkaPublisher.scala:20) 
    at KafkaPublisher.main(KafkaPublisher.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
[trace] Stack trace suppressed: run last compile:run for the full output. 

ответ

0

Изменить свой key.serializer и значение. сериализатор в avro, как показано ниже.

props.put("key.serializer", io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
props.put("value.serializer", io.confluent.kafka.serializers.KafkaAvroSerializer.class); 

При использовании реестра схемы, установите URL реестра схемы

props.put("schema.registry.url", "http://localhost:8081/subjects/Kafka-value/versions/1"); 

Примечание: Установите схемы URL реестра на сервер, на котором выполняется реестр схемы с именем субъекта «Kafka- value " Если вы не используете вышеуказанное, вы можете отказаться от этого.

Смежные вопросы