Я пытаюсь читать потоковые данные с использованием Spark Python и изменять формат данных для потоковых данных. Но мне кажется, что я даже не могу прочитать поток ...Не удалось прочитать данные потока Spark
Вот мои шаги:
Я открыл один терминал, CD в папку ввода данных, а затем введите в командной строке
ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
Затем я открываю другой терминал, тип
setenv SPARK_HOME /user/abc/Downloads/spark-1.5.2-bin-hadoop2.6/
, чтобы я мог запускать Spark локально. Затем я набираю команду${SPARK_HOME}/bin/spark-submit --master local /user/abc/test.py localhost 9999
для запуска моего кода.
Ниже приведен код, я просто тестирование ли я читать потоковые данные, а затем изменить формат данных ... Но он всегда показывает ошибку: 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Starting receiver 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Called receiver onStart 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Receiver started again 16/01/28 22:41:37 INFO SocketReceiver: Connecting to localhost:9999 16/01/28 22:41:37 INFO SocketReceiver: Connected to localhost:9999 16/01/28 22:41:37 INFO SocketReceiver: Closed socket to localhost:9999 16/01/28 22:41:37 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Socket data stream had no more data
Если я повторно запустить ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999
, он по-прежнему показывает ту же ошибку ... Вы знаете, как решить проблему?
import sys
import re
from pyspark import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
sc = SparkContext(appName="test")
ssc = StreamingContext(sc, 5)
sqlContext = SQLContext(sc)
def get_tuple(r):
m = re.search('\[(.*?)\]',r)
s = m.group(1)
fs = s.split(',')
for i in range(len(fs)):
if i > 1:
fs[i] = float(fs[i])
return fs
def main():
indata = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
inrdd = indata.map(lambda r: get_tuple(r))
Features = Row('feature_vec')
features_rdd = inrdd.map(lambda r: Features(r))
features_rdd.pprint(num=10)
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()
Только для целей тестирования можно запустить пс сервера, как это и посмотреть, если он работает: в то время как истинные ; выполнить эхо-тест; сделано | nc -lk 9999 – facha
Большое спасибо @facha, это хороший способ проверить! Я узнал от вас больше! Просто решив проблему, команда искры должна добавить '[*]', например: '$ {SPARK_HOME}/bin/spark-submit --master local [*] /user/abc/test.py localhost 9999' –