0

У меня есть следующий сценарий pyspark, которые предполагают подключение к локальной Кафка кластера:pyspark не смог найти KafkaUtils.createDirectStream

from pyspark import SparkConf, SparkContext 

from operator import add 
import sys 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
## Constants 
APP_NAME = "PythonStreamingDirectKafkaWordCount" 
##OTHER FUNCTIONS/CLASSES 

def main(): 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 2) 

    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    lines = kvs.map(lambda x: x[1]) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 
if __name__ == "__main__": 

    main() 

Когда я запускаю это, я получаю следующее сообщение об ошибке:

File "/home/ubuntu/spark-1.3.0-bin-hadoop2.4/hello1.py", line 16, in main 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
AttributeError: type object 'KafkaUtils' has no attribute 'createDirectStream' 

Что делать, если у вас есть доступ к KafkaUtils.createDirectStream?

ответ

1

Вы используете Spark 1.3.0, а версия Python createDirectStream была введена в Spark 1.4.0. Spark 1.3 обеспечивает только реализацию Scala и Java.

Если вы хотите использовать прямой поток, вам придется обновить установку Spark.

Смежные вопросы