2016-05-22 3 views
5

Я новичок в Kafka. Я создал java-продюсер на своей локальной машине и установил брокера Kafka на другой машине, скажем M2, в сети (я могу подключиться к SSH, подключиться к этой машине). На стороне Продюсера в консоли Eclipse я получаю сообщение «Сообщение отправлено». Но когда я проверяю потребитель консоли на машине M2, я не вижу эти сообщения.Kafka: сообщение на консоль отсутствует после сообщения, отправленного Java-производителем

Мой ява код производитель:

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 


import java.util.HashMap; 
import java.util.Map; 

public class KafkaMessageProducer { 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 

     KafkaMessageProducer reportObj = new KafkaMessageProducer(); 
     reportObj.send(); 

    } 

    public void send(){ 

     Map<String, Object> config = new HashMap<String, Object>(); 
     config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "135.113.133.60:9092"); 
     config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); 
     int maxMessages = 5; 
     int count = 0; 
     while(count < maxMessages){ 
      producer.send(new ProducerRecord<String, String>("test", "msg", "message --- #"+count++)); 
      System.out.println("Message send.."+count); 
     } 
     producer.close(); 
    } 

} 

Можете ли вы, пожалуйста, дайте мне знать, где я буду неправильно? Я могу отправлять сообщения локально на машине M2 от производителя консоли. Примечание. Даже когда я меняю IP-адрес на полное имя хоста Kafka Broker, он по-прежнему имеет такую ​​же проблему.

Обновление: Я также думаю, что Продюсер может подключиться к брокеру Kafka и отправлять сообщения, но Kafka Broker не передает эти сообщения потребителю. Если я изменю IP-адрес или порт на Zookeeper (который работает на том же узле, что и Kafka Broker), и просмотрите журнал Zookeeper, он получит команду «Продюсер» и затем отклонит сеанс.

Update2: Я создал банку производителя и запустил эту банку на машине M2, и она сработала. Таким образом, кажется, что что-то не так с тем, как Продюсер пытается подключиться к брокерам Kafka. Не уверен, в чем проблема.

+0

У вас есть потребитель консоли и работает до того, как производитель отправит сообщение? Вы пытались прочитать с начала темы? – yuyang

+0

Да. Пользователь консоли был запущен. Я также попробовал это после того, как продюсер отправил сообщения. Я читаю форму консольного потребителя с начала темы. – user2441441

+0

Несвязанный: я нашел, что другой вопрос не стоит так много downvotes. Поэтому я даю некоторую компенсацию здесь ;-) – GhostCat

ответ

3

Наконец-то я нашел ответ, и я отправляю его здесь, если у кого-то другая проблема. Если вы пытаетесь подключиться удаленно, используйте параметр брокера Kafka advertized.hostname. Это сработало для меня.

+0

, пожалуйста, уточните немного больше, можете ли вы указать всю строку конфигурации, которую вы добавили/изменили. (как вы описали вопрос, пожалуйста, опишите свой ответ тоже). Ожидание вашего ответа и благодарности заранее. –

+1

@DynamicRemo В файле \ Kafka \ config \ server.properties вам нужно добавить следующее: 'port = 9092 advertized.host.name = localhost' Обратите внимание, что localhost используется, потому что я не использую удаленный m/c. –

1

Вы можете использовать код следующим образом, чтобы прочитать информацию метаданных для темы kafka, чтобы узнать, получил ли брокер сообщения. Это может помочь отладки.

SimpleConsumer consumer = new SimpleConsumer(broker.host(), broker.port(), 100000, 
     64 * 1024, "your_group_id"); 
List<String> topics = new ArrayList<>(); 
topics.add(topic); 
TopicMetadataRequest req = new TopicMetadataRequest(topics); 

TopicMetadataResponse resp = simpleConsumer.send(req); 
if (resp.topicsMetadata().size() != 1) { 
    throw new RuntimeException("Expected one metadata for topic " 
     + topic + " found " + resp.topicsMetadata().size()); 
} 

TopicMetadata topicMetaData = resp.topicsMetadata().get(0); 
+0

Где я могу запустить это? На моей локальной машине M2? – user2441441

+0

вы можете запустить это с любого хоста, который может поговорить с брокером kafka. – yuyang

+0

Еще один вопрос. Где я упоминаю детали Machine M2 выше, если я запустил это на своей локальной машине? – user2441441

2

Подобно тому, как идея для отладки - попробовать producer.send(/* record */).get(); То есть, ждать результата от Future возвращенного из метода send(). Может быть, на стороне производителя есть исключение, и это просто игнорируется в фоновом режиме.

+0

После этого он остается застрявшим. Единственное, что напечатано на консоли, это предупреждение: log4j: WARN Для регистратора не найдено ни одного приложения (org.apache.kafka.clients.producer.ProducerConfig). log4j: WARN Пожалуйста, правильно инициализируйте систему log4j. log4j: WARN См. Http://logging.apache.org/log4j/1.2/faq.html#noconfig для получения дополнительной информации. – user2441441

Смежные вопросы