2016-11-08 10 views
2

При проведении исследований мне сложно удалить все подмножества в Spark RDD.Как эффективно удалить подмножество в искровом RDD

Структура данных RDD[(key,set)]. Например, это может быть:

RDD[ ("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)) ]

Поскольку множество микрофону (Set(1,3)) является подмножеством Петра (Set(1,2,3)), я хочу удалить «микрофон», который будет в конечном итоге с

RDD[ ("peter",Set(1,2,3)), ("jack",Set(5)) ]

Легко реализовать в python локально с двумя циклами «для». Но когда я хочу распространиться на облако с помощью scala и искры, не так просто найти хорошее решение.

Thanks

+0

Что вы делаете со связями? '(" peter ", Set (1,2,3))' & '(" olga ", Set (1,2,3))' – maasg

+0

Просто удалите один из них. Сохранение того, что не имеет значения. –

+0

Предлагаемое решение поддерживает оба. Вам предлагается адаптировать его к вашим конкретным потребностям. – maasg

ответ

1

Я сомневаюсь, что мы можем бежать сравнивая каждый элемент друг с другом (эквивалент двойной петли в нераспределенном алгоритме) , Операция подмножества между наборами не является рефлексивной, что означает, что нам нужно сравнить is "alice" subsetof "bob" и is "bob" subsetof "alice".

Чтобы сделать это с помощью искровой API, мы можем прибегнуть к перемножения данных с самим собой, используя декартово произведение и проверки каждой записи из полученной матрицы:

val data = Seq(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("anne", Set(7)),("jack",Set(5,4,1)), ("lizza", Set(5,1)), ("bart", Set(5,4)), ("maggie", Set(5))) 
// expected result from this dataset = peter, olga, anne, jack 
val userSet = sparkContext.parallelize(data) 
val prod = userSet.cartesian(userSet) 
val subsetMembers = prod.collect{case ((name1, set1), (name2,set2)) if (name1 != name2) && (set2.subsetOf(set1)) && (set1 -- set2).nonEmpty => (name2, set2) } 
val superset = userSet.subtract(subsetMembers)  

// lets see the results: 
superset.collect() 
// Array[(String, scala.collection.immutable.Set[Int])] = Array((olga,Set(1, 2, 3)), (peter,Set(1, 2, 3)), (anne,Set(7)), (jack,Set(5, 4, 1))) 
-3

Вы можете использовать фильтр после карты.

Вы можете построить как карту, которая вернет значение для того, что вы хотите удалить. Во-первых построить функцию:

def filter_mike(line): 
    if line[1] != Set(1,3): 
     return line 
    else: 
     return None 

Затем вы можете фильтровать сейчас вроде этого:

your_rdd.map(filter_mike).filter(lambda x: x != None) 

Это будет работать

+0

Снова прочитайте вопрос. – shanmuga

1

Это может быть достигнуто с помощью RDD.fold функции.
В этом случае требуемым выходом является «Список» (ItemList) superset элементов. Для этого вход также должен быть преобразован в «Список» (RDD из ITEMLIST)

import org.apache.spark.rdd.RDD 

// type alias for convinience 
type Item = Tuple2[String, Set[Int]] 
type ItemList = List[Item] 

// Source RDD 
val lst:RDD[Item] = sc.parallelize(List(("peter",Set(1,2,3)), ("mike",Set(1,3)), ("jack",Set(5)))) 


// Convert each element as a List. This is needed for using fold function on RDD 
// since the data-type of the parameters are the same as output parameter 
// data-type for fold function 
val listOflst:RDD[ItemList] = lst.map(x => List(x)) 

// for each element in second ItemList 
// - Check if it is not subset of any element in first ItemList and add first 
// - Remove the subset of newly added elements 
def combiner(first:ItemList, second:ItemList) : ItemList = { 
    def helper(lst: ItemList, i:Item) : ItemList = { 
     val isSubset: Boolean = lst.exists(x=> i._2.subsetOf(x._2)) 
     if(isSubset) lst else i :: lst.filterNot(x => x._2.subsetOf(i._2)) 
    } 
    second.foldLeft(first)(helper) 
} 


listOflst.fold(List())(combiner) 
Смежные вопросы