2016-07-23 3 views
0

У меня есть следующие RDD, содержащие наборы элементов, которые я хотел бы группировать по подобию элементов (Элементы в том же наборе считаются схожими. Сходство транзитивно и все элементы в наборах, которые по крайней мере один общий элемент, также рассматриваются аналогичные)Уменьшить Spark RDD для возврата нескольких значений

Входной РДД:

Set(w1, w2) 
Set(w1, w2, w3, w4) 
Set(w5, w2, w6) 
Set(w7, w8, w9) 
Set(w10, w5, w8) --> All the first 5 set elements are similar as each of the sets have atleast one common item 
Set(w11, w12, w13) 

Я хотел бы выше RDD, чтобы свести к

Set(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10) 
Set(w11, w12, w13) 

Любой sugge что я могу сделать? Я не могу сделать что-то, как показано ниже, где я мог бы игнорировать сокращение два набора, если они не содержат каких-либо общих элементов:

data.reduce((a,b) => if (a.intersect(b).size > 0) a ++ b ***else (a,b)***) 

Спасибо.

ответ

0

Ваш reduce алгоритм на самом деле неверен. Например, что, если один набор не может сливаться со следующим набором, но все равно может быть объединен с другим набором в наборе.

Есть, вероятно, лучшие способы, но я думаю, что это решение, преобразовывая его в проблему графа и используя Graphx.

val data = Array(Set("w1", "w2", "w3"), Set("w5", "w6"), Set("w7"), Set("w2", "w3", "w4")) 
val setRdd = sc.parallelize(data).cache 

// Generate an unique id for each item to use as vertex's id in the graph 
val itemToId = setRdd.flatMap(_.toSeq).distinct.zipWithUniqueId.cache 
val idToItem = itemToId.map { case (item, itemId) => (itemId, item) } 

// Convert to a RDD of set of itemId 
val newSetRdd = setRdd.zipWithUniqueId 
    .flatMap { case (sets, setId) => 
    sets.map { item => (item, setId) } 
    }.join(itemToId).values.groupByKey().values 

// Create an RDD containing edges of the graph 
val edgeRdd = newSetRdd.flatMap { set => 
    val seq = set.toSeq 
    val head = seq.head 
    // Add an edge from the first item to each item in a set, 
    // including itself 
    seq.map { item => Edge[Long](head, item)} 
    } 

val graph = Graph.fromEdges(edgeRdd, Nil) 

// Run connected component algorithm to check which items are similar. 
// Items in the same component are similar 
val verticesRDD = graph.connectedComponents().vertices 

verticesRDD.join(idToItem).values.groupByKey.values.collect.foreach(println) 
+0

Отлично. Благодарю. Никогда не исследовал библиотеку Spark's Graphx, и мне пора это сделать. – soontobeared

Смежные вопросы