2

Я пытаюсь читать сообщения от Kafka, обрабатывать данные, а затем добавлять данные в cassandra, как если бы это RDD.Сохранение данных обратно в Cassandra as RDD

Моя проблема заключается в том, чтобы сохранить данные обратно в cassandra.

from __future__ import print_function 

from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 
from pyspark import SparkConf, SparkContext 

appName = 'Kafka_Cassandra_Test' 
kafkaBrokers = '1.2.3.4:9092' 
topic = 'test' 
cassandraHosts = '1,2,3' 
sparkMaster = 'spark://mysparkmaster:7077' 


if __name__ == "__main__": 
    conf = SparkConf() 
    conf.set('spark.cassandra.connection.host', cassandraHosts) 

    sc = SparkContext(sparkMaster, appName, conf=conf) 

    ssc = StreamingContext(sc, 1) 

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBrokers}) 
    lines = kvs.map(lambda x: x[1]) 
    counts = lines.flatMap(lambda line: line.split(" ")) \ 
     .map(lambda word: (word, 1)) \ 
     .reduceByKey(lambda a, b: a+b) 
    counts.saveToCassandra('coreglead_v2', 'wordcount') 

    ssc.start() 
    ssc.awaitTermination() 

И ошибка:

[[email protected] ~]# spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py 
Ivy Default Cache set to: /root/.ivy2/cache 
The jars for the packages stored in: /root/.ivy2/jars 
:: loading settings :: url = jar:file:/var/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml 
datastax#spark-cassandra-connector added as a dependency 
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 
    confs: [default] 
    found datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 in spark-packages 
    found org.apache.cassandra#cassandra-clientutil;2.2.2 in central 
    found com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 in central 
    found io.netty#netty-handler;4.0.33.Final in central 
    found io.netty#netty-buffer;4.0.33.Final in central 
    found io.netty#netty-common;4.0.33.Final in central 
    found io.netty#netty-transport;4.0.33.Final in central 
    found io.netty#netty-codec;4.0.33.Final in central 
    found io.dropwizard.metrics#metrics-core;3.1.2 in central 
    found org.slf4j#slf4j-api;1.7.7 in central 
    found org.apache.commons#commons-lang3;3.3.2 in central 
    found com.google.guava#guava;16.0.1 in central 
    found org.joda#joda-convert;1.2 in central 
    found joda-time#joda-time;2.3 in central 
    found com.twitter#jsr166e;1.1.0 in central 
    found org.scala-lang#scala-reflect;2.11.7 in central 
:: resolution report :: resolve 647ms :: artifacts dl 15ms 
    :: modules in use: 
    com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 from central in [default] 
    com.google.guava#guava;16.0.1 from central in [default] 
    com.twitter#jsr166e;1.1.0 from central in [default] 
    datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 from spark-packages in [default] 
    io.dropwizard.metrics#metrics-core;3.1.2 from central in [default] 
    io.netty#netty-buffer;4.0.33.Final from central in [default] 
    io.netty#netty-codec;4.0.33.Final from central in [default] 
    io.netty#netty-common;4.0.33.Final from central in [default] 
    io.netty#netty-handler;4.0.33.Final from central in [default] 
    io.netty#netty-transport;4.0.33.Final from central in [default] 
    joda-time#joda-time;2.3 from central in [default] 
    org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default] 
    org.apache.commons#commons-lang3;3.3.2 from central in [default] 
    org.joda#joda-convert;1.2 from central in [default] 
    org.scala-lang#scala-reflect;2.11.7 from central in [default] 
    org.slf4j#slf4j-api;1.7.7 from central in [default] 
    --------------------------------------------------------------------- 
    |     |   modules   || artifacts | 
    |  conf  | number| search|dwnlded|evicted|| number|dwnlded| 
    --------------------------------------------------------------------- 
    |  default  | 16 | 0 | 0 | 0 || 16 | 0 | 
    --------------------------------------------------------------------- 
:: retrieving :: org.apache.spark#spark-submit-parent 
    confs: [default] 
    0 artifacts copied, 16 already retrieved (0kB/14ms) 
16/02/15 16:26:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
Traceback (most recent call last): 
    File "/var/spark/scripts/kafka_cassandra.py", line 27, in <module> 
    counts.saveToCassandra('coreglead_v2', 'wordcount') 
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra' 

От поиска вокруг я нашел this GitHub issue, однако это, как представляется, связаны с другой библиотеки (я не могу использовать эту библиотеку, как я используя Cassandra 3.0, и он пока не поддерживается).

Целью является создание агрегированных данных из одного сообщения (словосочетание только для тестирования) и вставка его в несколько таблиц.

Я близок к тому, чтобы просто использовать Datastax Python Driver и писать заявления самостоятельно, но есть ли лучший способ достичь этого?

ответ

3

Вы используете соединитель Spark Cassandra от Datastax, который не поддерживает питон на уровне RDD/DStream. Поддерживаются только Dataframes. См. docs для получения дополнительной информации.

Я создал обертку вокруг вышеупомянутого разъема: PySpark Cassandra. Это не является полным дополнением к разъему Datastax, но там много всего. Кроме того, если производительность важна, исследование влияния производительности может стоить времени.

И, наконец, Spark отправляет python example с использованием CqlInput/OutputFormat из hasoop mapreduce. На мой взгляд, это не очень удобный вариант для разработчиков, но он есть.

+0

Спасибо за ваш ответ. Я видел вашу библиотеку PySpark-Cassandra, но, похоже, она не поддерживает Cassandra 3, это все еще так? –

+0

@ JimWright, pyspark-cassandra построен на https://github.com/datastax/spark-cassandra-connector. Совместимость с Cassandra 3 была выпущена не так давно (v1.5). Версия 0.3.1 из pyspark-cassandra построен на этом выпуске и, таким образом, поддерживает Cassandra 3. –

+0

Привет, в 11 октября вы изменили README.md, добавив это предложение: «PySpark Cassandra больше не поддерживается. Усилия по развитию отошли от Spark до чистой среды Python ". Каким будет ваше обновление для вашего ответа? – benjguin

0

Посмотрите на свой код и ознакомьтесь с описанием проблемы: не видно, что есть какой-либо соединитель Cassandra, который вы используете. Spark не поставляется с поддержкой Cassandra из коробки, поэтому такие типы данных RDD и DStream не имеют метода saveToCassandra. Вам необходимо импортировать внешний разъем Spark-Cassandra, который расширяет типы RDD и DStream для поддержки интеграции Cassandra.

Вот почему вы получаете сообщение об ошибке: Python не может найти какую-либо функцию saveToCassandra по типу DStream, поскольку в настоящее время нет.

Вам понадобится соединитель DataStax или другой разъем для расширения типа DStream с помощью saveToCassandra.

+0

Спасибо за ответ, я использую коннектор с помощью datastax: https://github.com/datastax/spark-cassandra-connector, который я указал, что запускаю spark-submit. Я новичок в Python, откуда я знаю, что мне нужно импортировать? –

+0

@JimWright как у вас есть программа Spark & ​​PySpark? Используете ли вы DataStax Enterprise? Кроме того, вы используете оболочку pyspark или как вы пытаетесь выполнить свой код? – egerhard

+0

Я использую сообщество издание, и я бегу по командной строке через spark-submit: spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar - пакеты datastax: spark-cassandra-connector: 1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py –