2015-09-21 3 views
1

Я хочу присоединиться к двум наборам, применяя широковещательную переменную. Я пытаюсь реализовать первое предложение от Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?Поиск значений в широковещательной переменной

val emp_newBC = sc.broadcast(emp_new.collectAsMap()) 
val joined = emp.mapPartitions({ iter => 
     val m = emp_newBC.value 
     for { 
     ((t, w)) <- iter 
     if m.contains(t) 
     } yield ((w + '-' + m.get(t).get),1) 
    }, preservesPartitioning = true) 

Однако, как указано здесь: broadcast variable fails to take all data мне нужно использовать сбор(), а не collectAsMAp(). Я попытался скорректировать свой код, как показано ниже:

val emp_newBC = sc.broadcast(emp_new.collect()) 
val joined = emp.mapPartitions({ iter => 
     val m = emp_newBC.value 
     for { 
     ((t, w)) <- iter 
     if m.contains(t) 
     amk = m.indexOf(t) 
     } yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get)) //((w + '-' + m.get(t).get),1) 
    }, preservesPartitioning = true) 

Но, похоже, m.contains (t) не отвечает. Как я могу это исправить?

Заранее спасибо.

ответ

2

Как насчет чего-то подобного?

val emp_newBC = sc.broadcast(emp_new.groupByKey.collectAsMap) 

val joined = emp.mapPartitions(iter => for { 
    (k, v1) <- iter 
    v2 <- emp_newBC.value.getOrElse(k, Iterable()) 
} yield (s"$v1-$v2", 1)) 

Что касается вашего кода ... Насколько я понимаю emp_new является RDD[(String, String)]. Когда он будет собран, вы получите Array[(String, String)]. При использовании

((t, w)) <- iter 

t является String так m.contains(t) всегда будет возвращать false.

Другая проблема, которую я вижу, это preservesPartitioning = true внутри mapPartitions. Там несколько возможных сценариев:

  1. emp разбивается, и вы хотите joined быть разбит на разделы, а также. Так как вы меняете ключ от t до некоторого нового значения, разбиение не может быть сохранено, и итоговое значение RDD должно быть переразделено. Если вы используете preservesPartitioning = true, вывод RDD закончится неправильными разделами.
  2. emp разделен, но вам не нужно разбиение на разделы для joined. Нет причин использовать preservesPartitioning.
  3. emp не разделен. Значение preservesPartitioning не влияет.
+0

val emp_newBC = sc.broadcast (emp_new.groupByKey.collectAsMap) часть по-прежнему возвращает отсутствующие данные. Я пытаюсь с небольшим набором данных, и я ожидаю 354 строки в моей переменной широковещания, но он возвращает 312 строк. Я думаю, что причина по-прежнему действительна, как указано во 2-й ссылке, которую я написал на моем посту. Не возможно ли работать только с collect()? И да, emp должен быть разделен. На самом деле emp имеет 500 разделов. Я не понимаю, как переделать новый rdd. Должен ли я переделать? –

+0

Он должен охватывать все значения. Что вы получаете при запуске 'emp_newBC.value.values.map (_. Size) .sum'? Что касается секционирования, у вас есть разделитель? Каков результат от 'emp.partitioner'? – zero323

+0

Чтобы контролировать размер, я проверил его через 'val oo = emp_newBC.value', и в результате он все еще показывал меньшее количество. Однако, когда я запускаю свой код, он работает отлично. Теперь я задаюсь вопросом, будет ли это хороший подход. ..'joined.repartition (sc.defaultParallelism * 500) 'Будет ли я снова разбиваться на 500, как это было в начале? –

Смежные вопросы