2016-01-29 2 views
1

Я пытаюсь читать потоковые данные с использованием Spark Python и изменять формат данных для потоковых данных. Но мне кажется, что я даже не могу прочитать поток ...Не удалось прочитать данные потока Spark

Вот мои шаги:

  1. Я открыл один терминал, CD в папку ввода данных, а затем введите в командной строке

    ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999 
    
  2. Затем я открываю другой терминал, тип setenv SPARK_HOME /user/abc/Downloads/spark-1.5.2-bin-hadoop2.6/, чтобы я мог запускать Spark локально. Затем я набираю команду ${SPARK_HOME}/bin/spark-submit --master local /user/abc/test.py localhost 9999 для запуска моего кода.

Ниже приведен код, я просто тестирование ли я читать потоковые данные, а затем изменить формат данных ... Но он всегда показывает ошибку: 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Starting receiver 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Called receiver onStart 16/01/28 22:41:37 INFO ReceiverSupervisorImpl: Receiver started again 16/01/28 22:41:37 INFO SocketReceiver: Connecting to localhost:9999 16/01/28 22:41:37 INFO SocketReceiver: Connected to localhost:9999 16/01/28 22:41:37 INFO SocketReceiver: Closed socket to localhost:9999 16/01/28 22:41:37 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Socket data stream had no more data

Если я повторно запустить ls part-* | xargs -I % sh -c '{ cat %; sleep 5;}' | nc -lk 9999, он по-прежнему показывает ту же ошибку ... Вы знаете, как решить проблему?

import sys 
import re 

from pyspark import SparkContext 
from pyspark.sql.context import SQLContext 
from pyspark.sql import Row 
from pyspark.streaming import StreamingContext 


sc = SparkContext(appName="test") 
ssc = StreamingContext(sc, 5) 
sqlContext = SQLContext(sc) 


def get_tuple(r): 
    m = re.search('\[(.*?)\]',r) 
    s = m.group(1) 
    fs = s.split(',') 
    for i in range(len(fs)): 
     if i > 1: 
      fs[i] = float(fs[i]) 
    return fs 


def main(): 
    indata = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
    inrdd = indata.map(lambda r: get_tuple(r)) 
    Features = Row('feature_vec') 
    features_rdd = inrdd.map(lambda r: Features(r)) 
    features_rdd.pprint(num=10) 

    ssc.start() 
    ssc.awaitTermination() 

if __name__ == "__main__": 
    main() 
+1

Только для целей тестирования можно запустить пс сервера, как это и посмотреть, если он работает: в то время как истинные ; выполнить эхо-тест; сделано | nc -lk 9999 – facha

+0

Большое спасибо @facha, это хороший способ проверить! Я узнал от вас больше! Просто решив проблему, команда искры должна добавить '[*]', например: '$ {SPARK_HOME}/bin/spark-submit --master local [*] /user/abc/test.py localhost 9999' –

ответ

0

Проблема решена. Команда Спарка линия должна добавить [*] для искровой потоковой передачи, как это:

${SPARK_HOME}/bin/spark-submit --master local[*] /user/abc/test.py localhost 9999 

Тогда выход появится

Смежные вопросы