2017-01-13 5 views
0

Рассмотрим следующую ситуацию: у вас есть два RDD пар ключ-значение, где каждый из двух ключей из двух RDD имеют разный тип.

RDD1 (Vector [String], String) выглядит следующим образом:Spark: пересечение ключей в RDD с разными типами ключей

(Vector("A", "B", "E"), "bla bla bla"), 
(Vector("W"), "bla bla bla bla"), 
(Vector("C", "M"), "bla bla bla bla bla"), 
(Vector("A", "V"), "bla bla bla") 
... 


RDD2 [(String, String)] выглядеть следующим образом:

("A", 12), 
("B", 434), 
("C", 8023), 
("D", 3454), 
... 
("N", 251) 

Примечание: что ключи в RDD2 от AN включительно.

Нужный выход пары первого RDD1 таким образом, что каждая строка в ключе вектора является подмножеством всего множества ключей RDD2

(Vector("A", "E", "B"), "bla bla bla"), 
(Vector("C", "M"), "bla bla bla bla bla") 


также, если это не представляется возможным с РДУ, Я хотел бы знать, как другие абстракции, как dataframe и набора данных может достичь этого результата

ответ

0
def myFilter(rdd1: RDD[(Vector[String],String)], rdd2: RDD[(String,String)]): RDD[(Vector[String],String)] = { 

    val keys = rdd2.map(_._1).collect() 

    val filtered = rdd1.filter{ entry => 
     entry._1.forall(str => keys.contains(str)) 
    } 
    filtered 
} 

Это не самый эффективный способ получить это сделать, но получает работу.

+0

спасибо за ваш ответ. FYI вы могли бы использовать ключи val = rdd2.keys.collect(), которые более читабельны. Тем не менее, я все еще ищу способ получить эти результаты с помощью преобразований RDD или использовать другие абстракции, которые мне менее знакомы с такими данными, как dataframe и dataatet. – 7kemZmani

Смежные вопросы