Я пытаюсь написать простую задачу pyspark, которая получала бы данные от темы брокера kafka, делала некоторую трансформацию по этим данным и помещала преобразованные данные в другую тему брокера кафки.Как правильно использовать pyspark для отправки данных в kafka broker?
У меня есть следующий код, который считывает данные из Кафки темы, но не имеет никакого эффекта работает sendkafka функции:
from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
def sendkafka(messages):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
for message in messages:
yield producer.send_messages('spark.out', message)
def main():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 5)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
parsed = kvs.map(lambda (key, value): json.loads(value))
parsed.pprint()
sentRDD = kvs.mapPartitions(sendkafka)
sentRDD.count()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Что я должен изменить, чтобы сделать мою sendkafka функцию на самом деле передач данных в искра. Кафка тема?
что дают ниже ошибок. Библиотеки Kafka Spark Streaming не найдены в пути класса. Попробуйте выполнить одно из следующих действий. 1. Включите библиотеку Кафки и его зависимости с в искровой подати команды в $ бен/искровой представить --packages org.apache.spark: искровой потоковый Кафка: 1.6.0 ... 2. Загрузите JAR артефакта из Maven Central http://search.maven.org/, Идентификатор группы = org.apache.spark, Артефакт Id = spark-streaming-kafka-assembly, Version = 1.6.0. Затем –
@beyhan этот ответ работает только как локальная модель, а не кластер, – avocado
@BeyhanGul вам нужно добавить --packages org.apache.spark: spark-streaming-kafka- <любая версия spark говорит вам использовать> для команда – Chandan