2016-05-20 3 views
3

Я пытаюсь написать простую задачу pyspark, которая получала бы данные от темы брокера kafka, делала некоторую трансформацию по этим данным и помещала преобразованные данные в другую тему брокера кафки.Как правильно использовать pyspark для отправки данных в kafka broker?

У меня есть следующий код, который считывает данные из Кафки темы, но не имеет никакого эффекта работает sendkafka функции:

from pyspark import SparkConf, SparkContext 

from operator import add 
import sys 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
import json 
from kafka import SimpleProducer, KafkaClient 

def sendkafka(messages): 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    for message in messages: 
     yield producer.send_messages('spark.out', message) 

def main(): 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 5) 

    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    parsed = kvs.map(lambda (key, value): json.loads(value)) 
    parsed.pprint() 

    sentRDD = kvs.mapPartitions(sendkafka) 
    sentRDD.count() 

    ssc.start() 
    ssc.awaitTermination() 
if __name__ == "__main__": 

    main() 

Что я должен изменить, чтобы сделать мою sendkafka функцию на самом деле передач данных в искра. Кафка тема?

ответ

4

Вот правильный код, который считывает из Кафки в Искре, и записывает искровые данные обратно в другую Кафка тему:

from pyspark import SparkConf, SparkContext 
from operator import add 
import sys 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
import json 
from kafka import SimpleProducer, KafkaClient 
from kafka import KafkaProducer 

producer = KafkaProducer(bootstrap_servers='localhost:9092') 

def handler(message): 
    records = message.collect() 
    for record in records: 
     producer.send('spark.out', str(record)) 
     producer.flush() 

def main(): 
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount") 
    ssc = StreamingContext(sc, 10) 

    brokers, topic = sys.argv[1:] 
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
    kvs.foreachRDD(handler) 

    ssc.start() 
    ssc.awaitTermination() 
if __name__ == "__main__": 

    main() 

способ запуска это:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test 
+0

что дают ниже ошибок. Библиотеки Kafka Spark Streaming не найдены в пути класса. Попробуйте выполнить одно из следующих действий. 1. Включите библиотеку Кафки и его зависимости с в искровой подати команды в $ бен/искровой представить --packages org.apache.spark: искровой потоковый Кафка: 1.6.0 ... 2. Загрузите JAR артефакта из Maven Central http://search.maven.org/, Идентификатор группы = org.apache.spark, Артефакт Id = spark-streaming-kafka-assembly, Version = 1.6.0. Затем –

+2

@beyhan этот ответ работает только как локальная модель, а не кластер, – avocado

+1

@BeyhanGul вам нужно добавить --packages org.apache.spark: spark-streaming-kafka- <любая версия spark говорит вам использовать> для команда – Chandan

Смежные вопросы