0

Я новичок в Scala и Kafka, и у меня возникли проблемы.Kafka производитель создает тему, но не умеет отправлять сообщения

Я пытаюсь подключить производитель scala kafka к серверу kafka, который установлен на сервере cloudera express. Я сделал это уже один раз в виртуальных машинах с these instructions и не имел никаких проблем.

Когда я запускаю продюсер, создается желаемая тема, но ни одно сообщение не отправляется, или я так думаю.

Здесь не следует часть кода:

производителя Кафки

import java.util.Properties 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} 

class KafkaProducerManager { 

    val props = new Properties() 
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS) 
    props.put("acks", "all") 
    props.put("retries", "2") 
    props.put("auto.commit.interval.ms", "1000") 
    props.put("linger.ms", "1") 
    props.put("block.on.buffer.full", "true") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("auto.create.topics.enable", "true") 

    val producer = new KafkaProducer[String, String](props) 

    def startCounter() { 
     println("Start Producer Counter") 
     for (i <- 1 to 100) { 
      producer.send(new ProducerRecord("test-counter", i.toString, "Package " + i)) 
      println("Producer - Send: " + i) 
     } 

     println("Closing producer") 
     producer.close() 
    } 
} 

Когда я выполнить метод запуска, я вижу «Producer - Отправить: #», как выход из этого и я не ошибки. Итак, я предполагаю, что этот кусок кода отправил сообщения в Кафку.

Я начал следующий на сервере Кафку, прежде чем я побежал производитель:

kafka-console-consumer --zookeeper zk:2181 --topic test-counter 

Но вот я вижу, ничего не происходит.

Когда я проверяю тему, которую производитель должен создать, находится в списке.

kafka-topics -zookeeper zk:2181 --list 

У меня также есть аналогичная проблема с потребителем:

import java.util.{Arrays, Properties} 
import org.apache.kafka.clients.consumer.KafkaConsumer 

class KafkaConsumerManager { 

    val props = new Properties() 
    props.put("bootstrap.servers", KafkaServer.KAFKA_ADDRESS) 
    props.put("group.id", "testGroup") 
    props.put("enable.auto.commit", "true") 
    props.put("auto.commit.interval.ms", "1000") 
    props.put("linger.ms", "1") 
    props.put("session.timeout.ms", "3000") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("zookeeper.connect", KafkaServer.ZOOKEEPER_ADDRESS) 

    val consumer = new KafkaConsumer[String, String](props) 

    def start() { 
     println("Start Consumer") 
     consumer.subscribe(Arrays.asList("test-counter")) 

     while (true) { 
      val records = consumer.poll(100) 
      val iterator = records.iterator() 

      while (iterator.hasNext) { 
       val record = iterator.next() 
       printf("Consumer: offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()) 
      } 
     } 
    } 
} 

При создании сообщения на сервере через Кафка-консольной-производителя я вижу их появляются в Кафка-консоли-потребителей на окружающую сервер, но не в потребитель, которого я написал.

kafka-console-producer --broker-list ks:9092 --topic test-counter 

KafkaServer.ZOOKEEPER_ADDRESS такое же, как аргумент гк: 2181 с Кафкой-консольным-потребителем и KafkaServer.KAFKA_ADDRESS таким же, как аргумент кс: 9092 с Кафкой-консольным-производителем.

ответ

0

Я попробовал код и обнаружил, что:

  • следует указать ключ и значенияdeserializers в потребительском свойства:

    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    
  • есть проблема с session.timeout.ms Недвижимость. От here:

    heartbeat.interval.ms - ... Значение должно быть ниже, чемsession.timeout.ms ...по умолчанию: 3000

    Это означает, что вы должны либо увеличить значение session.timeout.ms или просто удалить линию, так как значение по умолчанию для собственности 30000, который больше, чем по умолчанию heartbeat.interval.ms.

После выполнения исправления код работает.

+0

Я попробую это немедленно –

+0

С этими изменениями проблема все еще существует. Но теперь я могу видеть, что это причина: 'java.net.ConnectException: время соединения завершено: никакой дополнительной информации. По какой-то причине производитель и потребитель не могут подключиться к ks: 9092. –

+0

Я пробовал этот код с другим сервером kafka, и там он отлично работает. Кажется, это проблема с нашим kafkaserver на cloudera и конфигурации. –