2015-05-27 2 views
1

Учитывая два больших ключ многозначных пары РДА (d1 и d2), и состоящий из уникальных ID ключей и вектора значения (например, RDD[Int,DenseVector]), я необходимо сопоставить d1, чтобы получить для каждого из своих элементов ID из ближайшего элемента в d2 с использованием эвклидового расстояния, метрического значения между векторов.Spark: Как сопоставить RDD, когда доступ к другому РДУ требуется

Я не нашел способ сделать это, используя стандартные преобразования RDD. Я понимаю, что вложенные РДУ не допускаются в Спарк, однако, если это было возможно, простое решение будет:

d1.map((k,v) => (k, d2.map{case (k2, v2) => val diff = (v - v2); (k2, sqrt(diff dot diff))} 
         .takeOrdered(1)(Ordering.by[(Double,Double), Double](_._2))  
         ._1)) 

Кроме того, если d1 был маленьким, я мог бы работать с картой (например, d1.collectAsMap()) и петли по каждому из его элементов, но это не вариант из-за размера набора данных.

Есть ли альтернатива этому преобразованию в Spark?

EDIT 1:

Использование @holden и @ Дэвид-грифон предложения, я решил вопрос с помощью cartesian() и reduceByKey(). Это сценарий (предполагается, что sc - SparkContext и использование библиотеки Breeze).

val d1 = sc.parallelize(List((1,DenseVector(0.0,0.0)), (2,DenseVector(1.0,0.0)), (3,DenseVector(0.0,1.0)))) 
val d2 = sc.parallelize(List((1,DenseVector(0.0,0.75)), (2,DenseVector(0.0,0.25)), (3,DenseVector(1.0,1.0)), (4,DenseVector(0.75,0.0)))) 

val d1Xd2 = d1.cartesian(d2) 
val pairDistances = d1Xd2.map{case ((k1, v1), (k2, v2)) => (k1, (k2, sqrt(sum(pow(v1-v2,2)))))} 
val closestPoints = pairDistances.reduceByKey{case (x, y) => if (x._2 < y._2) x else y } 

closestPoints.foreach(s => println(s._1 + " -> " + s._2._1)) 

Выход полученный:

1 -> 2 
2 -> 4 
3 -> 1 
+2

Я бы преобразовал их в DataFrames и попытался сделать это с помощью 'join' между ними. В противном случае, я думаю, вам нужно сделать 'd1.cartesian (d2)', а затем использовать 'reduce', чтобы найти кратчайшее расстояние для каждого' d1._1' –

+1

Довольно много дубликатов: http://stackoverflow.com/ a/29953122/21755 –

+0

Если оба набора данных велики, вы не можете сравнивать каждый элемент с каждым другим элементом. Вам нужно использовать схему разбиения пространства, затем «присоединиться» к разделам и найти наилучшие совпадения в разделах. –

ответ

2

Трансформации на РДУ могут быть применены только на стороне водителя, так вложенности карт не будут работать. Как указывает @davidgriffin, вы можете использовать cartesian. Для вашего случая использования вы, вероятно, захотите следить за этим с помощью reduceByKey, а внутри вашего сокращения по ключу вы можете отслеживать минимальное расстояние.

+0

Я обновил вопрос с помощью решения, основанного на вашем предложении. Спасибо. –

Смежные вопросы