2016-11-22 2 views
0

Я пытаюсь подключиться к IBM Bluemix сообщений узлового и сгенерирует сообщение с помощью Java, следуя примеруКафка Производитель Timeout Issue подключения к сообщени-концентратору Bluemix

https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

producer.properties

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer 
acks=-1 
security.protocol=SASL_SSL 
sasl.mechanism=PLAIN 
ssl.protocol=TLSv1.2 
ssl.enabled.protocols=TLSv1.2 
ssl.truststore.password=changeit 
ssl.truststore.type=JKS 
ssl.endpoint.identification.algorithm=HTTPS 
ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/jre/lib/security/cacerts 

jaas.conf.template

KafkaClient { 
    com.ibm.messagehub.login.MessageHubLoginModule required 
    serviceName="kafka" 
    user="$USERNAME" 
    password="$PASSWORD"; 
}; 

Producer.java фрагмент

ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
       "MyTopic", 
       KEY.getBytes("UTF-8"), 
       "MESSAGE".getBytes("UTF-8")); 

     // Synchronously wait for a response from Message Hub/Kafka. 
     RecordMetadata m = kafkaProducer.send(record).get(); 

Проблема заключается в том, я получаю исключение тайм-аут, когда я пытаюсь получить будущее RecordMetadata

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. 
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:730) 
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:483) 
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) 
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) 

Прочитайте ранее пост на том же тема

Timeout connecting to message-hub on Bluemix

И возможная причина была упомянута тема не created.I можно увидеть тему в bluemix консоли и проверить, я назвал службу отдыха, чтобы принести список тем перед отправкой сообщения

RESTRequest restApi = new RESTRequest(getRestHost(),getApiKey()); 

String topics = restApi.get("/admin/topics", false); 

logger.info("Topics present in the system: " + topics); 

Он возвращает тему, где я пытаюсь нажать сообщение, и все же я получаю ошибку тайм-аута.

Может кто-то пожалуйста, помогите мне в отладке проблемы

UPDATE

На основе замечаний, я включил журналы отладки для Кафки, а вот последовательность бревен

2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initialize connection to node -5 for sending metadata request 
2016-11-23 08:48:20.906 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Initiating connection to node -5 at kafka05-prod01.messagehub.services.us-south.bluemix.net:9093. 
2016-11-23 08:48:20.914 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to SEND_HANDSHAKE_REQUEST 
2016-11-23 08:48:20.915 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Creating SaslClient: client=messagehub/[email protected];service=kafka;serviceHostname=kafka05-prod01.messagehub.services.us-south.bluemix.net;mechs=[PLAIN] 
2016-11-23 08:48:20.979 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-sent 
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.bytes-received 
2016-11-23 08:48:20.980 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.common.metrics.Metrics : Added sensor with name node--5.latency 
2016-11-23 08:48:20.982 DEBUG 72885 --- [sage-hub-sample] org.apache.kafka.clients.NetworkClient : Completed connection to node -5 
2016-11-23 08:48:21.080 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 

2016-11-23 08:48:21.264 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to INITIAL 
2016-11-23 08:48:21.265 DEBUG 72885 --- [sage-hub-sample] o.a.k.c.s.a.SaslClientAuthenticator  : Set SASL client state to INTERMEDIATE 
2016-11-23 08:48:21.284 DEBUG 72885 --- [sage-hub-sample] o.apache.kafka.common.network.Selector : Connection with kafka05-prod01.messagehub.services.us-south.bluemix.net/23.246.202.55 disconnected 

java.io.EOFException: null 
at org.apache.kafka.common.network.SslTransportLayer.read(SslTransportLayer.java:488) 
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) 
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239) 
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182) 
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64) 
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318) 
at org.apache.kafka.common.network.Selector.poll(Selector.java:283) 
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229) 
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134) 
at java.lang.Thread.run(Thread.java:745) 

Все вышесказанное регистрируется для любого из 5 брокеров. Также это все отладочные заявления, поэтому я не уверен, являются ли они ошибками.

-Tatha

ответ

1

Я вижу несколько проблем с файлом JAAS:

  • образца вы имеете в виду, использует Кафка 0.10.0.X, так что вы не должны использовать старый концентратор сообщений модуль для входа в систему Кафка 0.9. Так заменить «com.ibm.messagehub.login.MessageHubLoginModule» на «org.apache.kafka.common.security.plain.PlainLoginModule»

  • поле для имени пользователя, называется «имя пользователя», а не «пользователя ». Это должно быть имя пользователя = "$ USERNAME"

+0

Спасибо @Mickael, это были изменения, которые исправили проблему – Tatha

0

пожалуйста включите уровень DEBUG log4j и клиент следа Кафка может помочь выявить любые проблемы с подключением. Попробуйте изменить последнюю строку на log4j.logger.org.apache.kafka=DEBUG в файле log4j.properties, перестроить и посмотреть подробный вывод. Не стесняйтесь пересказывать это.

+0

Спасибо, что посмотрели на это. Я обновил Q с помощью отладочных журналов apache.kafka. Похоже на проблему с сетью/связью, как вы упомянули, но я не уверен, почему это происходит – Tatha

Смежные вопросы