2015-07-06 3 views
6

У меня есть RDD Map[String, String]; есть способ вызвать filter его несколько раз, не проходя через RDD более одного раза?Spark - Несколько фильтров на RDD за один проход

Например, я хочу сделать что-то вроде этого:

val stateNY = mapRDD.filter(person => person("state").equals("NY")) 
val stateOR = mapRDD.filter(person => person("state").equals("OR")) 
val stateMA = mapRDD.filter(person => person("state").equals("MA")) 
val stateWA = mapRDD.filter(person => person("state").equals("WA")) 

и это:

val wage10to20 = mapRDD.filter(person => person("wage").toDouble > 10 && person("wage").toDouble <= 20) 
val wage20to30 = mapRDD.filter(person => person("wage").toDouble > 20 && person("wage").toDouble <= 30) 
val wage30to40 = mapRDD.filter(person => person("wage").toDouble > 30 && person("wage").toDouble <= 40) 
val wage40to50 = mapRDD.filter(person => person("wage").toDouble > 40 && person("wage").toDouble <= 50) 

где mapRDD имеет тип RDD[Map[String, String]], в один проход.

+0

Работа с распределенными коллекциями требует изменения ментальной модели. Вероятно, вам не нужно делать такие отфильтрованные выделения. Подумайте об альтернативах, группируя вещи. – maasg

ответ

6

Я предполагаю, что вы имеете в виду вы хотите вернуть отдельный РД для каждого значения (т.е. не просто сделать person => Set("NY", "OR", "MA", "WA").contains(person("state")))

Обычно то, что вы пытаетесь достичь будет возможно с помощью Pair RDDs

В первом примере, вы могли бы использовать:

val keyByState = mapRDD.keyBy(_("state")) 

А потом делать такие операции, как groupByKey, reduceByKey и т.д.

Или в вашем втором примере ключ по размеру заработной платы округлен до ближайшего 10.

+0

Спасибо! Один вопрос: когда я делаю '' 'keyBy''', за которым следует' '' groupByKey''', я получаю '' '' PairRDD''' '' 'String''' и' '' CompactBuffer '' ''. Как я могу затем преобразовать это в несколько RDD '' 'Map [String, String]' ''? –

1

Если вам в конечном итоге нужны они в отдельных RDD, вам понадобятся отдельные фильтры и несколько сканирований в какой-то момент. Вы должны кэшировать RDD (mapRDD в первом примере), который вы просматриваете, чтобы он не читался несколько раз.

Существует преимущество в том, чтобы делать фильтры, как вы их написали, или делать группировку, предложенную в другом ответе, поскольку фильтры могут возникать на стороне карты, тогда как фильтрация после группировки потребует перетасовки всех данных вокруг (включая данные, относящиеся к состояниям, которые вы не нужно ...)

Смежные вопросы