2017-01-25 2 views
0

Я пытаюсь запустить пример подсчета слов, интегрирующий поток AWS Kinesis и Apache Spark. Случайные линии помещаются в Кинезис через равные промежутки времени.Как напечатать PythonTransformedDStream

lines = KinesisUtils.createStream(...) 

Когда я представить мое заявление, lines.pprint() я не вижу каких-либо значений отпечатанные.

Пробовал печатать lines объект, и я вижу <pyspark.streaming.dstream.TransformedDStream object at 0x7fa235724950>

Как напечатать PythonTransformedDStream объект? и проверьте, получены ли данные.

Уверен, что учетных данных не существует, если я использую ложные учетные данные, я получаю исключение доступа.

Добавлен код для справки

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 

if __name__ == "__main__": 
    sc = SparkContext(appName="SparkKinesisApp") 
    ssc = StreamingContext(sc, 1) 

    lines = KinesisUtils.createStream(ssc, "SparkKinesisApp", "myStream", "https://kinesis.us-east-1.amazonaws.com","us-east-1", InitialPositionInStream.LATEST, 2) 

    # lines.saveAsTextFiles('/home/ubuntu/logs/out.txt') 
    lines.pprint() 

    counts = lines.flatMap(lambda line: line.split(" ")) 
          .map(lambda word: (word, 1)) 
          .reduceByKey(lambda a, b: a + b) 

    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

ответ

0

В итоге я получил его работу.

В примере кода, который я указал на https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py, неверно вводить заявку.

Правильная команда, с которой я получил это работаю в

$ bin/spark-submit --jars external/spark-streaming-kinesis-asl_2.11-2.1.0.jar --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0 /home/ubuntu/my_pyspark/spark_kinesis.py 
2

С lines.pprint() не печатает ничего, вы можете, пожалуйста, убедитесь, что вы выполняете:

ssc.start() 
ssc.awaitTermination() 

, как указано в примере здесь: https://github.com/apache/spark/blob/v2.1.0/examples/src/main/python/streaming/network_wordcount.py

pprint() должен работать, если среда настроена правильно:

http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html#output-operations-on-dstreams

Выходные Операции по DStreams

print() - Печать первые десять элементов каждой партии данных в DStream на узле драйвера выполняется приложение потоковой передачи. Этот полезен для разработки и отладки. Python API Это называется pprint() в API Python.

+0

Я уже пробовал подсчет программу сетевого слова и 'pprint' работает для этого, так что я думаю, среда настроена соответствующим образом. Кроме того, упомянутые две строки доступны в конце моего кода. Программа запускается до тех пор, пока я не нажму ctrl + c. – ArunDhaJ

+0

@ArunDhaJ - вы установили сервер netcat (http://landoflinux.com/linux_netcat_command.html) и выполнили его с помощью '$ nc -lk 9999'? Вы вводили слова в консоли netcat, которые будут введены в вашу программу искрообразования? – Yaron

+0

Я пробовал программу подсчета сетевых слов с помощью 'nc' и успешно выполнил ее. Я столкнулся с проблемами интеграции Amazon Kinesis. Я публикую случайные предложения для потока кинезий, однако мой искровой клиент не выбирает его и не обрабатывает. – ArunDhaJ

Смежные вопросы