2017-02-10 3 views
0

Я пытался расширить сеть количество слов, чтобы иметь возможность фильтровать строки на основе определенного ключевого словаPyspark работы фильтра на Dstream

Я использую искру 1.6.2

from __future__ import print_function 

import sys 

from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

if __name__ == "__main__": 
    if len(sys.argv) != 3: 
     print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr) 
     exit(-1) 
    sc = SparkContext(appName="PythonStreamingNetworkWordCount") 
    ssc = StreamingContext(sc, 5) 

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
    counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR") 
    counts.pprint() 

    ssc.start() 
    ssc.awaitTermination() 

Я попробовал все вариации,

я почти всегда получаю ошибку я не могу применить такие функции, как

pprint/шоу/принять/собирающие on TransformedDStream

. Я использовал преобразование с foreachRDD в линиях Dstream с функцией для проверки с использованием собственных методов строки python, что тоже не срабатывает (на самом деле, если я использую печать в любом месте программы, появляется искра-подача, только сообщения об ошибках отсутствуют.

То, что я хочу, чтобы иметь возможность фильтровать входящие Dstreams по ключевому слову, как «ERROR» |. «WARNING» и т.д., и выводить их на стандартный вывод или STDERR

ответ

1

То, что я хочу, чтобы иметь возможность фильтровать входящие потоки данных по ключевому слову типа «ОШИБКА» | «ПРЕДУПРЕЖДЕНИЕ» и т. д. и выводят его на стандартный вывод или stderr.

Тогда вы не хотите называть flatMap, так как это разделит ваши линии на отдельные жетоны. Вместо этого, вы можете заменить этот вызов вызовом filter, который проверяет, содержит ли строка "error":

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) 
errors = lines.filter(lambda l: "error" in l.lower()) 
errors.pprint() 
+0

Благодаря @dfernig я принял ответ. Я думал, что я сравнивал слово за словом «ERROR» от split.filter :) У меня есть следующий вопрос, который я разместил здесь, если он может посмотреть? [ссылка] (https://stackoverflow.com/questions/42200620/pyspark-transfer-control-out-of-spark-session-sc) – GreenThumb

Смежные вопросы