Учитывая два больших ключ многозначных пары РДА (d1
и d2
), и состоящий из уникальных ID ключей и вектора значения (например, RDD[Int,DenseVector]
), я необходимо сопоставить d1
, чтобы получить для каждого из своих элементов ID из ближайшего элемента в d2
с использованием эвклидового расстояния, метрического значения между векторов.Spark: Как сопоставить RDD, когда доступ к другому РДУ требуется
Я не нашел способ сделать это, используя стандартные преобразования RDD. Я понимаю, что вложенные РДУ не допускаются в Спарк, однако, если это было возможно, простое решение будет:
d1.map((k,v) => (k, d2.map{case (k2, v2) => val diff = (v - v2); (k2, sqrt(diff dot diff))}
.takeOrdered(1)(Ordering.by[(Double,Double), Double](_._2))
._1))
Кроме того, если d1
был маленьким, я мог бы работать с картой (например, d1.collectAsMap()
) и петли по каждому из его элементов, но это не вариант из-за размера набора данных.
Есть ли альтернатива этому преобразованию в Spark?
EDIT 1:
Использование @holden и @ Дэвид-грифон предложения, я решил вопрос с помощью cartesian()
и reduceByKey()
. Это сценарий (предполагается, что sc
- SparkContext
и использование библиотеки Breeze).
val d1 = sc.parallelize(List((1,DenseVector(0.0,0.0)), (2,DenseVector(1.0,0.0)), (3,DenseVector(0.0,1.0))))
val d2 = sc.parallelize(List((1,DenseVector(0.0,0.75)), (2,DenseVector(0.0,0.25)), (3,DenseVector(1.0,1.0)), (4,DenseVector(0.75,0.0))))
val d1Xd2 = d1.cartesian(d2)
val pairDistances = d1Xd2.map{case ((k1, v1), (k2, v2)) => (k1, (k2, sqrt(sum(pow(v1-v2,2)))))}
val closestPoints = pairDistances.reduceByKey{case (x, y) => if (x._2 < y._2) x else y }
closestPoints.foreach(s => println(s._1 + " -> " + s._2._1))
Выход полученный:
1 -> 2
2 -> 4
3 -> 1
Я бы преобразовал их в DataFrames и попытался сделать это с помощью 'join' между ними. В противном случае, я думаю, вам нужно сделать 'd1.cartesian (d2)', а затем использовать 'reduce', чтобы найти кратчайшее расстояние для каждого' d1._1' –
Довольно много дубликатов: http://stackoverflow.com/ a/29953122/21755 –
Если оба набора данных велики, вы не можете сравнивать каждый элемент с каждым другим элементом. Вам нужно использовать схему разбиения пространства, затем «присоединиться» к разделам и найти наилучшие совпадения в разделах. –