У меня есть проблема с производительностью с кодом, который я редактирую, каждый раз будет давать OOM
при выполнении счета. Я думаю, что я нашел проблему, в основном после keyBy
tranformation, исполняемый aggregateByKey.
Проблема заключается в том, что почти 98% элементов RDD имеют один и тот же ключ, поэтому aggregationByKey генерирует shuffle, помещает почти все записи в один и тот же перегородка, нижняя строка: всего несколько исполнителей работает, и для этого требуется много давления памяти.избежать перегородки дисбаланс Spark
Это код:
val rddAnomaliesByProcess : RDD[AnomalyPO] = rddAnomalies
.keyBy(po => po.getProcessCreator.name)
.aggregateByKey(List[AnomalyPO]())((list,value) => value +: list,_++_)
.map {case(name,list) =>
val groupByKeys = list.groupBy(po => (po.getPodId, po.getAnomalyCode, po.getAnomalyReason, po.getAnomalyDate, po.getMeasureUUID))
val lastOfGroupByKeys = groupByKeys.map{po => (po._1, List(po._2.sortBy { po => po.getProcessDate.getMillis }.last))}
lastOfGroupByKeys.flatMap(f => f._2)
}
.flatMap(f => f)
log.info("not duplicated Anomalies: " + rddAnomaliesByProcess.count)
Я бы способ выполнить операцию в более параллельно, что позволяет всем исполнителям работать почти одинаково. Как я могу это сделать?
Должен ли я использовать пользовательский разделитель?
* «Проблема заключается в том, что почти 98% элементов RDD имеет один и тот же ключ» * Есть ли причина, так много элементы имеют один и тот же ключ? Это бизнес-требование? –
На самом деле я не знаю, у меня нет функциональных знаний, я просто пытаюсь найти узкое место производительности. Я должен подумать, что они думали об этом, и разделение правильное. – Giorgio
Возможно, если бы генерация ключей была лучше и идеально одинаковой, у вас не было бы проблем, когда один раздел был бы таким большим. –