2016-01-18 3 views
1

Я получаю данные от Kafka в приложении Spark Streaming. Он поставляется в формате Transformed DStreams. Затем я сохраняю только те функции, которые мне нужны.
features=data.map(featurize)
, который дает мне «имя», «возраст», «независимо».
Затем я хочу сохранить только имя всех данных
features=data.map(featurize).map(lambda Names: Names["name"]Работа над отдельными элементами RDD-pyspark

Теперь, когда я напечатать эту команду, я получаю все имена, приходящие от потокового приложения, но я хочу работать на каждом из них в отдельности.
В частности, я хочу проверить каждое имя, и если я уже сталкивался с ним в прошлом, я хочу применить к нему функцию. В противном случае я просто продолжу свое приложение. Поэтому я хочу, чтобы каждое имя было строкой, чтобы я мог вставить ее в свою функцию, которая проверяет, была ли в прошлом показана одна строка.

Я знаю, что foreach предоставит мне каждый RDD, но все же я хочу работать над каждым именем RDD отдельно.

Есть ли способ в pyspark сделать это?

+1

Вы пробовали updateStateByKey, где ключи - ваши имена? – user3689574

+0

Как я могу использовать updateStateByKey для хранения имен из разных потоковых окон? – Iolkos

+0

Здесь вы можете найти полезную информацию (найдите раздел updateStateByKey): http://spark.apache.org/docs/latest/streaming-programming-guide.html – user3689574

ответ

0

Так что я сделал, чтобы определить функцию, которая проверяет, является ли я видел это имя в прошлом, а затем использовать .filter(myfunc) работать только с именами я хочу ...

Сейчас проблема заключается в том, что в в каждом новом потоковом окне функция применяется с самого начала, поэтому, если бы я видел имя John в первом окне 7 раз, я буду хранить его только один раз, но тогда, если бы я видел имя John во втором окне 5 раз я буду держать его снова один раз ...

Я хочу сохранить имя Джон один раз для всего потокового приложения ... Любые мысли по этому поводу?