2016-01-25 2 views
0

Мне нужно сохранить значения из kafka-> spark streaming-> cassandra.Сохранение значений от искры до Кассандры

Теперь, я получаю значения от искры kafka->, и у меня есть искровое задание для сохранения значений в db cassandra. Однако я столкнулся с проблемой с datatype dstream.

В этом следующем фрагменте вы можете увидеть, как я пытаюсь преобразовать DStream в удобный объект списка python, чтобы я мог работать с ним, но он дает ошибку.

входа на производителе Кафки:

Byrne 24 San Diego [email protected] Rob

искровой работа:

map1={'spark-kafka':1} 
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1) 
lines = kafkaStream.map(lambda x: x[1]) 
words = lines.flatMap(lambda line: line.split(" ")) 

words.pprint() # outputs-> Byrne 24 SanDiego [email protected] Rob 

list=[lambda word for word in words] 
#gives an error -> TypeError: 'TransformedDStream' object is not iterable 

Это, как я экономлю значение от искрового> Кассандры

rdd2=sc.parallelize([{ 
... "lastname":'Byrne', 
... "age":24, 
... "city":"SanDiego", 
... "email":"[email protected]", 
... "firstname":"Rob"}]) 
rdd2.saveToCassandra("keyspace2","users") 

Что лучший способ преобразования объекта DStream в словарь или что лучший способ сделать то, что я пытаюсь сделать здесь?

Мне просто нужны значения, полученные от kafka (в виде DStream), которые будут сохранены в Кассандре.

Спасибо, и любая помощь будет приятной!

Версия:

Cassandra v2.1.12 
Spark v1.4.1 
Scala 2.10 
+0

Потому что это была моя ошибка, более связанная с проблемой $ JAVA_HOME, чем мезо. – HackCode

+0

Хорошо человек расслабиться. Нет причин, чтобы получить здесь гипер. Не будем спамить этот пост сейчас. – HackCode

ответ

0

На самом деле, я нашел ответ в этом руководстве http://katychuang.me/blog/2015-09-30-kafka_spark.html.

+0

@ HackCode. Я также пытаюсь выполнить тот же пример, но перед вызовом saveToCassandra ('keyspace', 'table') возникает строка .Error - py4j.protocol.Py4JJavaError: Произошла ошибка при вызове o38.newInstance. что мне не хватает, вы можете мне предложить. – kit

0

Как и все «Спарка», я думаю, краткое объяснение связанно, так как даже если вы знакомы с РДОМ, DStreams имеет еще более высокую концепцию:
A дискретизованной поток (DStream), представляет собой непрерывную последовательность RDD одного и того же типа, представляющую непрерывный поток данных. В вашем случае DStreams создаются из живых данных Кафки.
Хотя программа Спарк Streaming работает, каждый DStream периодически генерирует RDD от реальных данных Кафки

Теперь перебрать получил РДУ, вам нужно использовать DStream#foreachRDD (и, как следует из ее названия, она служит той же цели как foreach, но на этот раз, чтобы перебрать более RDDs).
Как только у вас есть RDD, вы можете вызывать rdd.collect() или rdd.take() или любой другой стандартный API для RDD.

Теперь, в качестве заключительной заметки, чтобы сделать вещи еще более увлекательными, Spark представил новый «прямой» подход, не требующий приема, чтобы обеспечить более надежные сквозные гарантии.
(, который требует Spark 1.3+)
Вместо использования приемников для приема данных этот подход периодически запрашивает Kafka для последних смещений в каждой теме + раздел и соответственно определяет диапазоны смещений для обработки в каждой партии. Когда запускаются задания для обработки данных, простой пользовательский API Kafka используется для чтения определенных диапазонов смещений от Kafka.
(что хороший способ сказать вам придется «беспорядок» с коррекциями сам)

См Direct Streams Approach для получения более подробной информации.
См here для примера кода Scala

Смежные вопросы