Я пытался расширить сеть количество слов, чтобы иметь возможность фильтровать строки на основе определенного ключевого словаPyspark работы фильтра на Dstream
Я использую искру 1.6.2
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
counts.pprint()
ssc.start()
ssc.awaitTermination()
Я попробовал все вариации,
я почти всегда получаю ошибку я не могу применить такие функции, как
pprint/шоу/принять/собирающие on TransformedDStream
. Я использовал преобразование с foreachRDD в линиях Dstream с функцией для проверки с использованием собственных методов строки python, что тоже не срабатывает (на самом деле, если я использую печать в любом месте программы, появляется искра-подача, только сообщения об ошибках отсутствуют.
То, что я хочу, чтобы иметь возможность фильтровать входящие Dstreams по ключевому слову, как «ERROR» |. «WARNING» и т.д., и выводить их на стандартный вывод или STDERR
Благодаря @dfernig я принял ответ. Я думал, что я сравнивал слово за словом «ERROR» от split.filter :) У меня есть следующий вопрос, который я разместил здесь, если он может посмотреть? [ссылка] (https://stackoverflow.com/questions/42200620/pyspark-transfer-control-out-of-spark-session-sc) – GreenThumb