2016-01-04 2 views
2

Я ищу способ сравнения подмножеств RDD разумно.Сравнение подмножеств RDD

Допустим, у меня был RDD с парами ключ/значение типа (Int-> T). В конечном итоге мне нужно сказать «сравнить все значения ключа 1 со всеми значениями ключа 2 и сравнить значения ключа 3 со значениями ключа 5 и ключа 7», как бы я мог сделать это эффективно?

Пути я сейчас думаю сделать это путем создания списка отфильтрованного РДА, а затем с помощью RDD.cartesian()

def filterSubset[T] = (b:Int, r:RDD[(Int, T)]) => r.filter{case(name, _) => name == b} 

Val keyPairs:(Int, Int) // all key pairs 

Val rddPairs = keyPairs.map{ 

      case (a, b) => 

       filterSubset(a,r).cartesian(filterSubset(b,r)) 

     } 

rddPairs.map{whatever I want to compare…} 

Я бы тогда перебирать список и выполнить карту на каждом из RDD пар для сбора реляционных данных, которые мне нужны.

То, что я не могу сказать об этой идее, заключается в том, было бы крайне неэффективно устанавливать, возможно, сотни заданий на карту, а затем перебирать их через них. В этом случае ленивая оценка в искровом режиме оптимизирует перетасовку данных между всеми картами? Если нет, может кто-то может рекомендовать, возможно, более эффективный способ решения этой проблемы?

Спасибо за вашу помощь

+1

Может вам лучше объяснить логику сравнения? Какие ключи вы хотите сравнить? –

+1

Конечно, хорошим примером этого может быть, если бы у меня был RDD [(Int, Vector)], и я только хотел рассчитать косинус-сходство векторов с соответствующими ключами (ключи по существу разделяют разные группы векторов). –

+1

Сколько у вас данных за ключ? Это сопоставление «один к одному» или вы хотите сравнить с несколькими подмножествами (например, 1 с {2, 3, 5, 7})? – zero323

ответ

3

Одним из способов вы можете подойти к этой проблеме заключается в репликации и разделы данных, чтобы отразить ключевые пары, которые вы хотите сравнить. Давайте начнем с создания двух карт из реальных ключей временных ключей мы будем использовать для репликации и присоединяется:

def genMap(keys: Seq[Int]) = keys 
    .zipWithIndex.groupBy(_._1) 
    .map{case (k, vs) => (k -> vs.map(_._2))} 

val left = genMap(keyPairs.map(_._1)) 
val right = genMap(keyPairs.map(_._2)) 

Далее мы можем преобразовать данные по репликации с новыми ключами:

def mapAndReplicate[T: ClassTag](rdd: RDD[(Int, T)], map: Map[Int, Seq[Int]]) = { 
    rdd.flatMap{case (k, v) => map.getOrElse(k, Seq()).map(x => (x, (k, v)))} 
} 

val leftRDD = mapAndReplicate(rddPairs, left) 
val rightRDD = mapAndReplicate(rddPairs, right) 

Наконец мы может cogroup:

val cogrouped = leftRDD.cogroup(rightRDD) 

И сравните/фильтр пары:

cogrouped.values.flatMap{case (xs, ys) => for { 
    (kx, vx) <- xs 
    (ky, vy) <- ys 
    if cosineSimilarity(vx, vy) <= threshold 
} yield ((kx, vx), (ky, vy)) } 

Очевидно, что в текущей форме этот подход ограничен. Он предполагает, что значения для произвольной пары ключей могут вписываться в память и требуют значительного количества сетевого трафика. Тем не менее это должно дать вам некоторое представление о том, как действовать.

Другим возможным подходом является хранение данных во внешней системе (например, база данных) и выбор требуемых пар ключ-значение по требованию.

Поскольку вы пытаетесь найти сходство между элементами, я бы также рассмотрел совершенно другой подход. Вместо того, чтобы наивно сравнивать ключ за ключом, я попытался бы разделить данные с помощью пользовательского разделителя, который отражает ожидаемое сходство между документами. В целом это далеко не тривиально, но должно давать гораздо лучшие результаты.

+0

Спасибо zero323. Это дает мне много потенциальных направлений для реализации этого проекта (мне особенно нравится идея использования настраиваемого разделителя) –

0

Использование Dataframe вы можете легко сделать декартово операцию с помощью присоединения:

dataframe1.join(dataframe2, dataframe1("key")===dataframe2("key")) 

Это будет, вероятно, именно то, что вы хотите, но эффективно.

Если вы не знаете, как создать Dataframe, пожалуйста, обратитесь к http://spark.apache.org/docs/latest/sql-programming-guide.html#creating-dataframes

Смежные вопросы