2016-11-09 2 views
0

При работе с компонентом, соединяющим большие данные, мне очень сложно слить их в искру.Как подключить компонент в Spark, когда данные слишком велики

Структура данных в моих исследованиях может быть упрощена до RDD[Array[Int]]. Например:

RDD[Array(1,2,3), Array(1,4), Array(5,6), Array(5,6,7,8), Array(9), Array(1)]

Цель состоит в том, чтобы объединить два массива, если они имеют пересечение установить, заканчиваясь с массивами без пересечения. Поэтому после слияния, оно должно быть:

RDD[Array(1,2,3,4), Array(5,6,7,8), Array(9)]

Проблема вроде компонента соединительной в рамках Преголя в Graph Algo. Одним из решений является сначала найти граничное соединение между двумя Array, используя декартовой продукт, а затем объединить их. Однако в моем случае есть 300KArray с общим размером 1G. Следовательно, сложность времени и памяти будет примерно 300K * 300K. Когда я запускаю программу на своем Mac Pro в искры, она полностью застревает.

Baiscally, это как:

enter image description here

Благодаря

+1

Извините, но 1G не большие данные ... Как это слишком большой? У большинства компьютеров Mac сейчас около 8 ГБ, верно? –

+0

@ cricket_007 Да, 1G - это не большие данные, но в итоге я продолжу свой алгоритм до больших данных. У моего Mac действительно есть 8G RAM. Поэтому, если я не могу запустить этот набор данных на своем Mac, это означает, что алгоритм нежелателен. –

+0

У меня нет особого кода, но я не уверен, что Spark действительно предназначен для этого конкретного алгоритма/задачи. Я сделал что-то подобное с dataframes и SparkSQL и использовал функции Windowing, но мне гарантировали последовательные номера. Кажется, у вас нет этого –

ответ

0

Вот мое решение. Не может быть достаточно приличным, но работает для небольших данных. Может ли он применяться к большим данным, нуждается в дополнительном доказательстве.

def mergeCanopy(canopies:RDD[Array[Int]]):Array[Array[Int]] = { /* try to merge two canopies */ val s = Set[Array[Int]]() val c = canopies.aggregate(s)(mergeOrAppend, _++_) return c.toArray

def mergeOrAppend(disjoint: Set[Array[Int]], cluster: Array[Int]):Set[Array[Int]] = { var disjoints = disjoint for (clus <- disjoint) { if (clus.toSet.&(cluster.toSet) != Set()) { disjoints += (clus.toSet++cluster.toSet).toArray disjoints -= clus return disjoints } } disjoints += cluster return disjoints }

Смежные вопросы