2017-02-19 2 views
2

Я хочу использовать intersection() ключом или filter() в искры.как использовать перекрестное пересечение() с помощью ключа или фильтра() с двумя RDD?

Но я действительно не знаю, как использовать intersection() ключом.

Так что я пытался использовать filter(), но это не сработало.

пример - вот два РДД:

data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)) 
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5)) 

val data3 = data2.map{_._1} 

data1.filter{_._1 == data3}.collect //Array[(String, Int] = Array() 

Я хочу, чтобы получить (ключ, значение) пару с тем же ключом, как data1 на основе ключа, который data2 имеет.

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3)) - результат, который я хочу.

Есть ли способ решить эту проблему, используя intersection() ключом или filter()?

ответ

0

Я пытался улучшить свое решение с broadcast переменной в filter()

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))) 
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5))) 

// broadcast data2 key list to use in filter method, which runs in executor nodes 
val bcast = sc.broadcast(data2.map(_._1).collect()) 

val result = data1.filter(r => bcast.value.contains(r._1)) 


println(result.collect().toList) 
//Output 
List((a,1), (a,2), (b,2), (b,3)) 

Edit1: (Согласно комментарий для решения масштабируемости с выкатывания с помощью collect())

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))) 
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5))) 

val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2) 
/* List(
    (a, (CompactBuffer(1, 2), CompactBuffer(3))), 
    (b, (CompactBuffer(2, 3), CompactBuffer(5))), 
    (c, (CompactBuffer(1), CompactBuffer())) 
) */ 

//Now filter keys which have two non empty CompactBuffer. You can do that with 
//filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty) also. 
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty } 
/* List(
    (a, (CompactBuffer(1, 2), CompactBuffer(3))), 
    (b, (CompactBuffer(2, 3), CompactBuffer(5))) 
) */ 

//As we care about first data only, lets pick first compact buffer only 
// by doing v1.map(val1 => (k, val1)) 
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) } 
//List((a, 1), (a, 2), (b, 2), (b, 3)) 

edit2:

val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct() 
//List((b,2), (b,3), (a,2), (a,1)) 

Здесь data1.join(data2) держит пары с общими ключами (внутреннее соединение)

//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(2,1)), (b,(3,5)), (b,(3,1))) 
4

Для вашей проблемы, я думаю, cogroup() лучше подходит. Метод intersection() рассмотрит как ключи, так и значения в ваших данных и приведет к пустым rdd.

Функция cogroup() группирует значения обоих rdd «с ключом и дает нам (key, vals1, vals2), где vals1 и vals2 содержат значения data1 и data2 соответственно, для каждого ключа. Обратите внимание, что если определенный ключ не используется в обоих наборах данных, один из vals1 или vals2 будет возвращен как пустой Seq, поэтому сначала нам нужно отфильтровать эти кортежи, чтобы добраться до перекрестка двух rdd. ,

Далее мы захватить vals1 - который содержит Значения из data1 для общих ключей - и преобразовать его в формат (key, Array). Наконец, мы используем flatMapValues(), чтобы распечатать результат в формате (key, value).

val result = (data1.cogroup(data2) 
    .filter{case (k, (vals1, vals2)) => vals1.nonEmpty && vals2.nonEmpty } 
    .map{case (k, (vals1, vals2)) => (k, vals1.toArray)} 
    .flatMapValues(identity[Array[Int]])) 

result.collect() 
// Array[(String, Int)] = Array((a,1), (a,2), (b,2), (b,3)) 
+0

Я не понимаю 'cogroup' ну, что, если я хочу, чтобы использовать функциональные операции, как валь результат' = data1.filter (г => bcast.value.contains (myFuncOper (r._1))) 'в' cogroup'? –

Смежные вопросы