2016-09-09 4 views
3

Я пытаюсь подключиться к экземпляру узла сообщений Bluemix на http://bluemix.net. Этот простой скриптПочему kafka-python не удается подключиться к службе сообщений Bluemix?

#!/usr/bin/env python 

from kafka import KafkaProducer 
from kafka.errors import KafkaError 

kafka_brokers_sasl = [ 
    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ] 
sasl_plain_username = "xxxxxxxxxxxxxxx" 
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx" 
sasl_mechanism = 'SASL_PLAINTEXT' 

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl, 
         sasl_plain_username = sasl_plain_username, 
         sasl_plain_password = sasl_plain_password, 
         sasl_mechanism = sasl_mechanism) 

заканчивается ниже исключением:

Traceback (most recent call last): 
    File "./test-mh.py", line 12, in <module> 
    producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl, sasl_plain_username = sasl_plain_username, sasl_plain_password = sasl_plain_password, sasl_mechanism = sasl_mechanism) 
    File "/usr/local/lib/python2.7/dist-packages/kafka/producer/kafka.py", line 328, in __init__ 
    **self.config) 
    File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 202, in __init__ 
    self.config['api_version'] = self.check_version(timeout=check_timeout) 
    File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 791, in check_version 
    raise Errors.NoBrokersAvailable() 
kafka.errors.NoBrokersAvailable: NoBrokersAvailable 

У меня kafka_brokers_sasl, sasl_plain_username и sasl_plain_password из учетных messagehub службы объекта. Я использую kafka-python 1.3.1, который, похоже, поддерживает механизм аутентификации SASL. Любая идея, что я делаю неправильно? Благодарю.

ответ

5

Для концентратора сообщений требуется подключение клиентов с использованием соединения TLS 1.2. Это означает, что параметр security_protocol равен KafkaProducer, а также ssl.SSLContext через параметр ssl_context. По-видимому, клиент Python Kafka по умолчанию создает контекст SSLv23.

Вот изменения, необходимые для подключения:

import ssl 
from kafka import KafkaProducer 
from kafka.errors import KafkaError 

kafka_brokers_sasl = [ 
    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka02-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka03-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka04-prod01.messagehub.services.us-south.bluemix.net:9093", 
    "kafka05-prod01.messagehub.services.us-south.bluemix.net:9093" ] 
sasl_plain_username = "xxxxxxxxxxxxxxx" 
sasl_plain_password = "xxxxxxxxxxxxxxxxxxxxxxxxx" 

sasl_mechanism = 'PLAIN'  # <-- changed from 'SASL_PLAINTEXT' 
security_protocol = 'SASL_SSL' 

# Create a new context using system defaults, disable all but TLS1.2 
context = ssl.create_default_context() 
context.options &= ssl.OP_NO_TLSv1 
context.options &= ssl.OP_NO_TLSv1_1 

producer = KafkaProducer(bootstrap_servers = kafka_brokers_sasl, 
         sasl_plain_username = sasl_plain_username, 
         sasl_plain_password = sasl_plain_password, 
         security_protocol = security_protocol, 
         ssl_context = context, 
         sasl_mechanism = sasl_mechanism) 
Смежные вопросы