Я хочу присоединиться к двум наборам, применяя широковещательную переменную. Я пытаюсь реализовать первое предложение от Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?Поиск значений в широковещательной переменной
val emp_newBC = sc.broadcast(emp_new.collectAsMap())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
} yield ((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
Однако, как указано здесь: broadcast variable fails to take all data мне нужно использовать сбор(), а не collectAsMAp(). Я попытался скорректировать свой код, как показано ниже:
val emp_newBC = sc.broadcast(emp_new.collect())
val joined = emp.mapPartitions({ iter =>
val m = emp_newBC.value
for {
((t, w)) <- iter
if m.contains(t)
amk = m.indexOf(t)
} yield ((w + '-' + emp_newBC.value(amk)),1) //yield ((t, w), (m.get(t).get)) //((w + '-' + m.get(t).get),1)
}, preservesPartitioning = true)
Но, похоже, m.contains (t) не отвечает. Как я могу это исправить?
Заранее спасибо.
val emp_newBC = sc.broadcast (emp_new.groupByKey.collectAsMap) часть по-прежнему возвращает отсутствующие данные. Я пытаюсь с небольшим набором данных, и я ожидаю 354 строки в моей переменной широковещания, но он возвращает 312 строк. Я думаю, что причина по-прежнему действительна, как указано во 2-й ссылке, которую я написал на моем посту. Не возможно ли работать только с collect()? И да, emp должен быть разделен. На самом деле emp имеет 500 разделов. Я не понимаю, как переделать новый rdd. Должен ли я переделать? –
Он должен охватывать все значения. Что вы получаете при запуске 'emp_newBC.value.values.map (_. Size) .sum'? Что касается секционирования, у вас есть разделитель? Каков результат от 'emp.partitioner'? – zero323
Чтобы контролировать размер, я проверил его через 'val oo = emp_newBC.value', и в результате он все еще показывал меньшее количество. Однако, когда я запускаю свой код, он работает отлично. Теперь я задаюсь вопросом, будет ли это хороший подход. ..'joined.repartition (sc.defaultParallelism * 500) 'Будет ли я снова разбиваться на 500, как это было в начале? –