2016-04-20 2 views
0

Я пытаюсь найти оптимизированный способ генерации списка уникальных пар совпадений. Я попытался сделать это, используя серию плоских карт и различных запросов, но я обнаружил, что плоская карта не слишком сильно работает при работе над миллионами записей. Любая помощь в оптимизации этого будет получена с благодарностью.Оптимизация задач Spark

Набор данных (geohash, id), и я запускаю это на 30 узловых кластерах.

val rdd = sc.parallelize(Seq(("gh5", "id1"), ("gh4", "id1"), ("gh5", "id2"),("gh5", "id3")) 

val uniquePairings = rdd.groupByKey().map(value => 
    value._2.toList.sorted.combinations(2).map{ 
    case Seq(x, y) => (x, y)}.filter(id => 
    id._1 != id._2)).flatMap(x => x).distinct()  

voutput = Array(("id1","id2"),("id1","id3"),("id2","id3")) 

ответ

1

Простой join должно быть более чем достаточно здесь. Например, с DataFrames:

val df = rdd.toDF 
df.as("df1").join(df.as("df2"), 
    ($"df1._1" === $"df2._1") && 
    ($"df1._2" < $"df2._2") 
).select($"df1._2", $"df2._2") 

или наборы данных

val ds = rdd.toDS 
ds.as("ds1").joinWith(ds.as("ds2"), 
    ($"ds1._1" === $"ds2._1") && 
    ($"ds1._2" < $"ds2._2") 
).map{ case ((_, x), (_, y)) => (x, y)} 
+0

Спасибо за ответ, что кадры данных обеспечивают любые преимущества производительности соединения в плане перетасовки. – SChorlton

+0

Я не знаю об этом, но он более эффективен, чем стандартная совместная группа 'join' на RDD. – zero323

+0

Спасибо, что я немного изменил версию, чтобы сначала отсортировать ключи, чтобы поисковые запросы с большей вероятностью выполнялись локально. Предполагаю, что Spark будет выглядеть локально первым? – SChorlton

0

Посмотрите на декартово функцию. Он создает RDD, что является всеми возможными комбинациями входных RDD. Обратите внимание, что это дорогостоящая операция (N^2 в размере РДУ)

Cartesian example

+0

Спасибо за ответ, но работает и ту же функцию, используя декартовы выглядит гораздо медленнее. Я предполагаю, что это потому, что это не настоящая проблема n^2. – SChorlton

+0

Конечно, может быть. Возможно, стоит попробовать кэшировать ваш RDD перед декартовой операцией. Думая об этом еще немного, ваш оригинальный подход кажется лучшим способом выполнить эту задачу. Но вы должны использовать reduceByKey вместо groupByKey. Эта задача ассоциативна (так что вам не нужен groupByKey), и reduceByKey сохранит некоторые перетасовки данных (https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html) – David

+0

Спасибо снова я написал combByKey в моем полном примере, поскольку структура данных немного сложнее, чем пример, но спасибо за головы. Кажется, что задача занимает около 4-5 часов для 11 миллионов записей, которые кажутся немного высокими. Не уверен, действительно ли это реально. – SChorlton

Смежные вопросы